import asyncio import requests import json from typing import List import os import configparser async def write_config_file( server_name: str, path_to_template_file: str, path_for_config: str ) -> None: template: str = "" with open(path_to_template_file, "r") as file: template = file.read() config_body: str = template.replace("server_name _;", f"server_name {server_name};") condifg_full_path: str = os.path.abspath(path_for_config) config_filename: str = f"{server_name}.conf" if not os.path.isdir(condifg_full_path): os.mkdir(condifg_full_path) with open(os.path.join(condifg_full_path, config_filename), "w") as file: file.write(config_body) async def send_request(server: str, columns: list, limit: int = 1) -> dict: response = requests.get(server, json={"columns": columns, "limit": limit}) return response.json() async def get_hosts(server_response: dict) -> List[str]: """Получить хосты из ответа сервера. Parameters ---------- server_response : dict """ hosts: list = [] for host in server_response.get("result"): hosts.append(host.get("hostname")) return hosts async def read_config(path_to_conf_file: str, section: str = "Main") -> dict: """ Считать конфиг с помощью `configparser`. Parameters ---------- path_to_conf_file: str Путь к файлу конфига. Можно указывать относительный. section: str Секция в конфиге, которую нужно считывать. По-умолчанию секция [Main]. Raises ------ KeyError Если в конфиге не найдено указанной секции. Returns ------- dict Словарь, из значений указанной секции. """ config = configparser.ConfigParser() config.read(os.path.abspath(path_to_conf_file)) if section not in config.sections(): raise KeyError(f"Section {section} not found in config file!") return dict(config.items(section)) async def main(): cnf = await read_config("service.conf") wait_sec: int = int(cnf["frequency_sec"]) while True: resp = await send_request( cnf["central_host_url"], columns=["hostname"], limit=9 ) hosts = await get_hosts(resp) for host in hosts: await write_config_file( server_name=host, path_to_template_file=cnf["template"], path_for_config=cnf["path_for_config"], ) await asyncio.sleep(wait_sec) if __name__ == "__main__": asyncio.run(main())