import os import logging from aiogram import Bot, Dispatcher, executor, types from aiogram.utils.exceptions import MessageToDeleteNotFound import asyncio from datetime import timedelta, datetime import secrets from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.exc import IntegrityError from models import Allowed_user, Token, Messages_to_delete, Base logging.basicConfig(level=logging.INFO) DELAY_TIME = int(os.getenv("GBOT_DELAY_TIME")) DELAY_TIME_Q = int(os.getenv("GBOT_DELAY_TIME_Q")) CHAT_ID = os.getenv("GBOT_CHAT_ID") ALLOWED_USERS = [106693654, 7063133] ADMINS = [106693654, 7063133, 4978608] API_TOKEN = os.getenv("TELEGRAM_API_TOKEN") DB_PATH = os.getenv("GBOT_DB_PATH") bot = Bot(token=API_TOKEN) dp = Dispatcher(bot) engine = create_engine("sqlite:///" + str(DB_PATH)) Session = sessionmaker(bind=engine) session = Session() def auth(func): async def wrapper(message): users_list: Allowed_user = ( session.query(Allowed_user).order_by(Allowed_user.id).all() ) finded_user: Allowed_user = next( (u for u in users_list if u.user_id == message["from"]["id"]), None ) if not finded_user: return return await func(message) return wrapper async def clean_up(message_id: int, delay_min: int): exp = datetime.now() + timedelta(minutes=delay_min) msg: Messages_to_delete = Messages_to_delete( message_id=message_id, deletion_date=exp ) session.add(msg) session.commit() await asyncio.sleep(delay_min * 60) try: await bot.delete_message(chat_id=CHAT_ID, message_id=message_id) session.delete(msg) session.commit() except MessageToDeleteNotFound: print("Ошибка удаления сообщения") @dp.message_handler(commands=["start", "help"]) @auth async def send_welcome(message: types.Message): await message.reply( "Бот для взаимодействия с группой\n\n" "/q - для отправки вопроса\n", reply=False, ) @dp.message_handler(commands=["channel_id"]) async def get_channel_id(message: types.Message) -> None: """Возвращает id канала по его имени""" if message["from"]["id"] not in ADMINS: return c_id = message.text[12:] result = await bot.send_message( chat_id=c_id, text="ping", disable_notification=True, ) await message.reply( f"Ид чата {c_id}: *{result.sender_chat.id}*", parse_mode="Markdown" ) @dp.message_handler(commands=["mes"]) @auth async def send_to_chanel(message: types.Message): deletion_time = message.date + timedelta(minutes=DELAY_TIME) out_text = f"{message.text[5:]} \n\nБудет удалено в *{deletion_time}*" result = await bot.send_message( chat_id=CHAT_ID, text=out_text, disable_notification=True, parse_mode="Markdown", ) print(result.sender_chat.id) await clean_up(message_id=result.message_id, delay_min=DELAY_TIME) @dp.message_handler(commands=["q"]) @auth async def send_question(message: types.Message): """Отправить вопрос через бота. Через бота можно отправить вопрос, который будет удален через DELAY_TIME_Q минут. """ if len(message.text) == len("/q"): await message.reply( "Пишешь пусто - делаешь грустно (мне)" "\nПосле /q надо написать вопрос, вот так:" "\n\n/q как доехать до Баррикадной?", reply=False, ) return deletion_time = message.date + timedelta(minutes=DELAY_TIME_Q) out_text = f"*Внимание, вопрос!*\n\n{message.text[3:]}\n\nБудет удалено в *{deletion_time}*" result = await bot.send_message( chat_id=CHAT_ID, text=out_text, disable_notification=True, parse_mode="Markdown", ) await clean_up(message_id=result.message_id, delay_min=DELAY_TIME_Q) @dp.message_handler(commands=["get_invite"]) async def get_invite(message: types.Message): if message["from"]["id"] not in ADMINS: return token = secrets.token_urlsafe(16) exp = datetime.now() + timedelta(minutes=5) test = Token(token=token, expire=exp) session.add(test) session.commit() await message.reply( f"Новый токен: *{token}* \nБудет действителен 5 минут\n\nЧтобы вступить в чат напиши боту [@GorgorodBot](https://t.me/GorgorodBot)\n`/add_me {token}`", reply=False, parse_mode="Markdown", ) @dp.message_handler(commands=["add_me"]) async def add_me(message: types.Message): """Добавление юзера в чат. Когда бот получает токен, он проверяет его на валидность (есть ли такой вообще, не истек ли) и высылает разовую ссылку на вступление в группу. """ user_token = message.text[8:].strip() tokens_list = session.query(Token).order_by(Token.id).all() finded_token: Token = next((t for t in tokens_list if t.token == user_token), None) # если с токеном все впорядке, то добавляем юзера в базу if finded_token and not finded_token.expired(): new_user: Allowed_user = Allowed_user( user_id=message["from"]["id"], date_add=datetime.now() ) try: session.add(new_user) session.delete(finded_token) session.commit() except IntegrityError: print(f"Юзер {new_user.user_id} уже в базе") session.rollback() # генерируем ссылку, истечет через 5 минут, может вступить 1 человек link = await bot.create_chat_invite_link( chat_id=CHAT_ID, expire_date=timedelta(minutes=5), member_limit=1 ) await message.reply( "Welcome to the internet" "\nHave a look around" "\nAnything that brain of yours can think of can be found" f"\n\nТвоя ссылка для вступления: {link['invite_link']}", reply=False, parse_mode="Markdown", ) await asyncio.sleep(10) with open("static/welcome_msg.txt", "r") as f: msg = f.read() await message.reply(msg, reply=False, parse_mode="Markdown") @dp.message_handler(commands=["stop"]) @auth async def stop_and_panic(message: types.Message) -> None: """Panic-button для бота. При нажатии: - кидает юзера в чс чата и удаляет его, - удаляет из белого списка. """ u_id = message["from"]["id"] finded_user: Allowed_user = ( session.query(Allowed_user).filter(Allowed_user.user_id == u_id).first() ) await bot.kick_chat_member( chat_id=CHAT_ID, user_id=finded_user.user_id, revoke_messages=True ) session.delete(finded_user) session.commit() # рассылка сообщения админам что юзер нажал паническую кнопку. Нужны ид чатов. # for admin_id in ADMINS: # await bot.send_meassage() @dp.message_handler(commands=["wlc"]) @auth async def wlc(message: types.Message): with open("welcome_msg.txt", "r") as f: msg = f.read() await message.reply( msg, reply=False, parse_mode="Markdown", disable_web_page_preview=True ) # https://stackoverflow.com/questions/67637631/create-a-background-process-using-aiogram async def messages_cleanup() -> None: msg_list = session.query(Messages_to_delete).order_by(Messages_to_delete.id).all() while True: for m in msg_list: if m.deletion_needed: try: await bot.delete_message(chat_id=CHAT_ID, message_id=m.message_id) session.delete(m) session.commit() except MessageToDeleteNotFound: print("Сообщение не найдено!") session.delete(m) session.commit() await asyncio.sleep(600) async def on_startup(dispatcher: Dispatcher) -> None: asyncio.create_task(messages_cleanup()) if __name__ == "__main__": Base.metadata.create_all(engine) executor.start_polling(dp, skip_updates=True, on_startup=on_startup)