import os import logging from aiogram import Bot, Dispatcher, executor, types from aiogram.utils.exceptions import MessageToDeleteNotFound import asyncio from datetime import timedelta, datetime import secrets from sqlalchemy import create_engine from sqlalchemy import Column, Integer, String, DateTime from sqlalchemy.orm import declarative_base, sessionmaker logging.basicConfig(level=logging.INFO) DELAY_TIME = 1 DELAY_TIME_Q = 3 CHAT_ID = -1001612776177 ALLOWED_USERS = [106693654, 7063133] ADMINS = [106693654, 7063133, 4978608] API_TOKEN = os.getenv("TELEGRAM_API_TOKEN") bot = Bot(token=API_TOKEN) dp = Dispatcher(bot) engine = create_engine("sqlite:///support.db") Base = declarative_base() Session = sessionmaker(bind=engine) session = Session() class Allowed_user(Base): __tablename__ = "allowed_users" id = Column(Integer, primary_key=True) user_id = Column(Integer, nullable=False) date_add = Column(DateTime, nullable=False) class Token(Base): """Класс для токенов. Хранит сам ключ, и дату истечения. """ __tablename__ = "tokens" id = Column(Integer, primary_key=True) token = Column(String, nullable=False) expire = Column(DateTime, nullable=False) def expired(self) -> bool: """Проверка срока жизни токена. Вовзращает истину, если срок действия токена истек. """ if datetime.now() > self.expire: return True return False def __repr__(self): """Базовое представление класса.""" return f"{self.id}: {self.token} | {self.expire}" class Messages_to_delete(Base): """Сообщения, которые нужно будет удалить.""" __tablename__ = "messages_to_delete" id = Column(Integer, primary_key=True) message_id = Column(Integer, nullable=False) deletion_date = Column(DateTime, nullable=False) def deletion_needed(self) -> bool: """Нужно ли удалить это сообщение. Мы могли пропустить удаление из-за перезапуска. """ if datetime.now() > self.deletion_date: return True return False def auth(func): async def wrapper(message): users_list: Allowed_user = ( session.query(Allowed_user).order_by(Allowed_user.id).all() ) finded_user: Allowed_user = next( (u for u in users_list if u.user_id == message["from"]["id"]), None ) if not finded_user: return # await message.reply("Access Denied", reply=False) return await func(message) return wrapper async def clean_up(message_id: int, delay_min: int): exp = datetime.now() + timedelta(minutes=delay_min) msg: Messages_to_delete = Messages_to_delete( message_id=message_id, deletion_date=exp ) session.add(msg) session.commit() await asyncio.sleep(delay_min * 60) try: await bot.delete_message(chat_id=CHAT_ID, message_id=message_id) session.delete(msg) session.commit() except MessageToDeleteNotFound: print("Ошибка удаления сообщения") @dp.message_handler(commands=["start", "help"]) @auth async def send_welcome(message: types.Message): await message.reply( "Бот для взаимодействия с группой\n\n" "/q - для отправки вопроса\n", reply=False, ) @dp.message_handler(commands=["mes"]) @auth async def send_to_chanel(message: types.Message): deletion_time = message.date + timedelta(minutes=DELAY_TIME) out_text = f"{message.text[5:]} \n\nБудет удалено в *{deletion_time}*" result = await bot.send_message( chat_id=CHAT_ID, text=out_text, disable_notification=True, parse_mode="Markdown", ) await clean_up(message_id=result.message_id, delay_min=DELAY_TIME) @dp.message_handler(commands=["q"]) @auth async def send_question(message: types.Message): """Отправить вопрос через бота. Через бота можно отправить вопрос, который будет удален через DELAY_TIME_Q минут. """ if len(message.text) == len("/q"): await message.reply( "Пишешь пусто - делаешь грустно (мне)" "\nПосле /q надо написать вопрос, вот так:" "\n\n/q как доехать до Баррикадной?", reply=False, ) return deletion_time = message.date + timedelta(minutes=DELAY_TIME_Q) out_text = f"*Внимание, вопрос!*\n\n{message.text[3:]}\n\nБудет удалено в *{deletion_time}*" result = await bot.send_message( chat_id=CHAT_ID, text=out_text, disable_notification=True, parse_mode="Markdown", ) await clean_up(message_id=result.message_id, delay_min=DELAY_TIME_Q) @dp.message_handler(commands=["get_invite"]) async def get_invite(message: types.Message): if message["from"]["id"] not in ADMINS: return token = secrets.token_urlsafe(16) exp = datetime.now() + timedelta(minutes=5) test = Token(token=token, expire=exp) session.add(test) session.commit() await message.reply( f"Новый токен: *{token}* \nБудет действителен 5 минут\n\nЧтобы вступить в чат напиши боту [@GorgorodBot](https://t.me/GorgorodBot)\n`/add_me {token}`", reply=False, parse_mode="Markdown", ) @dp.message_handler(commands=["add_me"]) async def add_me(message: types.Message): """Добавление юзера в чат. Когда бот получает токен, он проверяет его на валидность (есть ли такой вообще, не истек ли) и высылает разовую ссылку на вступление в группу. """ user_token = message.text[8:].strip() tokens_list = session.query(Token).order_by(Token.id).all() finded_token: Token = next((t for t in tokens_list if t.token == user_token), None) if finded_token.expired(): print("Пизда токену") # если с токеном все впорядке, то добавляем юзера в базу if finded_token and not finded_token.expired(): new_user: Allowed_user = Allowed_user( user_id=message["from"]["id"], date_add=datetime.now() ) session.add(new_user) session.commit() # генерируем ссылку, истечет через 5 минут, может вступить 1 человек link = await bot.create_chat_invite_link( chat_id=CHAT_ID, expire_date=timedelta(minutes=5), member_limit=1 ) await message.reply( "Welcome to the internet" "\nHave a look around" "\nAnything that brain of yours can think of can be found" f"\n\nТвоя ссылка для вступления: {link['invite_link']}", reply=False, parse_mode="Markdown", ) await asyncio.sleep(10) with open("welcome_msg.txt", "r") as f: msg = f.read() await message.reply(msg, reply=False, parse_mode="Markdown") @dp.message_handler(commands=["wlc"]) @auth async def wlc(message: types.Message): with open("welcome_msg.txt", "r") as f: msg = f.read() await message.reply( msg, reply=False, parse_mode="Markdown", disable_web_page_preview=True ) # https://stackoverflow.com/questions/67637631/create-a-background-process-using-aiogram async def messages_cleanup() -> None: msg_list = session.query(Messages_to_delete).order_by(Messages_to_delete.id).all() while True: for m in msg_list: if m.deletion_needed: try: await bot.delete_message(chat_id=CHAT_ID, message_id=m.message_id) except MessageToDeleteNotFound: print("Ошибка удаления сообщения") await asyncio.sleep(600) async def on_startup(dispatcher: Dispatcher) -> None: asyncio.create_task(messages_cleanup()) if __name__ == "__main__": Base.metadata.create_all(engine) executor.start_polling(dp, skip_updates=True, on_startup=on_startup)