You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
287 lines
9.3 KiB
287 lines
9.3 KiB
"""Славный модуль с бизнес-логикой.""" |
|
from models import Allowed_user, Messages_to_delete, Token |
|
|
|
from sqlalchemy import create_engine |
|
from sqlalchemy.orm import sessionmaker |
|
from sqlalchemy.exc import IntegrityError |
|
|
|
import settings |
|
import asyncio |
|
from secrets import token_urlsafe |
|
|
|
from datetime import datetime, timedelta |
|
|
|
from aiogram.utils.exceptions import MessageToDeleteNotFound |
|
from aiogram import Bot, types |
|
|
|
from functools import wraps |
|
|
|
# не очень красивый способ создание общего объекта ORM для всех |
|
# функций модуля |
|
engine = create_engine("sqlite:///" + str(settings.DB_PATH)) |
|
Session = sessionmaker(bind=engine) |
|
session = Session() |
|
|
|
|
|
def auth(func): |
|
"""Декоратор для аутентификации. |
|
|
|
Проверяем, что у юзера есть полномочия на выполнение действия - он |
|
должен быть в белом списке. |
|
""" |
|
|
|
async def wrapper(message): |
|
finded_user: Allowed_user |
|
uid = message["from"]["id"] |
|
|
|
finded_user = ( |
|
session.query(Allowed_user) |
|
.filter(Allowed_user.user_id == uid) |
|
.one_or_none() |
|
) |
|
|
|
if not finded_user: |
|
return |
|
return await func(message) |
|
|
|
return wrapper |
|
|
|
|
|
def only_admins(func): |
|
"""Декоратора аутентификации для администраторов. |
|
|
|
Данные действия с ботом могут выполнять только администраторы. |
|
""" |
|
|
|
async def wrapper(message): |
|
uid = message["from"]["id"] |
|
|
|
if uid not in settings.ADMINS: |
|
return |
|
|
|
return await func(message) |
|
|
|
return wrapper |
|
|
|
|
|
async def _clean_up(message_id: int, delay_min: int, bot: Bot) -> None: |
|
"""Поставить таймер на удаление сообщения. |
|
|
|
Внутренняя функция для удаления сообщения. Пишет в базу id |
|
сообщения и время, когда его нужно удалить. Ждет `delay_min` и |
|
пробует удалить сообщение. |
|
|
|
Parameters |
|
---------- |
|
message_id : int |
|
Идентификатор сообщения, присвоенный ему Телеграмом. |
|
delay_min : int |
|
Количество минут, через которое это сообщение нужно удалить. |
|
bot : Bot |
|
Объект бота, через который сообщение будет отправленно в канал. |
|
""" |
|
exp = datetime.now() + timedelta(minutes=delay_min) |
|
msg: Messages_to_delete = Messages_to_delete( |
|
message_id=message_id, deletion_date=exp |
|
) |
|
session.add(msg) |
|
session.commit() |
|
|
|
await asyncio.sleep(delay_min * 60) |
|
|
|
try: |
|
await bot.delete_message(chat_id=settings.CHAT_ID, message_id=message_id) |
|
session.delete(msg) |
|
session.commit() |
|
except MessageToDeleteNotFound: |
|
print("Ошибка удаления сообщения") |
|
|
|
|
|
def get_new_token() -> str: |
|
"""Получить токен. |
|
|
|
Возвращает токен для приглашения нового пользователя. |
|
|
|
Returns |
|
------- |
|
token : str |
|
Токен для доступа к группе. Длина - 16 символов. Длина токена |
|
16 символов, срок действия 5 минут. Токен и его срок годности |
|
записываются в базу. |
|
""" |
|
token = token_urlsafe(16) |
|
exp = datetime.now() + timedelta(minutes=5) |
|
test = Token(token=token, expire=exp) |
|
session.add(test) |
|
session.commit() |
|
|
|
return token |
|
|
|
|
|
def check_token(token: str) -> bool: |
|
"""Проверить валидность токена. |
|
|
|
Токен считается валидным если он соотвествует сразу двум условиям: |
|
он есть в базе, срок его действия не истек. После проверки токен |
|
будет удален из базы. |
|
|
|
Parameters |
|
---------- |
|
token : str |
|
Токен. Длина не проверяется. |
|
|
|
Returns |
|
------- |
|
result : bool |
|
Результат проверки токена. |
|
""" |
|
finded_token = session.query(Token).filter(Token.token == token).one_or_none() |
|
|
|
if finded_token and not finded_token.expired(): |
|
session.delete(finded_token) |
|
session.commit() |
|
return True |
|
|
|
return False |
|
|
|
|
|
def add_new_user_to_allowed_list(user_id: int) -> None: |
|
"""Добавить пользователя в белый список. |
|
|
|
Если пользователь в нем уже есть, то игнорирует ошибку и ведет |
|
себя так, словно его добавили впервые. |
|
|
|
Parameters |
|
---------- |
|
user_id : int |
|
Идентификатор пользователя. |
|
""" |
|
new_user: Allowed_user = Allowed_user(user_id=user_id, date_add=datetime.now()) |
|
|
|
try: |
|
session.add(new_user) |
|
session.commit() |
|
except IntegrityError: |
|
print(f"Юзер {new_user.user_id} уже в базе") |
|
session.rollback() |
|
|
|
|
|
def delete_user_from_allowed_list(user_id: str) -> None: |
|
"""Удалить пользователя из белого списка. |
|
|
|
Parameters |
|
---------- |
|
user_id : int |
|
Идентификатор пользователя. |
|
""" |
|
user: Allowed_user = ( |
|
session.query(Allowed_user) |
|
.filter(Allowed_user.user_id == user_id) |
|
.one_or_none() |
|
) |
|
if user: |
|
session.delete(user) |
|
session.commit() |
|
|
|
|
|
def get_static(name: str) -> str: |
|
"""Считать содержимое файла. |
|
|
|
Возвращает содержимое файла. Поддерживает Markdown. |
|
|
|
Parameters |
|
---------- |
|
name : str |
|
Имя файла вместе с папкой. Рекомендуется размещать файлы с |
|
текстом в папке `/static`. |
|
|
|
Returns |
|
------- |
|
msg : str |
|
Содержимое файла, переданное в `name`. |
|
""" |
|
with open(name, "r") as f: |
|
msg = f.read() |
|
return msg |
|
|
|
|
|
async def delete_old_messages(bot: Bot) -> None: |
|
"""Удалить старые сообщения. |
|
|
|
Удаляет сообщения, если их delete_date меньше чем время сейчас. |
|
Если сообщения в чате не найдено - выводит сообщение в лог и |
|
удалет сообщение из базы. |
|
""" |
|
msg_list: Messages_to_delete = ( |
|
session.query(Messages_to_delete).order_by(Messages_to_delete.id).all() |
|
) |
|
# если удалять нечего, то выходим |
|
if not msg_list: |
|
return |
|
|
|
for m in msg_list: |
|
if not m.deletion_needed(): |
|
continue |
|
try: |
|
await bot.delete_message(chat_id=settings.CHAT_ID, message_id=m.message_id) |
|
session.delete(m) |
|
session.commit() |
|
except MessageToDeleteNotFound: |
|
print("Сообщение не найдено!") |
|
session.delete(m) |
|
session.commit() |
|
|
|
|
|
def get_text_from_command(message: types.Message) -> str: |
|
"""Получить текст сообщения, отправленного боту. |
|
|
|
Вернет текст из команды для бота (но не саму команду). |
|
|
|
Parameters |
|
---------- |
|
message : types.Message |
|
Сообщения, из которого нужно извлечь текст. |
|
|
|
Returns |
|
------- |
|
text : str |
|
Текст (тело) сообщения без команды. |
|
|
|
Notes |
|
----- |
|
Парсит message["entities"], поэтому в качестве аргумента не нужна |
|
команда, а только объект сообщения. |
|
""" |
|
command_length = int(message["entities"][0]["length"]) |
|
return message.text[command_length:].strip() |
|
|
|
|
|
async def send_message_to_chat( |
|
msg_date: datetime, delete_after: int, text: str, bot: Bot |
|
) -> None: |
|
"""Отправить сообщение в чат. |
|
|
|
Отправляет сообщение в чат, ставит таймер на удаление. |
|
|
|
Parameters |
|
---------- |
|
msg_date : datetime |
|
Время отправки сообщения. На его основе высчитывается время |
|
удаления. |
|
delete_after : int |
|
Количество минут, после которого нужно удалить сообщение. |
|
text : str |
|
Сообщение для отправки в канал. |
|
bot : Bot |
|
Объект бота, через которого будет отправлено сообщение. |
|
""" |
|
deletion_time = msg_date + timedelta(minutes=delete_after) |
|
text += f"\n\nБудет удалено в *{deletion_time}*" |
|
|
|
result = await bot.send_message( |
|
chat_id=settings.CHAT_ID, |
|
text=text, |
|
disable_notification=True, |
|
parse_mode="Markdown", |
|
) |
|
await _clean_up(message_id=result.message_id, delay_min=delete_after, bot=bot)
|
|
|