"""Славный модуль с бизнес-логикой.""" from models import Allowed_user, Messages_to_delete, Token from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.exc import IntegrityError import settings import asyncio from secrets import token_urlsafe from datetime import datetime, timedelta from aiogram.utils.exceptions import MessageToDeleteNotFound from aiogram import Bot engine = create_engine("sqlite:///" + str(settings.DB_PATH)) Session = sessionmaker(bind=engine) session = Session() def auth(func): """Аутентификация. Проверяем, что у юзера есть полномочия на выполнение действия - он должен быть в белом списке. """ async def wrapper(message): finded_user: Allowed_user uid = message["from"]["id"] finded_user = ( session.query(Allowed_user) .filter(Allowed_user.user_id == uid) .one_or_none() ) if not finded_user: return return await func(message) return wrapper def only_admins(func): """Действия только для админов.""" async def wrapper(message): uid = message["from"]["id"] if uid not in settings.ADMINS: return return await func(message) return wrapper async def _clean_up(message_id: int, delay_min: int, bot: Bot) -> None: """Удаление сообщенния message_id через delay_min минут.""" exp = datetime.now() + timedelta(minutes=delay_min) msg: Messages_to_delete = Messages_to_delete( message_id=message_id, deletion_date=exp ) session.add(msg) session.commit() await asyncio.sleep(delay_min * 60) try: await bot.delete_message(chat_id=settings.CHAT_ID, message_id=message_id) session.delete(msg) session.commit() except MessageToDeleteNotFound: print("Ошибка удаления сообщения") def get_new_token() -> str: """Возвращает токен для приглашения пользователя. Длина токена 16 символов, срок действия 5 минут. Токен записывается в базу. """ token = token_urlsafe(16) exp = datetime.now() + timedelta(minutes=5) test = Token(token=token, expire=exp) session.add(test) session.commit() return token def check_token(token: str) -> bool: """Проверяет, находится ли токен в базе, возвращает True, если токен есть и не истек. затем удаляет токен из базы.""" finded_token = session.query(Token).filter(Token.token == token).one_or_none() if finded_token and not finded_token.expired(): session.delete(finded_token) session.commit() return True return False def add_new_user_to_allowed_list(user_id: int) -> None: """Добавляет пользователя в белый список. Если пользователь в нем уже есть, то игнорирует ошибку и ведет себя так, словно его добавили впервые.""" new_user: Allowed_user = Allowed_user(user_id=user_id, date_add=datetime.now()) try: session.add(new_user) session.commit() except IntegrityError: print(f"Юзер {new_user.user_id} уже в базе") session.rollback()