Бот для телеги, открывающий доступ в чатик по приглашениям.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

287 lines
9.3 KiB

"""Славный модуль с бизнес-логикой."""
from models import Allowed_user, Messages_to_delete, Token
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import IntegrityError
import settings
import asyncio
from secrets import token_urlsafe
from datetime import datetime, timedelta
from aiogram.utils.exceptions import MessageToDeleteNotFound
from aiogram import Bot, types
from functools import wraps
# не очень красивый способ создание общего объекта ORM для всех
# функций модуля
engine = create_engine("sqlite:///" + str(settings.DB_PATH))
Session = sessionmaker(bind=engine)
session = Session()
def auth(func):
"""Декоратор для аутентификации.
Проверяем, что у юзера есть полномочия на выполнение действия - он
должен быть в белом списке.
"""
async def wrapper(message):
finded_user: Allowed_user
uid = message["from"]["id"]
finded_user = (
session.query(Allowed_user)
.filter(Allowed_user.user_id == uid)
.one_or_none()
)
if not finded_user:
return
return await func(message)
return wrapper
def only_admins(func):
"""Декоратора аутентификации для администраторов.
Данные действия с ботом могут выполнять только администраторы.
"""
async def wrapper(message):
uid = message["from"]["id"]
if uid not in settings.ADMINS:
return
return await func(message)
return wrapper
async def _clean_up(message_id: int, delay_min: int, bot: Bot) -> None:
"""Поставить таймер на удаление сообщения.
Внутренняя функция для удаления сообщения. Пишет в базу id
сообщения и время, когда его нужно удалить. Ждет `delay_min` и
пробует удалить сообщение.
Parameters
----------
message_id : int
Идентификатор сообщения, присвоенный ему Телеграмом.
delay_min : int
Количество минут, через которое это сообщение нужно удалить.
bot : Bot
Объект бота, через который сообщение будет отправленно в канал.
"""
exp = datetime.now() + timedelta(minutes=delay_min)
msg: Messages_to_delete = Messages_to_delete(
message_id=message_id, deletion_date=exp
)
session.add(msg)
session.commit()
await asyncio.sleep(delay_min * 60)
try:
await bot.delete_message(chat_id=settings.CHAT_ID, message_id=message_id)
session.delete(msg)
session.commit()
except MessageToDeleteNotFound:
print("Ошибка удаления сообщения")
def get_new_token() -> str:
"""Получить токен.
Возвращает токен для приглашения нового пользователя.
Returns
-------
token : str
Токен для доступа к группе. Длина - 16 символов. Длина токена
16 символов, срок действия 5 минут. Токен и его срок годности
записываются в базу.
"""
token = token_urlsafe(16)
exp = datetime.now() + timedelta(minutes=5)
test = Token(token=token, expire=exp)
session.add(test)
session.commit()
return token
def check_token(token: str) -> bool:
"""Проверить валидность токена.
Токен считается валидным если он соотвествует сразу двум условиям:
он есть в базе, срок его действия не истек. После проверки токен
будет удален из базы.
Parameters
----------
token : str
Токен. Длина не проверяется.
Returns
-------
result : bool
Результат проверки токена.
"""
finded_token = session.query(Token).filter(Token.token == token).one_or_none()
if finded_token and not finded_token.expired():
session.delete(finded_token)
session.commit()
return True
return False
def add_new_user_to_allowed_list(user_id: int) -> None:
"""Добавить пользователя в белый список.
Если пользователь в нем уже есть, то игнорирует ошибку и ведет
себя так, словно его добавили впервые.
Parameters
----------
user_id : int
Идентификатор пользователя.
"""
new_user: Allowed_user = Allowed_user(user_id=user_id, date_add=datetime.now())
try:
session.add(new_user)
session.commit()
except IntegrityError:
print(f"Юзер {new_user.user_id} уже в базе")
session.rollback()
def delete_user_from_allowed_list(user_id: str) -> None:
"""Удалить пользователя из белого списка.
Parameters
----------
user_id : int
Идентификатор пользователя.
"""
user: Allowed_user = (
session.query(Allowed_user)
.filter(Allowed_user.user_id == user_id)
.one_or_none()
)
if user:
session.delete(user)
session.commit()
def get_static(name: str) -> str:
"""Считать содержимое файла.
Возвращает содержимое файла. Поддерживает Markdown.
Parameters
----------
name : str
Имя файла вместе с папкой. Рекомендуется размещать файлы с
текстом в папке `/static`.
Returns
-------
msg : str
Содержимое файла, переданное в `name`.
"""
with open(name, "r") as f:
msg = f.read()
return msg
async def delete_old_messages(bot: Bot) -> None:
"""Удалить старые сообщения.
Удаляет сообщения, если их delete_date меньше чем время сейчас.
Если сообщения в чате не найдено - выводит сообщение в лог и
удалет сообщение из базы.
"""
msg_list: Messages_to_delete = (
session.query(Messages_to_delete).order_by(Messages_to_delete.id).all()
)
# если удалять нечего, то выходим
if not msg_list:
return
for m in msg_list:
if not m.deletion_needed():
continue
try:
await bot.delete_message(chat_id=settings.CHAT_ID, message_id=m.message_id)
session.delete(m)
session.commit()
except MessageToDeleteNotFound:
print("Сообщение не найдено!")
session.delete(m)
session.commit()
def get_text_from_command(message: types.Message) -> str:
"""Получить текст сообщения, отправленного боту.
Вернет текст из команды для бота (но не саму команду).
Parameters
----------
message : types.Message
Сообщения, из которого нужно извлечь текст.
Returns
-------
text : str
Текст (тело) сообщения без команды.
Notes
-----
Парсит message["entities"], поэтому в качестве аргумента не нужна
команда, а только объект сообщения.
"""
command_length = int(message["entities"][0]["length"])
return message.text[command_length:].strip()
async def send_message_to_chat(
msg_date: datetime, delete_after: int, text: str, bot: Bot
) -> None:
"""Отправить сообщение в чат.
Отправляет сообщение в чат, ставит таймер на удаление.
Parameters
----------
msg_date : datetime
Время отправки сообщения. На его основе высчитывается время
удаления.
delete_after : int
Количество минут, после которого нужно удалить сообщение.
text : str
Сообщение для отправки в канал.
bot : Bot
Объект бота, через которого будет отправлено сообщение.
"""
deletion_time = msg_date + timedelta(minutes=delete_after)
text += f"\n\nБудет удалено в *{deletion_time}*"
result = await bot.send_message(
chat_id=settings.CHAT_ID,
text=text,
disable_notification=True,
parse_mode="Markdown",
)
await _clean_up(message_id=result.message_id, delay_min=delete_after, bot=bot)