Бот для телеги, открывающий доступ в чатик по приглашениям.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

266 lines
8.5 KiB

import os
import logging
from aiogram import Bot, Dispatcher, executor, types
from aiogram.utils.exceptions import MessageToDeleteNotFound
import asyncio
from datetime import timedelta, datetime
import secrets
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.orm import declarative_base, sessionmaker
logging.basicConfig(level=logging.INFO)
DELAY_TIME = 1
DELAY_TIME_Q = 3
CHAT_ID = -1001612776177
ALLOWED_USERS = [106693654, 7063133]
ADMINS = [106693654, 7063133, 4978608]
API_TOKEN = os.getenv("TELEGRAM_API_TOKEN")
bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)
engine = create_engine("sqlite:///support.db")
Base = declarative_base()
Session = sessionmaker(bind=engine)
session = Session()
class Allowed_user(Base):
__tablename__ = "allowed_users"
id = Column(Integer, primary_key=True)
user_id = Column(Integer, nullable=False)
date_add = Column(DateTime, nullable=False)
class Token(Base):
"""Класс для токенов.
Хранит сам ключ, и дату истечения.
"""
__tablename__ = "tokens"
id = Column(Integer, primary_key=True)
token = Column(String, nullable=False)
expire = Column(DateTime, nullable=False)
def expired(self) -> bool:
"""Проверка срока жизни токена.
Вовзращает истину, если срок действия токена истек.
"""
if datetime.now() > self.expire:
return True
return False
def __repr__(self):
"""Базовое представление класса."""
return f"{self.id}: {self.token} | {self.expire}"
class Messages_to_delete(Base):
"""Сообщения, которые нужно будет удалить."""
__tablename__ = "messages_to_delete"
id = Column(Integer, primary_key=True)
message_id = Column(Integer, nullable=False)
deletion_date = Column(DateTime, nullable=False)
def deletion_needed(self) -> bool:
"""Нужно ли удалить это сообщение.
Мы могли пропустить удаление из-за перезапуска.
"""
if datetime.now() > self.deletion_date:
return True
return False
def auth(func):
async def wrapper(message):
users_list: Allowed_user = (
session.query(Allowed_user).order_by(Allowed_user.id).all()
)
finded_user: Allowed_user = next(
(u for u in users_list if u.user_id == message["from"]["id"]), None
)
if not finded_user:
return # await message.reply("Access Denied", reply=False)
return await func(message)
return wrapper
async def clean_up(message_id: int, delay_min: int):
exp = datetime.now() + timedelta(minutes=delay_min)
msg: Messages_to_delete = Messages_to_delete(
message_id=message_id, deletion_date=exp
)
session.add(msg)
session.commit()
await asyncio.sleep(delay_min * 60)
try:
await bot.delete_message(chat_id=CHAT_ID, message_id=message_id)
session.delete(msg)
session.commit()
except MessageToDeleteNotFound:
print("Ошибка удаления сообщения")
@dp.message_handler(commands=["start", "help"])
@auth
async def send_welcome(message: types.Message):
await message.reply(
"Бот для взаимодействия с группой\n\n" "/q - для отправки вопроса\n",
reply=False,
)
@dp.message_handler(commands=["mes"])
@auth
async def send_to_chanel(message: types.Message):
deletion_time = message.date + timedelta(minutes=DELAY_TIME)
out_text = f"{message.text[5:]} \n\nБудет удалено в *{deletion_time}*"
result = await bot.send_message(
chat_id=CHAT_ID,
text=out_text,
disable_notification=True,
parse_mode="Markdown",
)
await clean_up(message_id=result.message_id, delay_min=DELAY_TIME)
@dp.message_handler(commands=["q"])
@auth
async def send_question(message: types.Message):
"""Отправить вопрос через бота.
Через бота можно отправить вопрос, который будет удален через
DELAY_TIME_Q минут.
"""
if len(message.text) == len("/q"):
await message.reply(
"Пишешь пусто - делаешь грустно (мне)"
"\nПосле /q надо написать вопрос, вот так:"
"\n\n/q как доехать до Баррикадной?",
reply=False,
)
return
deletion_time = message.date + timedelta(minutes=DELAY_TIME_Q)
out_text = f"*Внимание, вопрос!*\n\n{message.text[3:]}\n\nБудет удалено в *{deletion_time}*"
result = await bot.send_message(
chat_id=CHAT_ID,
text=out_text,
disable_notification=True,
parse_mode="Markdown",
)
await clean_up(message_id=result.message_id, delay_min=DELAY_TIME_Q)
@dp.message_handler(commands=["get_invite"])
async def get_invite(message: types.Message):
if message["from"]["id"] not in ADMINS:
return
token = secrets.token_urlsafe(16)
exp = datetime.now() + timedelta(minutes=5)
test = Token(token=token, expire=exp)
session.add(test)
session.commit()
await message.reply(
f"Новый токен: *{token}* \nБудет действителен 5 минут\n\nЧтобы вступить в чат напиши боту [@GorgorodBot](https://t.me/GorgorodBot)\n`/add_me {token}`",
reply=False,
parse_mode="Markdown",
)
@dp.message_handler(commands=["add_me"])
async def add_me(message: types.Message):
"""Добавление юзера в чат.
Когда бот получает токен, он проверяет его на валидность (есть ли
такой вообще, не истек ли) и высылает разовую ссылку на вступление
в группу.
"""
user_token = message.text[8:].strip()
tokens_list = session.query(Token).order_by(Token.id).all()
finded_token: Token = next((t for t in tokens_list if t.token == user_token), None)
if finded_token.expired():
print("Пизда токену")
# если с токеном все впорядке, то добавляем юзера в базу
if finded_token and not finded_token.expired():
new_user: Allowed_user = Allowed_user(
user_id=message["from"]["id"], date_add=datetime.now()
)
session.add(new_user)
session.commit()
# генерируем ссылку, истечет через 5 минут, может вступить 1 человек
link = await bot.create_chat_invite_link(
chat_id=CHAT_ID, expire_date=timedelta(minutes=5), member_limit=1
)
await message.reply(
"Welcome to the internet"
"\nHave a look around"
"\nAnything that brain of yours can think of can be found"
f"\n\nТвоя ссылка для вступления: {link['invite_link']}",
reply=False,
parse_mode="Markdown",
)
await asyncio.sleep(10)
with open("welcome_msg.txt", "r") as f:
msg = f.read()
await message.reply(msg, reply=False, parse_mode="Markdown")
@dp.message_handler(commands=["wlc"])
@auth
async def wlc(message: types.Message):
with open("welcome_msg.txt", "r") as f:
msg = f.read()
await message.reply(
msg, reply=False, parse_mode="Markdown", disable_web_page_preview=True
)
# https://stackoverflow.com/questions/67637631/create-a-background-process-using-aiogram
async def messages_cleanup() -> None:
msg_list = session.query(Messages_to_delete).order_by(Messages_to_delete.id).all()
while True:
for m in msg_list:
if m.deletion_needed:
try:
await bot.delete_message(chat_id=CHAT_ID, message_id=m.message_id)
except MessageToDeleteNotFound:
print("Ошибка удаления сообщения")
await asyncio.sleep(600)
async def on_startup(dispatcher: Dispatcher) -> None:
asyncio.create_task(messages_cleanup())
if __name__ == "__main__":
Base.metadata.create_all(engine)
executor.start_polling(dp, skip_updates=True, on_startup=on_startup)