"""Модуль для конструирования и обработки запросов.""" import asyncio from typing import Dict, List, NewType import aiohttp from aiohttp import ClientResponse from aiohttp.client_exceptions import ClientConnectorError, ServerDisconnectedError from loguru import logger from app_config import AppConfig class RequestBulder: """Конструктор запросов. Attributes ---------- cfg : AppConfig Объект с конфигом для приложения. Methods ------- send_request(url: str, json_body: dict) Асинхронный метод для отправки запроса. Может принимать пустой json_body. """ def __init__(self, cfg: AppConfig) -> None: """Инициализация. Объект сесси `aiohttp.ClientSession` создается здесь. """ self.cfg = cfg _conn = aiohttp.TCPConnector(limit=cfg.requests_count) self.session = aiohttp.ClientSession(connector=_conn) async def __check_resp(self, resp: ClientResponse) -> bool: if not resp.status == 200: return False response = await resp.json() if not response["done"]: return False return True async def send_request(self, url: str, json_body: dict) -> dict: """Выполняет запрос, при успехе возвращает json с ответом, при неудаче возвращает пустой dict. Parameters ---------- url : str Адрес куда слать запрос. Нужна часть не включающая хост, только get/post. json_body: Json Тело хапроса в Json формате. Returns ------- dict Возвращает либо пустой dict, либо json ответа сервера (заполненный dict). Examples -------- >>> await send_request('count', json_body={} ) """ _url = f"{self.cfg.central_host_url}/{url}" try: async with self.session.get(_url, json=json_body) as raw_resp: if await self.__check_resp(raw_resp): json_resp: dict = await raw_resp.json() json_resp.pop("done") return json_resp except ClientConnectorError: logger.error(f"Ошибка подключения к серверу {_url}") # не придумал ничего умнее чем подождать frequency_sec из конфига await asyncio.sleep(self.cfg.frequency_sec) except ServerDisconnectedError: logger.error(f"Сервер отклонил подключение {_url}") return dict() async def wait(self) -> None: """Ждет frequency_sec время.""" await asyncio.sleep(self.cfg.frequency_sec) async def close(self) -> None: """Gracefull shutdown connection.""" await self.session.close()