diff --git a/main.py b/main.py index 08347d6..cfb1587 100644 --- a/main.py +++ b/main.py @@ -1,26 +1,68 @@ import os import logging -import threading from aiogram import Bot, Dispatcher, executor, types -from time import sleep from aiogram.utils.exceptions import MessageToDeleteNotFound import asyncio -from datetime import timedelta +from datetime import timedelta, datetime +import secrets +from sqlalchemy import create_engine +from sqlalchemy import Column, Integer, String, DateTime +from sqlalchemy.orm import declarative_base, sessionmaker + logging.basicConfig(level=logging.INFO) DELAY_TIME = 1 DELAY_TIME_Q = 3 CHAT_ID = "@gorgorod_information" ALLOWED_USERS = [106693654, 7063133] +ADMINS = [106693654, 7063133] API_TOKEN = os.getenv("TELEGRAM_API_TOKEN") bot = Bot(token=API_TOKEN) dp = Dispatcher(bot) +engine = create_engine("sqlite:///support.db") +Base = declarative_base() +Session = sessionmaker(bind=engine) +session = Session() + + +class Allowed_user(Base): + __tablename__ = "allowed_users" + id = Column(Integer, primary_key=True) + user_id = Column(Integer, nullable=False) + date_add = Column(DateTime, nullable=False) + + +class Token(Base): + """Класс для токенов. + + Хранит сам ключ, и дату истечения. + + """ + + __tablename__ = "tokens" + id = Column(Integer, primary_key=True) + token = Column(String, nullable=False) + expire = Column(DateTime, nullable=False) + + def expired(self) -> bool: + """Проверка срока жизни токена. + + Вовзращает истину, если срок действия токена истек. + """ + if datetime.now() > self.expire: + return True + + return False + + def __repr__(self): + return f"{self.id}: {self.token} | {self.expire}" + def auth(func): async def wrapper(message): if message["from"]["id"] not in ALLOWED_USERS: - return await message.reply("Access Denied", reply=False) + return # await message.reply("Access Denied", reply=False) return await func(message) return wrapper @@ -61,7 +103,16 @@ async def send_to_chanel(message: types.Message): @dp.message_handler(commands=["question"]) @auth -async def send_to_chanel(message: types.Message): +async def send_question(message: types.Message): + """Отправить вопрос через бота. + + Через бота можно отправить вопрос, который будет удален через + DELAY_TIME_Q минут. + + """ + if len(message.text) == len("/question"): + await message.reply("Пишешь пусто - делаешь грустно (мне)", reply=False) + return deletion_time = message.date + timedelta(minutes=DELAY_TIME_Q) out_text = f"*Внимание, вопрос!*\n\n{message.text[10:]}\n\nБудет удалено в *{deletion_time}*" @@ -75,5 +126,63 @@ async def send_to_chanel(message: types.Message): await clean_up(message_id=result.message_id, delay_min=DELAY_TIME_Q) +@dp.message_handler(commands=["get_invite"]) +async def get_invite(message: types.Message): + + if message["from"]["id"] not in ADMINS: + return + + token = secrets.token_urlsafe(16) + + exp = datetime.now() + timedelta(minutes=5) + test = Token(token=token, expire=exp) + session.add(test) + session.commit() + + await message.reply( + f"Твой токен: *{token}* \nБудет действителен 5 минут", + reply=False, + parse_mode="Markdown", + ) + + +@dp.message_handler(commands=["add_me"]) +async def add_me(message: types.Message): + """Добавление юзера в чат. + + Когда бот получает токен, он проверяет его на валидность (есть ли + такой вообще, не истек ли) и высылает разовую ссылку на вступление. + + """ + user_token = message.text[8:].strip() + tokens_list = session.query(Token).order_by(Token.id).all() + + finded_token: Token = next((t for t in tokens_list if t.token == user_token), None) + + if finded_token.expired(): + print("Пизда токену") + + if finded_token and not finded_token.expired(): + + ALLOWED_USERS.append(message["from"]["id"]) + # генерируем ссылку, истечет через 5 минут, может вступить 1 человек + link = await bot.create_chat_invite_link( + chat_id=CHAT_ID, expire_date=timedelta(minutes=5), member_limit=1 + ) + + await message.reply( + "Welcome to the internet" + "\nHave a look around" + "\nAnything that brain of yours can think of can be found" + f"\n\nТвоя ссылка для вступления: {link['invite_link']}", + reply=False, + parse_mode="Markdown", + ) + + if __name__ == "__main__": + # инициализация бд + Base.metadata.create_all(engine) + # print(session.query(Token).order_by(Token.id).all()) + # проверить, все ли сообщения удалены executor.start_polling(dp, skip_updates=True)