Browse Source

Рабочая проверка токенов.

Токены пишутся в базу, проверяются на срок действия.
master
Дмитрий 3 years ago
parent
commit
f19dac529b
  1. 119
      main.py

119
main.py

@ -1,26 +1,68 @@
import os
import logging
import threading
from aiogram import Bot, Dispatcher, executor, types
from time import sleep
from aiogram.utils.exceptions import MessageToDeleteNotFound
import asyncio
from datetime import timedelta
from datetime import timedelta, datetime
import secrets
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.orm import declarative_base, sessionmaker
logging.basicConfig(level=logging.INFO)
DELAY_TIME = 1
DELAY_TIME_Q = 3
CHAT_ID = "@gorgorod_information"
ALLOWED_USERS = [106693654, 7063133]
ADMINS = [106693654, 7063133]
API_TOKEN = os.getenv("TELEGRAM_API_TOKEN")
bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)
engine = create_engine("sqlite:///support.db")
Base = declarative_base()
Session = sessionmaker(bind=engine)
session = Session()
class Allowed_user(Base):
__tablename__ = "allowed_users"
id = Column(Integer, primary_key=True)
user_id = Column(Integer, nullable=False)
date_add = Column(DateTime, nullable=False)
class Token(Base):
"""Класс для токенов.
Хранит сам ключ, и дату истечения.
"""
__tablename__ = "tokens"
id = Column(Integer, primary_key=True)
token = Column(String, nullable=False)
expire = Column(DateTime, nullable=False)
def expired(self) -> bool:
"""Проверка срока жизни токена.
Вовзращает истину, если срок действия токена истек.
"""
if datetime.now() > self.expire:
return True
return False
def __repr__(self):
return f"{self.id}: {self.token} | {self.expire}"
def auth(func):
async def wrapper(message):
if message["from"]["id"] not in ALLOWED_USERS:
return await message.reply("Access Denied", reply=False)
return # await message.reply("Access Denied", reply=False)
return await func(message)
return wrapper
@ -61,7 +103,16 @@ async def send_to_chanel(message: types.Message):
@dp.message_handler(commands=["question"])
@auth
async def send_to_chanel(message: types.Message):
async def send_question(message: types.Message):
"""Отправить вопрос через бота.
Через бота можно отправить вопрос, который будет удален через
DELAY_TIME_Q минут.
"""
if len(message.text) == len("/question"):
await message.reply("Пишешь пусто - делаешь грустно (мне)", reply=False)
return
deletion_time = message.date + timedelta(minutes=DELAY_TIME_Q)
out_text = f"*Внимание, вопрос!*\n\n{message.text[10:]}\n\nБудет удалено в *{deletion_time}*"
@ -75,5 +126,63 @@ async def send_to_chanel(message: types.Message):
await clean_up(message_id=result.message_id, delay_min=DELAY_TIME_Q)
@dp.message_handler(commands=["get_invite"])
async def get_invite(message: types.Message):
if message["from"]["id"] not in ADMINS:
return
token = secrets.token_urlsafe(16)
exp = datetime.now() + timedelta(minutes=5)
test = Token(token=token, expire=exp)
session.add(test)
session.commit()
await message.reply(
f"Твой токен: *{token}* \nБудет действителен 5 минут",
reply=False,
parse_mode="Markdown",
)
@dp.message_handler(commands=["add_me"])
async def add_me(message: types.Message):
"""Добавление юзера в чат.
Когда бот получает токен, он проверяет его на валидность (есть ли
такой вообще, не истек ли) и высылает разовую ссылку на вступление.
"""
user_token = message.text[8:].strip()
tokens_list = session.query(Token).order_by(Token.id).all()
finded_token: Token = next((t for t in tokens_list if t.token == user_token), None)
if finded_token.expired():
print("Пизда токену")
if finded_token and not finded_token.expired():
ALLOWED_USERS.append(message["from"]["id"])
# генерируем ссылку, истечет через 5 минут, может вступить 1 человек
link = await bot.create_chat_invite_link(
chat_id=CHAT_ID, expire_date=timedelta(minutes=5), member_limit=1
)
await message.reply(
"Welcome to the internet"
"\nHave a look around"
"\nAnything that brain of yours can think of can be found"
f"\n\nТвоя ссылка для вступления: {link['invite_link']}",
reply=False,
parse_mode="Markdown",
)
if __name__ == "__main__":
# инициализация бд
Base.metadata.create_all(engine)
# print(session.query(Token).order_by(Token.id).all())
# проверить, все ли сообщения удалены
executor.start_polling(dp, skip_updates=True)

Loading…
Cancel
Save