"""Славный модуль с бизнес-логикой.""" from models import Allowed_user, Messages_to_delete, Token from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.exc import IntegrityError import settings import asyncio from secrets import token_urlsafe from datetime import datetime, timedelta from aiogram.utils.exceptions import MessageToDeleteNotFound from aiogram import Bot, types from functools import wraps # не очень красивый способ создание общего объекта ORM для всех # функций модуля engine = create_engine("sqlite:///" + str(settings.DB_PATH)) Session = sessionmaker(bind=engine) session = Session() def auth(func): """Декоратор для аутентификации. Проверяем, что у юзера есть полномочия на выполнение действия - он должен быть в белом списке. """ async def wrapper(message): finded_user: Allowed_user uid = message["from"]["id"] finded_user = ( session.query(Allowed_user) .filter(Allowed_user.user_id == uid) .one_or_none() ) if not finded_user: return return await func(message) return wrapper def only_admins(func): """Декоратора аутентификации для администраторов. Данные действия с ботом могут выполнять только администраторы. """ async def wrapper(message): uid = message["from"]["id"] if uid not in settings.ADMINS: return return await func(message) return wrapper async def _clean_up(message_id: int, delay_min: int, bot: Bot) -> None: """Поставить таймер на удаление сообщения. Внутренняя функция для удаления сообщения. Пишет в базу id сообщения и время, когда его нужно удалить. Ждет `delay_min` и пробует удалить сообщение. Parameters ---------- message_id : int Идентификатор сообщения, присвоенный ему Телеграмом. delay_min : int Количество минут, через которое это сообщение нужно удалить. bot : Bot Объект бота, через который сообщение будет отправленно в канал. """ exp = datetime.now() + timedelta(minutes=delay_min) msg: Messages_to_delete = Messages_to_delete( message_id=message_id, deletion_date=exp ) session.add(msg) session.commit() await asyncio.sleep(delay_min * 60) try: await bot.delete_message(chat_id=settings.CHAT_ID, message_id=message_id) session.delete(msg) session.commit() except MessageToDeleteNotFound: print("Ошибка удаления сообщения") def get_new_token() -> str: """Получить токен. Возвращает токен для приглашения нового пользователя. Returns ------- token : str Токен для доступа к группе. Длина - 16 символов. Длина токена 16 символов, срок действия 5 минут. Токен и его срок годности записываются в базу. """ token = token_urlsafe(16) exp = datetime.now() + timedelta(minutes=5) test = Token(token=token, expire=exp) session.add(test) session.commit() return token def check_token(token: str) -> bool: """Проверить валидность токена. Токен считается валидным если он соотвествует сразу двум условиям: он есть в базе, срок его действия не истек. После проверки токен будет удален из базы. Parameters ---------- token : str Токен. Длина не проверяется. Returns ------- result : bool Результат проверки токена. """ finded_token = session.query(Token).filter(Token.token == token).one_or_none() if finded_token and not finded_token.expired(): session.delete(finded_token) session.commit() return True return False def add_new_user_to_allowed_list(user_id: int) -> None: """Добавить пользователя в белый список. Если пользователь в нем уже есть, то игнорирует ошибку и ведет себя так, словно его добавили впервые. Parameters ---------- user_id : int Идентификатор пользователя. """ new_user: Allowed_user = Allowed_user(user_id=user_id, date_add=datetime.now()) try: session.add(new_user) session.commit() except IntegrityError: print(f"Юзер {new_user.user_id} уже в базе") session.rollback() def delete_user_from_allowed_list(user_id: str) -> None: """Удалить пользователя из белого списка. Parameters ---------- user_id : int Идентификатор пользователя. """ user: Allowed_user = ( session.query(Allowed_user) .filter(Allowed_user.user_id == user_id) .one_or_none() ) if user: session.delete(user) session.commit() def get_static(name: str) -> str: """Считать содержимое файла. Возвращает содержимое файла. Поддерживает Markdown. Parameters ---------- name : str Имя файла вместе с папкой. Рекомендуется размещать файлы с текстом в папке `/static`. Returns ------- msg : str Содержимое файла, переданное в `name`. """ with open(name, "r") as f: msg = f.read() return msg async def delete_old_messages(bot: Bot) -> None: """Удалить старые сообщения. Удаляет сообщения, если их delete_date меньше чем время сейчас. Если сообщения в чате не найдено - выводит сообщение в лог и удалет сообщение из базы. """ msg_list: Messages_to_delete = ( session.query(Messages_to_delete).order_by(Messages_to_delete.id).all() ) # если удалять нечего, то выходим if not msg_list: return for m in msg_list: if not m.deletion_needed(): continue try: await bot.delete_message(chat_id=settings.CHAT_ID, message_id=m.message_id) session.delete(m) session.commit() except MessageToDeleteNotFound: print("Сообщение не найдено!") session.delete(m) session.commit() def get_text_from_command(message: types.Message) -> str: """Получить текст сообщения, отправленного боту. Вернет текст из команды для бота (но не саму команду). Parameters ---------- message : types.Message Сообщения, из которого нужно извлечь текст. Returns ------- text : str Текст (тело) сообщения без команды. Notes ----- Парсит message["entities"], поэтому в качестве аргумента не нужна команда, а только объект сообщения. """ command_length = int(message["entities"][0]["length"]) return message.text[command_length:].strip() async def send_message_to_chat( msg_date: datetime, delete_after: int, text: str, bot: Bot ) -> None: """Отправить сообщение в чат. Отправляет сообщение в чат, ставит таймер на удаление. Parameters ---------- msg_date : datetime Время отправки сообщения. На его основе высчитывается время удаления. delete_after : int Количество минут, после которого нужно удалить сообщение. text : str Сообщение для отправки в канал. bot : Bot Объект бота, через которого будет отправлено сообщение. """ deletion_time = msg_date + timedelta(minutes=delete_after) text += f"\n\nБудет удалено в *{deletion_time}*" result = await bot.send_message( chat_id=settings.CHAT_ID, text=text, disable_notification=True, parse_mode="Markdown", ) await _clean_up(message_id=result.message_id, delay_min=delete_after, bot=bot)