Бот для телеги, открывающий доступ в чатик по приглашениям.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

227 lines
7.5 KiB

import os
import logging
from aiogram import Bot, Dispatcher, executor, types
from aiogram.utils.exceptions import MessageToDeleteNotFound
import asyncio
from datetime import timedelta, datetime
import secrets
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import IntegrityError
from sqlalchemy_utils import database_exists, create_database
from models import Allowed_user, Token, Messages_to_delete, Base
logging.basicConfig(level=logging.INFO)
DELAY_TIME = int(os.getenv("GBOT_DELAY_TIME"))
DELAY_TIME_Q = int(os.getenv("GBOT_DELAY_TIME_Q"))
CHAT_ID = os.getenv("GBOT_CHAT_ID")
ALLOWED_USERS = [106693654, 7063133]
ADMINS = [106693654, 7063133, 4978608]
API_TOKEN = os.getenv("TELEGRAM_API_TOKEN")
DB_PATH = os.getenv("GBOT_DB_PATH")
bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)
engine = create_engine("sqlite:///" + str(DB_PATH))
# if not database_exists(engine.url):
# create_database(engine.url)
Session = sessionmaker(bind=engine)
session = Session()
def auth(func):
async def wrapper(message):
users_list: Allowed_user = (
session.query(Allowed_user).order_by(Allowed_user.id).all()
)
finded_user: Allowed_user = next(
(u for u in users_list if u.user_id == message["from"]["id"]), None
)
if not finded_user:
return
return await func(message)
return wrapper
async def clean_up(message_id: int, delay_min: int):
exp = datetime.now() + timedelta(minutes=delay_min)
msg: Messages_to_delete = Messages_to_delete(
message_id=message_id, deletion_date=exp
)
session.add(msg)
session.commit()
await asyncio.sleep(delay_min * 60)
try:
await bot.delete_message(chat_id=CHAT_ID, message_id=message_id)
session.delete(msg)
session.commit()
except MessageToDeleteNotFound:
print("Ошибка удаления сообщения")
@dp.message_handler(commands=["start", "help"])
@auth
async def send_welcome(message: types.Message):
await message.reply(
"Бот для взаимодействия с группой\n\n" "/q - для отправки вопроса\n",
reply=False,
)
@dp.message_handler(commands=["mes"])
@auth
async def send_to_chanel(message: types.Message):
deletion_time = message.date + timedelta(minutes=DELAY_TIME)
out_text = f"{message.text[5:]} \n\nБудет удалено в *{deletion_time}*"
result = await bot.send_message(
chat_id=CHAT_ID,
text=out_text,
disable_notification=True,
parse_mode="Markdown",
)
await clean_up(message_id=result.message_id, delay_min=DELAY_TIME)
@dp.message_handler(commands=["q"])
@auth
async def send_question(message: types.Message):
"""Отправить вопрос через бота.
Через бота можно отправить вопрос, который будет удален через
DELAY_TIME_Q минут.
"""
if len(message.text) == len("/q"):
await message.reply(
"Пишешь пусто - делаешь грустно (мне)"
"\nПосле /q надо написать вопрос, вот так:"
"\n\n/q как доехать до Баррикадной?",
reply=False,
)
return
deletion_time = message.date + timedelta(minutes=DELAY_TIME_Q)
out_text = f"*Внимание, вопрос!*\n\n{message.text[3:]}\n\nБудет удалено в *{deletion_time}*"
result = await bot.send_message(
chat_id=CHAT_ID,
text=out_text,
disable_notification=True,
parse_mode="Markdown",
)
await clean_up(message_id=result.message_id, delay_min=DELAY_TIME_Q)
@dp.message_handler(commands=["get_invite"])
async def get_invite(message: types.Message):
if message["from"]["id"] not in ADMINS:
return
token = secrets.token_urlsafe(16)
exp = datetime.now() + timedelta(minutes=5)
test = Token(token=token, expire=exp)
session.add(test)
session.commit()
await message.reply(
f"Новый токен: *{token}* \nБудет действителен 5 минут\n\nЧтобы вступить в чат напиши боту [@GorgorodBot](https://t.me/GorgorodBot)\n`/add_me {token}`",
reply=False,
parse_mode="Markdown",
)
@dp.message_handler(commands=["add_me"])
async def add_me(message: types.Message):
"""Добавление юзера в чат.
Когда бот получает токен, он проверяет его на валидность (есть ли
такой вообще, не истек ли) и высылает разовую ссылку на вступление
в группу.
"""
user_token = message.text[8:].strip()
tokens_list = session.query(Token).order_by(Token.id).all()
finded_token: Token = next((t for t in tokens_list if t.token == user_token), None)
if finded_token.expired():
print("Пизда токену")
# если с токеном все впорядке, то добавляем юзера в базу
if finded_token and not finded_token.expired():
new_user: Allowed_user = Allowed_user(
user_id=message["from"]["id"], date_add=datetime.now()
)
try:
session.add(new_user)
session.delete(finded_token)
session.commit()
except IntegrityError:
print(f"Юзер {new_user.user_id} уже в базе")
session.rollback()
# генерируем ссылку, истечет через 5 минут, может вступить 1 человек
link = await bot.create_chat_invite_link(
chat_id=CHAT_ID, expire_date=timedelta(minutes=5), member_limit=1
)
await message.reply(
"Welcome to the internet"
"\nHave a look around"
"\nAnything that brain of yours can think of can be found"
f"\n\nТвоя ссылка для вступления: {link['invite_link']}",
reply=False,
parse_mode="Markdown",
)
await asyncio.sleep(10)
with open("static/welcome_msg.txt", "r") as f:
msg = f.read()
await message.reply(msg, reply=False, parse_mode="Markdown")
@dp.message_handler(commands=["wlc"])
@auth
async def wlc(message: types.Message):
with open("welcome_msg.txt", "r") as f:
msg = f.read()
await message.reply(
msg, reply=False, parse_mode="Markdown", disable_web_page_preview=True
)
# https://stackoverflow.com/questions/67637631/create-a-background-process-using-aiogram
async def messages_cleanup() -> None:
msg_list = session.query(Messages_to_delete).order_by(Messages_to_delete.id).all()
while True:
for m in msg_list:
if m.deletion_needed:
try:
await bot.delete_message(chat_id=CHAT_ID, message_id=m.message_id)
session.delete(m)
session.commit()
except MessageToDeleteNotFound:
print("Сообщение не найдено!")
session.delete(m)
session.commit()
await asyncio.sleep(600)
async def on_startup(dispatcher: Dispatcher) -> None:
asyncio.create_task(messages_cleanup())
if __name__ == "__main__":
Base.metadata.create_all(engine)
executor.start_polling(dp, skip_updates=True, on_startup=on_startup)