import os import logging from aiogram import Bot, Dispatcher, executor, types from aiogram.utils.exceptions import MessageToDeleteNotFound import asyncio from datetime import timedelta, datetime import secrets from sqlalchemy import create_engine from sqlalchemy import Column, Integer, String, DateTime from sqlalchemy.orm import declarative_base, sessionmaker logging.basicConfig(level=logging.INFO) DELAY_TIME = 1 DELAY_TIME_Q = 3 CHAT_ID = "@gorgorod_information" ALLOWED_USERS = [106693654, 7063133] ADMINS = [106693654, 7063133] API_TOKEN = os.getenv("TELEGRAM_API_TOKEN") bot = Bot(token=API_TOKEN) dp = Dispatcher(bot) engine = create_engine("sqlite:///support.db") Base = declarative_base() Session = sessionmaker(bind=engine) session = Session() class Allowed_user(Base): __tablename__ = "allowed_users" id = Column(Integer, primary_key=True) user_id = Column(Integer, nullable=False) date_add = Column(DateTime, nullable=False) class Token(Base): """Класс для токенов. Хранит сам ключ, и дату истечения. """ __tablename__ = "tokens" id = Column(Integer, primary_key=True) token = Column(String, nullable=False) expire = Column(DateTime, nullable=False) def expired(self) -> bool: """Проверка срока жизни токена. Вовзращает истину, если срок действия токена истек. """ if datetime.now() > self.expire: return True return False def __repr__(self): return f"{self.id}: {self.token} | {self.expire}" def auth(func): async def wrapper(message): users_list: Allowed_user = ( session.query(Allowed_user).order_by(Allowed_user.id).all() ) finded_user: Allowed_user = next( (u for u in users_list if u.user_id == message["from"]["id"]), None ) if not finded_user: return # await message.reply("Access Denied", reply=False) return await func(message) return wrapper async def clean_up(message_id: int, delay_min: int): await asyncio.sleep(delay_min * 60) try: await bot.delete_message(chat_id=CHAT_ID, message_id=message_id) except MessageToDeleteNotFound: print("Ошибка удаления сообщения") @dp.message_handler(commands=["start", "help"]) @auth async def send_welcome(message: types.Message): await message.reply( "Бот для отправки новостей\n\n" "/mes для отправки сообщения\n", reply=False ) @dp.message_handler(commands=["mes"]) @auth async def send_to_chanel(message: types.Message): deletion_time = message.date + timedelta(minutes=DELAY_TIME) out_text = f"{message.text[5:]} \n\nБудет удалено в *{deletion_time}*" result = await bot.send_message( chat_id=CHAT_ID, text=out_text, disable_notification=True, parse_mode="Markdown", ) await clean_up(message_id=result.message_id, delay_min=DELAY_TIME) @dp.message_handler(commands=["question"]) @auth async def send_question(message: types.Message): """Отправить вопрос через бота. Через бота можно отправить вопрос, который будет удален через DELAY_TIME_Q минут. """ if len(message.text) == len("/question"): await message.reply("Пишешь пусто - делаешь грустно (мне)", reply=False) return deletion_time = message.date + timedelta(minutes=DELAY_TIME_Q) out_text = f"*Внимание, вопрос!*\n\n{message.text[10:]}\n\nБудет удалено в *{deletion_time}*" result = await bot.send_message( chat_id=CHAT_ID, text=out_text, disable_notification=True, parse_mode="Markdown", ) await clean_up(message_id=result.message_id, delay_min=DELAY_TIME_Q) @dp.message_handler(commands=["get_invite"]) async def get_invite(message: types.Message): if message["from"]["id"] not in ADMINS: return token = secrets.token_urlsafe(16) exp = datetime.now() + timedelta(minutes=5) test = Token(token=token, expire=exp) session.add(test) session.commit() await message.reply( f"Твой токен: *{token}* \nБудет действителен 5 минут", reply=False, parse_mode="Markdown", ) @dp.message_handler(commands=["add_me"]) async def add_me(message: types.Message): """Добавление юзера в чат. Когда бот получает токен, он проверяет его на валидность (есть ли такой вообще, не истек ли) и высылает разовую ссылку на вступление в группу. """ user_token = message.text[8:].strip() tokens_list = session.query(Token).order_by(Token.id).all() finded_token: Token = next((t for t in tokens_list if t.token == user_token), None) if finded_token.expired(): print("Пизда токену") # если с токеном все впорядке, то добавляем юзера в базу if finded_token and not finded_token.expired(): new_user: Allowed_user = Allowed_user( user_id=message["from"]["id"], date_add=datetime.now() ) session.add(new_user) session.commit() # генерируем ссылку, истечет через 5 минут, может вступить 1 человек link = await bot.create_chat_invite_link( chat_id=CHAT_ID, expire_date=timedelta(minutes=5), member_limit=1 ) await message.reply( "Welcome to the internet" "\nHave a look around" "\nAnything that brain of yours can think of can be found" f"\n\nТвоя ссылка для вступления: {link['invite_link']}", reply=False, parse_mode="Markdown", ) if __name__ == "__main__": # инициализация бд Base.metadata.create_all(engine) # print(session.query(Token).order_by(Token.id).all()) # проверить, все ли сообщения удалены executor.start_polling(dp, skip_updates=True)