Бот для телеги, открывающий доступ в чатик по приглашениям.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

288 lines
9.1 KiB

import os
import logging
from aiogram import Bot, Dispatcher, executor, types
from aiogram.utils.exceptions import MessageToDeleteNotFound
import asyncio
from datetime import timedelta, datetime
import secrets
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import IntegrityError
from models import Allowed_user, Token, Messages_to_delete, Base
from aiogram.utils.markdown import escape_md
logging.basicConfig(level=logging.INFO)
DELAY_TIME = int(os.getenv("GBOT_DELAY_TIME"))
DELAY_TIME_Q = int(os.getenv("GBOT_DELAY_TIME_Q"))
CHAT_ID = os.getenv("GBOT_CHAT_ID")
ALLOWED_USERS = [106693654, 7063133]
ADMINS = [106693654, 7063133, 4978608]
API_TOKEN = os.getenv("TELEGRAM_API_TOKEN")
DB_PATH = os.getenv("GBOT_DB_PATH")
bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)
engine = create_engine("sqlite:///" + str(DB_PATH))
Session = sessionmaker(bind=engine)
session = Session()
def auth(func):
"""Аутентификация.
Проверяем, что у юзера есть полномочия на выполнение действия - он
должен быть в белом списке.
"""
async def wrapper(message):
finded_user: Allowed_user
uid = message["from"]["id"]
finded_user = (
session.query(Allowed_user)
.filter(Allowed_user.user_id == uid)
.one_or_none()
)
if not finded_user:
return
return await func(message)
return wrapper
def only_admins(func):
"""Действия только для админов."""
async def wrapper(message):
uid = message["from"]["id"]
if uid not in ADMINS:
return
return await func(message)
return wrapper
async def clean_up(message_id: int, delay_min: int):
exp = datetime.now() + timedelta(minutes=delay_min)
msg: Messages_to_delete = Messages_to_delete(
message_id=message_id, deletion_date=exp
)
session.add(msg)
session.commit()
await asyncio.sleep(delay_min * 60)
try:
await bot.delete_message(chat_id=CHAT_ID, message_id=message_id)
session.delete(msg)
session.commit()
except MessageToDeleteNotFound:
print("Ошибка удаления сообщения")
@dp.message_handler(commands=["start", "help"])
@auth
async def send_welcome(message: types.Message):
await message.reply(
"Бот для взаимодействия с группой\n\n" "/q - для отправки вопроса\n",
reply=False,
)
@dp.message_handler(commands=["channel_id"])
async def get_channel_id(message: types.Message) -> None:
"""Возвращает id канала по его имени"""
if message["from"]["id"] not in ADMINS:
return
c_id = message.text[12:]
result = await bot.send_message(
chat_id=c_id,
text="ping",
disable_notification=True,
)
await message.reply(
f"Ид чата {c_id}: *{result.sender_chat.id}*", parse_mode="Markdown"
)
@dp.message_handler(commands=["mes"])
@auth
async def send_to_chanel(message: types.Message):
deletion_time = message.date + timedelta(minutes=DELAY_TIME)
out_text = f"{message.text[5:]} \n\nБудет удалено в *{deletion_time}*"
result = await bot.send_message(
chat_id=CHAT_ID,
text=out_text,
disable_notification=True,
parse_mode="Markdown",
)
print(result.sender_chat.id)
await clean_up(message_id=result.message_id, delay_min=DELAY_TIME)
@dp.message_handler(commands=["q"])
@auth
async def send_question(message: types.Message):
"""Отправить вопрос через бота.
Через бота можно отправить вопрос, который будет удален через
DELAY_TIME_Q минут.
"""
if len(message.text) == len("/q"):
await message.reply(
"Пишешь пусто - делаешь грустно (мне)"
"\nПосле /q надо написать вопрос, вот так:"
"\n\n/q как доехать до Баррикадной?",
reply=False,
)
return
deletion_time = message.date + timedelta(minutes=DELAY_TIME_Q)
out_text = f"*Внимание, вопрос!*\n\n{message.text[3:]}\n\nБудет удалено в *{deletion_time}*"
result = await bot.send_message(
chat_id=CHAT_ID,
text=out_text,
disable_notification=True,
parse_mode="Markdown",
)
await clean_up(message_id=result.message_id, delay_min=DELAY_TIME_Q)
@dp.message_handler(commands=["get_invite"])
@only_admins
async def get_invite(message: types.Message) -> None:
token = secrets.token_urlsafe(16)
bot_info = await bot.get_me()
bot_link = "@" + str(bot_info.username)
exp = datetime.now() + timedelta(minutes=5)
test = Token(token=token, expire=exp)
session.add(test)
session.commit()
msg = f"Новый токен: *{token}*\nБудет действителен 5 минут\n\nЧтобы вступить в чат напиши боту {escape_md(bot_link)} \n`/add_me {token}`"
await message.reply(
msg,
reply=False,
parse_mode="Markdown",
)
@dp.message_handler(commands=["add_me"])
async def add_me(message: types.Message) -> None:
"""Добавление юзера в чат.
Когда бот получает токен, он проверяет его на валидность (есть ли
такой вообще, не истек ли) и высылает разовую ссылку на вступление
в группу.
"""
user_token = message.text[8:].strip()
tokens_list = session.query(Token).order_by(Token.id).all()
finded_token: Token = next((t for t in tokens_list if t.token == user_token), None)
# если с токеном все впорядке, то добавляем юзера в базу
if finded_token and not finded_token.expired():
new_user: Allowed_user = Allowed_user(
user_id=message["from"]["id"], date_add=datetime.now()
)
try:
session.add(new_user)
session.delete(finded_token)
session.commit()
except IntegrityError:
print(f"Юзер {new_user.user_id} уже в базе")
session.rollback()
# генерируем ссылку, истечет через 5 минут, может вступить 1 человек
link = await bot.create_chat_invite_link(
chat_id=CHAT_ID, expire_date=timedelta(minutes=5), member_limit=1
)
await message.reply(
"Welcome to the internet"
"\nHave a look around"
"\nAnything that brain of yours can think of can be found"
f"\n\nТвоя ссылка для вступления: {link['invite_link']}",
reply=False,
parse_mode="Markdown",
)
await asyncio.sleep(10)
with open("static/welcome_msg.txt", "r") as f:
msg = f.read()
await message.reply(msg, reply=False, parse_mode="Markdown")
@dp.message_handler(commands=["stop"])
@auth
async def stop_and_panic(message: types.Message) -> None:
"""Panic-button для бота.
При нажатии:
- кидает юзера в чс чата и удаляет его,
- удаляет из белого списка.
"""
u_id = message["from"]["id"]
finded_user: Allowed_user = (
session.query(Allowed_user).filter(Allowed_user.user_id == u_id).first()
)
await bot.kick_chat_member(
chat_id=CHAT_ID, user_id=finded_user.user_id, revoke_messages=True
)
session.delete(finded_user)
session.commit()
# рассылка сообщения админам что юзер нажал паническую кнопку. Нужны ид чатов.
# for admin_id in ADMINS:
# await bot.send_meassage()
@dp.message_handler(commands=["wlc"])
@auth
async def wlc(message: types.Message):
with open("welcome_msg.txt", "r") as f:
msg = f.read()
await message.reply(
msg, reply=False, parse_mode="Markdown", disable_web_page_preview=True
)
# https://stackoverflow.com/questions/67637631/create-a-background-process-using-aiogram
async def messages_cleanup() -> None:
msg_list = session.query(Messages_to_delete).order_by(Messages_to_delete.id).all()
while True:
for m in msg_list:
if m.deletion_needed:
try:
await bot.delete_message(chat_id=CHAT_ID, message_id=m.message_id)
session.delete(m)
session.commit()
except MessageToDeleteNotFound:
print("Сообщение не найдено!")
session.delete(m)
session.commit()
await asyncio.sleep(600)
async def on_startup(dispatcher: Dispatcher) -> None:
asyncio.create_task(messages_cleanup())
if __name__ == "__main__":
Base.metadata.create_all(engine)
executor.start_polling(dp, skip_updates=True, on_startup=on_startup)