You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
289 lines
9.3 KiB
289 lines
9.3 KiB
import os |
|
import logging |
|
from aiogram import Bot, Dispatcher, executor, types |
|
from aiogram.utils.exceptions import MessageToDeleteNotFound |
|
import asyncio |
|
from datetime import timedelta, datetime |
|
import secrets |
|
from sqlalchemy import create_engine |
|
from sqlalchemy.orm import sessionmaker |
|
from sqlalchemy.exc import IntegrityError |
|
from models import Allowed_user, Token, Messages_to_delete, Base |
|
from aiogram.utils.markdown import escape_md |
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
DELAY_TIME = int(os.getenv("GBOT_DELAY_TIME")) |
|
DELAY_TIME_Q = int(os.getenv("GBOT_DELAY_TIME_Q")) |
|
CHAT_ID = os.getenv("GBOT_CHAT_ID") |
|
ALLOWED_USERS = [106693654, 7063133] |
|
ADMINS = [106693654, 7063133, 4978608] |
|
API_TOKEN = os.getenv("TELEGRAM_API_TOKEN") |
|
DB_PATH = os.getenv("GBOT_DB_PATH") |
|
bot = Bot(token=API_TOKEN) |
|
dp = Dispatcher(bot) |
|
|
|
engine = create_engine("sqlite:///" + str(DB_PATH)) |
|
Session = sessionmaker(bind=engine) |
|
session = Session() |
|
|
|
|
|
def auth(func): |
|
"""Аутентификация. |
|
|
|
Проверяем, что у юзера есть полномочия на выполнение действия - он |
|
должен быть в белом списке. |
|
|
|
""" |
|
|
|
async def wrapper(message): |
|
finded_user: Allowed_user |
|
uid = message["from"]["id"] |
|
|
|
finded_user = ( |
|
session.query(Allowed_user) |
|
.filter(Allowed_user.user_id == uid) |
|
.one_or_none() |
|
) |
|
|
|
if not finded_user: |
|
return |
|
return await func(message) |
|
|
|
return wrapper |
|
|
|
|
|
def only_admins(func): |
|
"""Действия только для админов.""" |
|
|
|
async def wrapper(message): |
|
uid = message["from"]["id"] |
|
|
|
if uid not in ADMINS: |
|
return |
|
|
|
return await func(message) |
|
|
|
return wrapper |
|
|
|
|
|
async def clean_up(message_id: int, delay_min: int): |
|
exp = datetime.now() + timedelta(minutes=delay_min) |
|
msg: Messages_to_delete = Messages_to_delete( |
|
message_id=message_id, deletion_date=exp |
|
) |
|
session.add(msg) |
|
session.commit() |
|
|
|
await asyncio.sleep(delay_min * 60) |
|
try: |
|
await bot.delete_message(chat_id=CHAT_ID, message_id=message_id) |
|
session.delete(msg) |
|
session.commit() |
|
except MessageToDeleteNotFound: |
|
print("Ошибка удаления сообщения") |
|
|
|
|
|
@dp.message_handler(commands=["start", "help"]) |
|
@auth |
|
async def send_welcome(message: types.Message): |
|
"""Справка по командам.""" |
|
await message.reply( |
|
"Бот для взаимодействия с группой\n\n" "/q - для отправки вопроса\n", |
|
reply=False, |
|
) |
|
|
|
|
|
@dp.message_handler(commands=["channel_id"]) |
|
@only_admins |
|
async def get_channel_id(message: types.Message) -> None: |
|
"""Возвращает id канала по его имени.""" |
|
if message["from"]["id"] not in ADMINS: |
|
return |
|
|
|
c_id = message.text[12:] |
|
|
|
result = await bot.send_message( |
|
chat_id=c_id, |
|
text="ping", |
|
disable_notification=True, |
|
) |
|
|
|
await message.reply( |
|
f"Ид чата {c_id}: *{result.sender_chat.id}*", parse_mode="Markdown" |
|
) |
|
|
|
|
|
@dp.message_handler(commands=["mes"]) |
|
@auth |
|
async def send_to_chanel(message: types.Message) -> None: |
|
|
|
deletion_time = message.date + timedelta(minutes=DELAY_TIME) |
|
out_text = f"{message.text[5:]} \n\nБудет удалено в *{deletion_time}*" |
|
result = await bot.send_message( |
|
chat_id=CHAT_ID, |
|
text=out_text, |
|
disable_notification=True, |
|
parse_mode="Markdown", |
|
) |
|
print(result.sender_chat.id) |
|
await clean_up(message_id=result.message_id, delay_min=DELAY_TIME) |
|
|
|
|
|
@dp.message_handler(commands=["q"]) |
|
@auth |
|
async def send_question(message: types.Message) -> None: |
|
"""Отправить вопрос через бота. |
|
|
|
Через бота можно отправить вопрос, который будет удален через |
|
DELAY_TIME_Q минут. |
|
|
|
""" |
|
if len(message.text) == len("/q"): |
|
await message.reply( |
|
"Пишешь пусто - делаешь грустно (мне)" |
|
"\nПосле /q надо написать вопрос, вот так:" |
|
"\n\n`/q как доехать до Баррикадной?`", |
|
reply=False, |
|
parse_mode="Markdown", |
|
) |
|
return |
|
|
|
deletion_time = message.date + timedelta(minutes=DELAY_TIME_Q) |
|
out_text = f"*Внимание, вопрос!*\n\n{message.text[3:]}\n\nБудет удалено в *{deletion_time}*" |
|
result = await bot.send_message( |
|
chat_id=CHAT_ID, |
|
text=out_text, |
|
disable_notification=True, |
|
parse_mode="Markdown", |
|
) |
|
|
|
await clean_up(message_id=result.message_id, delay_min=DELAY_TIME_Q) |
|
|
|
|
|
@dp.message_handler(commands=["get_invite"]) |
|
@only_admins |
|
async def get_invite(message: types.Message) -> None: |
|
|
|
token = secrets.token_urlsafe(16) |
|
|
|
bot_info = await bot.get_me() |
|
bot_link = "@" + str(bot_info.username) |
|
|
|
exp = datetime.now() + timedelta(minutes=5) |
|
test = Token(token=token, expire=exp) |
|
session.add(test) |
|
session.commit() |
|
msg = f"Новый токен: *{token}*\nБудет действителен 5 минут\n\nЧтобы вступить в чат напиши боту {escape_md(bot_link)} \n`/add_me {token}`" |
|
|
|
await message.reply( |
|
msg, |
|
reply=False, |
|
parse_mode="Markdown", |
|
) |
|
|
|
|
|
@dp.message_handler(commands=["add_me"]) |
|
async def add_me(message: types.Message) -> None: |
|
"""Добавление юзера в чат. |
|
|
|
Когда бот получает токен, он проверяет его на валидность (есть ли |
|
такой вообще, не истек ли) и высылает разовую ссылку на вступление |
|
в группу. |
|
|
|
""" |
|
user_token = message.text[8:].strip() |
|
tokens_list = session.query(Token).order_by(Token.id).all() |
|
|
|
finded_token: Token = next((t for t in tokens_list if t.token == user_token), None) |
|
|
|
# если с токеном все впорядке, то добавляем юзера в базу |
|
if finded_token and not finded_token.expired(): |
|
new_user: Allowed_user = Allowed_user( |
|
user_id=message["from"]["id"], date_add=datetime.now() |
|
) |
|
try: |
|
session.add(new_user) |
|
session.delete(finded_token) |
|
session.commit() |
|
except IntegrityError: |
|
print(f"Юзер {new_user.user_id} уже в базе") |
|
session.rollback() |
|
|
|
# генерируем ссылку, истечет через 5 минут, может вступить 1 человек |
|
link = await bot.create_chat_invite_link( |
|
chat_id=CHAT_ID, expire_date=timedelta(minutes=5), member_limit=1 |
|
) |
|
|
|
await message.reply( |
|
"Welcome to the internet" |
|
"\nHave a look around" |
|
"\nAnything that brain of yours can think of can be found" |
|
f"\n\nТвоя ссылка для вступления: {link['invite_link']}", |
|
reply=False, |
|
parse_mode="Markdown", |
|
) |
|
await asyncio.sleep(10) |
|
with open("static/welcome_msg.txt", "r") as f: |
|
msg = f.read() |
|
await message.reply(msg, reply=False, parse_mode="Markdown") |
|
|
|
|
|
@dp.message_handler(commands=["stop"]) |
|
@auth |
|
async def stop_and_panic(message: types.Message) -> None: |
|
"""Panic-button для бота. |
|
|
|
При нажатии: |
|
- кидает юзера в чс чата и удаляет его, |
|
- удаляет из белого списка. |
|
""" |
|
u_id = message["from"]["id"] |
|
finded_user: Allowed_user = ( |
|
session.query(Allowed_user).filter(Allowed_user.user_id == u_id).one_or_none() |
|
) |
|
await bot.kick_chat_member( |
|
chat_id=CHAT_ID, user_id=finded_user.user_id, revoke_messages=True |
|
) |
|
session.delete(finded_user) |
|
session.commit() |
|
|
|
# рассылка сообщения админам что юзер нажал паническую кнопку. Нужны ид чатов. |
|
# for admin_id in ADMINS: |
|
# await bot.send_meassage() |
|
|
|
|
|
@dp.message_handler(commands=["wlc"]) |
|
@auth |
|
async def wlc(message: types.Message) -> None: |
|
with open("welcome_msg.txt", "r") as f: |
|
msg = f.read() |
|
await message.reply( |
|
msg, reply=False, parse_mode="Markdown", disable_web_page_preview=True |
|
) |
|
|
|
|
|
# https://stackoverflow.com/questions/67637631/create-a-background-process-using-aiogram |
|
async def messages_cleanup() -> None: |
|
msg_list = session.query(Messages_to_delete).order_by(Messages_to_delete.id).all() |
|
while True: |
|
for m in msg_list: |
|
if m.deletion_needed: |
|
try: |
|
await bot.delete_message(chat_id=CHAT_ID, message_id=m.message_id) |
|
session.delete(m) |
|
session.commit() |
|
except MessageToDeleteNotFound: |
|
print("Сообщение не найдено!") |
|
session.delete(m) |
|
session.commit() |
|
await asyncio.sleep(600) |
|
|
|
|
|
async def on_startup(dispatcher: Dispatcher) -> None: |
|
asyncio.create_task(messages_cleanup()) |
|
|
|
|
|
if __name__ == "__main__": |
|
Base.metadata.create_all(engine) |
|
executor.start_polling(dp, skip_updates=True, on_startup=on_startup)
|
|
|