|
|
|
"""Главный модуль программы."""
|
|
|
|
import logging
|
|
|
|
from aiogram import Bot, Dispatcher, executor, types
|
|
|
|
from aiogram.utils.exceptions import MessageToDeleteNotFound
|
|
|
|
import asyncio
|
|
|
|
from datetime import timedelta
|
|
|
|
from models import Allowed_user, Messages_to_delete, Base
|
|
|
|
from aiogram.utils.markdown import escape_md
|
|
|
|
import settings
|
|
|
|
import services
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
|
|
|
|
|
|
|
|
|
bot = Bot(token=settings.API_TOKEN)
|
|
|
|
dp = Dispatcher(bot)
|
|
|
|
|
|
|
|
|
|
|
|
@dp.message_handler(commands=["start", "help"])
|
|
|
|
@services.auth
|
|
|
|
async def send_welcome(message: types.Message):
|
|
|
|
"""Справка по командам."""
|
|
|
|
await message.reply(
|
|
|
|
"Бот для взаимодействия с группой\n\n" "/q - для отправки вопроса\n",
|
|
|
|
reply=False,
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
@dp.message_handler(commands=["channel_id"])
|
|
|
|
@services.only_admins
|
|
|
|
async def get_channel_id(message: types.Message) -> None:
|
|
|
|
"""Возвращает id канала по его имени."""
|
|
|
|
c_id = message.text[12:]
|
|
|
|
|
|
|
|
result = await bot.send_message(
|
|
|
|
chat_id=c_id,
|
|
|
|
text="ping",
|
|
|
|
disable_notification=True,
|
|
|
|
)
|
|
|
|
|
|
|
|
await message.reply(
|
|
|
|
f"Ид чата {c_id}: *{result.sender_chat.id}*", parse_mode="Markdown"
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
@dp.message_handler(commands=["q"])
|
|
|
|
@services.auth
|
|
|
|
async def send_question(message: types.Message) -> None:
|
|
|
|
"""Отправить вопрос через бота.
|
|
|
|
|
|
|
|
Через бота можно отправить вопрос, который будет удален через
|
|
|
|
DELAY_TIME_Q минут.
|
|
|
|
|
|
|
|
"""
|
|
|
|
if len(message.text) == len("/q"):
|
|
|
|
await message.reply(
|
|
|
|
services.get_static("static/short_question_error.txt"),
|
|
|
|
reply=False,
|
|
|
|
parse_mode="Markdown",
|
|
|
|
)
|
|
|
|
return
|
|
|
|
|
|
|
|
deletion_time = message.date + timedelta(minutes=settings.DELAY_TIME_Q)
|
|
|
|
out_text = f"*Внимание, вопрос!*\n\n{message.text[3:]}\n\nБудет удалено в *{deletion_time}*"
|
|
|
|
result = await bot.send_message(
|
|
|
|
chat_id=settings.CHAT_ID,
|
|
|
|
text=out_text,
|
|
|
|
disable_notification=True,
|
|
|
|
parse_mode="Markdown",
|
|
|
|
)
|
|
|
|
|
|
|
|
await services._clean_up(
|
|
|
|
message_id=result.message_id, delay_min=settings.DELAY_TIME_Q, bot=bot
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
@dp.message_handler(commands=["get_invite"])
|
|
|
|
@services.only_admins
|
|
|
|
async def get_invite(message: types.Message) -> None:
|
|
|
|
"""Получить токен для нового пользователя."""
|
|
|
|
bot_info = await bot.get_me()
|
|
|
|
bot_link = f"@{bot_info.username}"
|
|
|
|
|
|
|
|
token = services.get_new_token()
|
|
|
|
|
|
|
|
msg = f"Новый токен: *{token}*\nБудет действителен 5 минут\n\nЧтобы вступить в чат напиши боту {escape_md(bot_link)} \n`/add_me {token}`"
|
|
|
|
|
|
|
|
await message.reply(
|
|
|
|
msg,
|
|
|
|
reply=False,
|
|
|
|
parse_mode="Markdown",
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
@dp.message_handler(commands=["add_me"])
|
|
|
|
async def add_me(message: types.Message) -> None:
|
|
|
|
"""Добавление юзера в чат.
|
|
|
|
|
|
|
|
Когда бот получает токен, он проверяет его на валидность (есть ли
|
|
|
|
такой вообще, не истек ли) и высылает разовую ссылку на вступление
|
|
|
|
в группу.
|
|
|
|
|
|
|
|
"""
|
|
|
|
user_token = message.text[8:].strip()
|
|
|
|
|
|
|
|
if not services.check_token(user_token):
|
|
|
|
return
|
|
|
|
|
|
|
|
services.add_new_user_to_allowed_list(message["from"]["id"])
|
|
|
|
|
|
|
|
# генерируем ссылку, истечет через 5 минут, может вступить 1 человек
|
|
|
|
link = await bot.create_chat_invite_link(
|
|
|
|
chat_id=settings.CHAT_ID, expire_date=timedelta(minutes=5), member_limit=1
|
|
|
|
)
|
|
|
|
|
|
|
|
msg = services.get_static("static/welcome_msg_short.txt")
|
|
|
|
await message.reply(
|
|
|
|
msg.format(invite=link["invite_link"]), reply=False, parse_mode="Markdown"
|
|
|
|
)
|
|
|
|
|
|
|
|
await asyncio.sleep(10)
|
|
|
|
|
|
|
|
msg = services.get_static("static/welcome_msg.txt")
|
|
|
|
await message.reply(
|
|
|
|
msg, reply=False, parse_mode="Markdown", disable_web_page_preview=True
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
@dp.message_handler(commands=["stop"])
|
|
|
|
@services.auth
|
|
|
|
async def stop_and_panic(message: types.Message) -> None:
|
|
|
|
"""Panic-button для бота.
|
|
|
|
|
|
|
|
При нажатии:
|
|
|
|
- кидает юзера в чс чата и удаляет его,
|
|
|
|
- удаляет из белого списка.
|
|
|
|
"""
|
|
|
|
u_id = message["from"]["id"]
|
|
|
|
services.delete_user_from_allowed_list(u_id)
|
|
|
|
await bot.kick_chat_member(
|
|
|
|
chat_id=settings.CHAT_ID, user_id=u_id, revoke_messages=True
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
@dp.message_handler(commands=["wlc"])
|
|
|
|
@services.only_admins
|
|
|
|
async def wlc(message: types.Message) -> None:
|
|
|
|
"""Тестирование приветственного сообщения."""
|
|
|
|
await message.reply(
|
|
|
|
services.get_static("static/welcome_msg.txt"),
|
|
|
|
reply=False,
|
|
|
|
parse_mode="Markdown",
|
|
|
|
disable_web_page_preview=True,
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
# https://stackoverflow.com/questions/67637631/create-a-background-process-using-aiogram
|
|
|
|
async def messages_cleanup() -> None:
|
|
|
|
"""Зачистка пропущенных сообщений. При запуске бота проверяет, есть ли не удаленные сообщения требующие удаления, удаляет их."""
|
|
|
|
while True:
|
|
|
|
msg_list = (
|
|
|
|
services.session.query(Messages_to_delete)
|
|
|
|
.order_by(Messages_to_delete.id)
|
|
|
|
.all()
|
|
|
|
)
|
|
|
|
if msg_list:
|
|
|
|
for m in msg_list:
|
|
|
|
print(m.deletion_needed())
|
|
|
|
if m.deletion_needed():
|
|
|
|
try:
|
|
|
|
await bot.delete_message(
|
|
|
|
chat_id=settings.CHAT_ID, message_id=m.message_id
|
|
|
|
)
|
|
|
|
services.session.delete(m)
|
|
|
|
services.session.commit()
|
|
|
|
except MessageToDeleteNotFound:
|
|
|
|
print("Сообщение не найдено!")
|
|
|
|
services.session.delete(m)
|
|
|
|
services.session.commit()
|
|
|
|
await asyncio.sleep(600)
|
|
|
|
|
|
|
|
|
|
|
|
async def on_startup(dispatcher: Dispatcher) -> None:
|
|
|
|
asyncio.create_task(messages_cleanup())
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
Base.metadata.create_all(services.engine)
|
|
|
|
executor.start_polling(dp, skip_updates=True, on_startup=on_startup)
|