5 changed files with 266 additions and 167 deletions
@ -0,0 +1,60 @@
|
||||
"""Загрузка настроек приложения. Используется библиотека `configparser`.""" |
||||
import configparser |
||||
from os import path |
||||
|
||||
|
||||
class AppConfig: |
||||
"""Класс для хранения настроек сервиса. |
||||
""" |
||||
|
||||
def __init__(self, path_to_conf_file: str, section: str = "Main"): |
||||
""" |
||||
Parameters |
||||
---------- |
||||
path_to_conf_file: str |
||||
Путь к файлу конфига. Можно указывать относительный. |
||||
|
||||
section: str |
||||
Секция в конфиге, которую нужно считывать. По-умолчанию секция `[Main]`. |
||||
|
||||
Raises |
||||
------ |
||||
KeyError |
||||
Если в конфиге не найдено указанной секции. |
||||
|
||||
|
||||
""" |
||||
cfg = configparser.ConfigParser() |
||||
|
||||
try: |
||||
cfg.read(path.abspath(path_to_conf_file)) |
||||
except FileNotFoundError: |
||||
print(f"File {path.abspath(path_to_conf_file)} not found!") |
||||
|
||||
if section not in cfg.sections(): |
||||
raise KeyError(f"Section {section} not found in config file!") |
||||
|
||||
conf = dict(cfg.items(section)) |
||||
|
||||
self.template: str = conf["template"] |
||||
self.path_for_config: str = conf["path_for_config"] |
||||
self.frequency_sec = int(conf["frequency_sec"]) |
||||
self.central_host_url: str = conf["central_host_url"] |
||||
self.requests_count: int = int(conf["requests_count"]) |
||||
self.request_portion: int = int(conf["request_portion"]) |
||||
|
||||
@property |
||||
def configs_path(self) -> str: |
||||
"""Возвращает абсолютный путь до папки с конфигами.""" |
||||
_path = path.abspath(self.path_for_config) |
||||
return _path |
||||
|
||||
def __repr__(self): |
||||
return ( |
||||
f"template = {self.template}\n" |
||||
f"path_for_config = {self.path_for_config}\n" |
||||
f"{self.frequency_sec}\n" |
||||
f"{self.central_host_url}\n" |
||||
f"{self.requests_count}\n" |
||||
f"{self.request_portion}\n" |
||||
) |
@ -0,0 +1,53 @@
|
||||
import asyncio |
||||
import os |
||||
import aiofiles |
||||
|
||||
|
||||
class ConfigObject: |
||||
def __init__(self, host: str, conf_body: str, path: str): |
||||
self.host = host |
||||
self.conf_body = conf_body.replace( |
||||
"server_name _;", f"server_name {host}.server.com;" |
||||
) |
||||
|
||||
self.path = os.path.abspath(path) |
||||
|
||||
if not os.path.isdir(self.path): |
||||
os.mkdir(self.path) |
||||
|
||||
self.full_path_to_file: str = os.path.join(self.path, f"{host}.conf") |
||||
|
||||
@property |
||||
def existst(self) -> bool: |
||||
"""Возвращает True, если файл конфига уже существует.""" |
||||
return os.path.isfile(self.full_path_to_file) |
||||
|
||||
async def write(self) -> bool: |
||||
"""Записывает конфиг файл.""" |
||||
if not self.existst: |
||||
async with aiofiles.open(self.full_path_to_file, mode="w") as file: |
||||
await file.write(self.conf_body) |
||||
return True |
||||
|
||||
def __repr__(self): |
||||
return f"Hi, config for {self.host}" |
||||
|
||||
|
||||
class ConfigFactory: |
||||
def __init__(self, path_to_template: str, path_to_configs_dir: str): |
||||
self.templ = self.__read_config_template_file(path_to_template) |
||||
self.path = path_to_configs_dir |
||||
|
||||
def create(self, host: str) -> ConfigObject: |
||||
"""Конструирует `ConfigObject` используя только хост.""" |
||||
return ConfigObject(host, self.templ, self.path) |
||||
|
||||
def __read_config_template_file(self, path_to_file: str) -> str: |
||||
"""Прочесть шаблон конфига для сервера из файла.""" |
||||
template: str = "" |
||||
|
||||
_full_path = os.path.abspath(path_to_file) |
||||
with open(_full_path, mode="r", encoding="utf8") as file: |
||||
template = file.read() |
||||
|
||||
return template |
@ -0,0 +1,86 @@
|
||||
"""Модуль для конструирования и обработки запросов.""" |
||||
import asyncio |
||||
from typing import Dict, List, NewType |
||||
|
||||
import aiohttp |
||||
from aiohttp import ClientResponse |
||||
from aiohttp.client_exceptions import ClientConnectorError, ServerDisconnectedError |
||||
from loguru import logger |
||||
|
||||
from app_config import AppConfig |
||||
|
||||
class RequestBulder: |
||||
"""Конструктор запросов. |
||||
|
||||
Attributes |
||||
---------- |
||||
cfg : AppConfig |
||||
Объект с конфигом для приложения. |
||||
|
||||
Methods |
||||
------- |
||||
send_request(url: str, json_body: dict) |
||||
Асинхронный метод для отправки запроса. Может принимать пустой json_body. |
||||
""" |
||||
def __init__(self, cfg: AppConfig) -> None: |
||||
"""Инициализация. Объект сесси `aiohttp.ClientSession` |
||||
создается здесь. """ |
||||
self.cfg = cfg |
||||
_conn = aiohttp.TCPConnector(limit=cfg.requests_count) |
||||
self.session = aiohttp.ClientSession(connector=_conn) |
||||
|
||||
async def __check_resp(self, resp: ClientResponse) -> bool: |
||||
if not resp.status == 200: |
||||
return False |
||||
|
||||
response = await resp.json() |
||||
|
||||
if not response["done"]: |
||||
return False |
||||
|
||||
return True |
||||
|
||||
async def send_request(self, url: str, json_body: dict) -> dict: |
||||
"""Выполняет запрос, при успехе возвращает json с ответом, при |
||||
неудаче возвращает пустой dict. |
||||
|
||||
Parameters |
||||
---------- |
||||
url : str |
||||
Адрес куда слать запрос. Нужна часть не включающая хост, только get/post. |
||||
|
||||
json_body: Json |
||||
Тело хапроса в Json формате. |
||||
|
||||
Returns |
||||
------- |
||||
dict |
||||
Возвращает либо пустой dict, либо json ответа сервера (заполненный dict). |
||||
|
||||
Examples |
||||
-------- |
||||
>>> await send_request('count', json_body={} ) |
||||
|
||||
""" |
||||
_url = f"{self.cfg.central_host_url}/{url}" |
||||
try: |
||||
async with self.session.get(_url, json=json_body) as raw_resp: |
||||
if await self.__check_resp(raw_resp): |
||||
json_resp: dict = await raw_resp.json() |
||||
json_resp.pop("done") |
||||
return json_resp |
||||
except ClientConnectorError: |
||||
logger.error(f"Ошибка подключения к серверу {_url}") |
||||
# не придумал ничего умнее чем подождать frequency_sec из конфига |
||||
await asyncio.sleep(self.cfg.frequency_sec) |
||||
except ServerDisconnectedError: |
||||
logger.error(f"Сервер отклонил подключение {_url}") |
||||
return dict() |
||||
|
||||
async def wait(self) -> None: |
||||
"""Ждет frequency_sec время.""" |
||||
await asyncio.sleep(self.cfg.frequency_sec) |
||||
|
||||
async def close(self) -> None: |
||||
"""Gracefull shutdown connection.""" |
||||
await self.session.close() |
@ -1,172 +1,82 @@
|
||||
from dataclasses import dataclass |
||||
from asyncio import Task |
||||
from typing import List |
||||
|
||||
"""Создание конфиг. файлов для хостов.""" |
||||
import argparse |
||||
import asyncio |
||||
import os |
||||
import configparser |
||||
import aiohttp |
||||
import aiofiles |
||||
|
||||
|
||||
@dataclass(frozen=True) |
||||
class Config: |
||||
"""Класс для хранения конфига сервиса.""" |
||||
|
||||
template: str |
||||
path_for_config: str |
||||
frequency_sec: int |
||||
central_host_url: str |
||||
requests_count: int |
||||
request_portion: int |
||||
|
||||
|
||||
async def write_config_files(hosts: list, cfg: Config, template: str) -> None: |
||||
"""Записываем конфиги для списка hosts.""" |
||||
|
||||
full_path_to_config_dir: str = os.path.abspath(cfg.path_for_config) |
||||
|
||||
if not os.path.isdir(full_path_to_config_dir): |
||||
os.mkdir(full_path_to_config_dir) |
||||
|
||||
for host in hosts: |
||||
config_filename: str = f"{host}.conf" |
||||
config_file = os.path.join(full_path_to_config_dir, config_filename) |
||||
|
||||
if not os.path.isfile(config_file): |
||||
config_body: str = template.replace( |
||||
"server_name _;", f"server_name {host}.server.com;" |
||||
) |
||||
async with aiofiles.open(config_file, mode="w") as file: |
||||
await file.write(config_body) |
||||
|
||||
|
||||
def _get_template(templ_file: str) -> str: |
||||
"""Возвращает шаблон конфига для хоста из файла `templ_file`.""" |
||||
|
||||
template: str = "" |
||||
with open(templ_file, mode="r", encoding="utf8") as file: |
||||
template = file.read() |
||||
|
||||
return template |
||||
|
||||
|
||||
async def get_records_count(session, server) -> int: |
||||
"""Возвращает количество записей в базе.""" |
||||
async with session.get(f"{server}/count") as resp: |
||||
resp = await resp.json() |
||||
count: int = int(resp["result"]) |
||||
return count |
||||
|
||||
|
||||
async def get_tasks( |
||||
url: str, portion: int, count: int, session: aiohttp.ClientSession, body: dict |
||||
) -> List[Task]: |
||||
"""Вернет список задач с запросами к API. |
||||
|
||||
Функция не ограничивает кол-во запросов, это нужно сделать до |
||||
вызова, чтобы передать корректный `session`. |
||||
|
||||
Parameters |
||||
---------- |
||||
url : str |
||||
Куда слать запрос. Ожидается "server/get" |
||||
portion : int |
||||
Сколько записей запрашивать за раз |
||||
count : int |
||||
Общее количество записей |
||||
session : aiohttp.ClientSession |
||||
Объект сессии, создается в уровне выше, с одним объектом |
||||
меньше накладных расходов на каждый запрос. |
||||
body : json |
||||
Json для запроса. |
||||
""" |
||||
tasks: List[Task] = [] |
||||
|
||||
for offset in range(0, count, portion): |
||||
tasks.append(asyncio.create_task(session.get(url, json=body))) |
||||
body["offset"] = offset |
||||
|
||||
return tasks |
||||
|
||||
from asyncio import Task |
||||
from dataclasses import dataclass |
||||
from typing import List, Union |
||||
|
||||
async def send_async_request(cfg: Config, json_body: dict) -> None: |
||||
"""Начать серию запросов.""" |
||||
template: str = _get_template(cfg.template) |
||||
import aiohttp |
||||
from aiohttp.client_exceptions import ClientConnectorError |
||||
from loguru import logger |
||||
|
||||
# ограничим одновременное число запросов |
||||
conn = aiohttp.TCPConnector(limit=cfg.requests_count) |
||||
from app_config import AppConfig |
||||
from config_object import ConfigFactory, ConfigObject |
||||
from request_builder import RequestBulder |
||||
|
||||
url = cfg.central_host_url |
||||
portion = cfg.request_portion |
||||
|
||||
try: |
||||
async with aiohttp.ClientSession(connector=conn) as session: |
||||
# получаем количесвто записей |
||||
count = await get_records_count(session, url) |
||||
async def get_records_count(rb: RequestBulder) -> int: |
||||
"""Обертка для получения количества записей с сервера.""" |
||||
resp: dict = await rb.send_request("count", json_body={}) |
||||
|
||||
tasks = await get_tasks(f"{url}/get", portion, count, session, json_body) |
||||
records_count: int = int(resp["result"]) if resp else 0 |
||||
|
||||
responses = await asyncio.gather(*tasks) |
||||
return int(records_count) |
||||
|
||||
# Пройдемся по ответам на запросы, запишем файлы конфига для |
||||
# каждого respone. Каждый response содержит portion или меньше хостов |
||||
for response in responses: |
||||
resp = await response.json() |
||||
hosts = [i["hostname"] for i in resp.get("result")] |
||||
await write_config_files(hosts, cfg, template) |
||||
except ClientConnectorError: |
||||
print(f"Невозможно подключиться к серверу {url}") |
||||
|
||||
async def custom_wrapper( |
||||
config_factory: ConfigFactory, |
||||
rb: RequestBulder, |
||||
usr: str, |
||||
json: dict, |
||||
) -> None: |
||||
"""Обертка для создания конфигов и их записи.""" |
||||
resp: dict = await rb.send_request("get", json) |
||||
|
||||
def read_config(path_to_conf_file: str, section: str = "Main") -> Config: |
||||
"""Считать конфиг с помощью `configparser`. |
||||
# Если мы получили валидный ответ, то разбираем пачку хостов из |
||||
# resp['result'] с созданием для каждой строки кофнига через |
||||
# фабрику |
||||
if resp: |
||||
conf_list = [config_factory.create(host["hostname"]) for host in resp["result"]] |
||||
await asyncio.gather(*[c.write() for c in conf_list]) |
||||
else: |
||||
logger.error(f"Сервер вернул ошибку") |
||||
|
||||
Parameters |
||||
---------- |
||||
path_to_conf_file: str |
||||
Путь к файлу конфига. Можно указывать относительный. |
||||
|
||||
section: str |
||||
Секция в конфиге, которую нужно считывать. По-умолчанию секция [Main]. |
||||
async def main(config_path: str) -> None: |
||||
"""Точка входа.""" |
||||
cfg: AppConfig = AppConfig(config_path) |
||||
|
||||
Raises |
||||
------ |
||||
KeyError |
||||
Если в конфиге не найдено указанной секции. |
||||
rb = RequestBulder(cfg) |
||||
|
||||
Returns |
||||
------- |
||||
dict |
||||
Словарь, из значений указанной секции. |
||||
""" |
||||
config = configparser.ConfigParser() |
||||
config.read(os.path.abspath(path_to_conf_file)) |
||||
|
||||
if section not in config.sections(): |
||||
raise KeyError(f"Section {section} not found in config file!") |
||||
|
||||
conf = dict(config.items(section)) |
||||
|
||||
return Config( |
||||
template=conf["template"], |
||||
path_for_config=conf["path_for_config"], |
||||
frequency_sec=int(conf["frequency_sec"]), |
||||
central_host_url=conf["central_host_url"], |
||||
requests_count=int(conf["requests_count"]), |
||||
request_portion=int(conf["request_portion"]), |
||||
while True: |
||||
records_count = await get_records_count(rb) |
||||
|
||||
json_boby_list = [] |
||||
for offset in range(0, records_count, cfg.request_portion): |
||||
body_json = { |
||||
"columns": "['hostname']", |
||||
"limit": cfg.request_portion, |
||||
"offset": offset, |
||||
} |
||||
json_boby_list.append(body_json) |
||||
|
||||
conf_factory = ConfigFactory(cfg.template, cfg.path_for_config) |
||||
await asyncio.gather( |
||||
*[custom_wrapper(conf_factory, rb, "get", jb) for jb in json_boby_list] |
||||
) |
||||
|
||||
|
||||
async def main() -> None: |
||||
"""Точка входа.""" |
||||
cfg: Config = read_config("service.conf") |
||||
|
||||
while True: |
||||
body = {"columns": "['hostname']", "limit": cfg.request_portion, "offset": 0} |
||||
await send_async_request(cfg, json_body=body) |
||||
await asyncio.sleep(cfg.frequency_sec) |
||||
await rb.wait() |
||||
# закрываем сессию |
||||
await rb.close() |
||||
|
||||
|
||||
if __name__ == "__main__": |
||||
asyncio.run(main()) |
||||
parser = argparse.ArgumentParser(description="Service for nginx config creation.") |
||||
parser.add_argument( |
||||
"--config_path", required=True, type=str, help="path for conifg file" |
||||
) |
||||
args = parser.parse_args() |
||||
config_path = os.path.abspath(args.config_path) |
||||
|
||||
asyncio.run(main(config_path)) |
||||
|
Loading…
Reference in new issue