You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

155 lines
4.8 KiB

from dataclasses import dataclass
from asyncio import Task
from typing import List
import asyncio
import os
import configparser
import aiohttp
import aiofiles
@dataclass(frozen=True)
class Config:
"""Класс для хранения конфига сервиса."""
template: str
path_for_config: str
frequency_sec: int
central_host_url: str
requests_count: int
request_portion: int
async def write_config_files(hosts: list, cfg: Config, template: str) -> None:
"""Записываем конфиги для списка hosts."""
full_path_to_config_dir: str = os.path.abspath(cfg.path_for_config)
if not os.path.isdir(full_path_to_config_dir):
os.mkdir(full_path_to_config_dir)
for host in hosts:
config_filename: str = f"{host}.conf"
config_file = os.path.join(full_path_to_config_dir, config_filename)
if not os.path.isfile(config_file):
config_body: str = template.replace(
"server_name _;", f"server_name {host}.server.com;"
)
async with aiofiles.open(config_file, mode="w") as file:
await file.write(config_body)
def _get_template(templ_file: str) -> str:
"""Возвращает шаблон конфига для хоста из файла `templ_file`."""
template: str = ""
with open(templ_file, mode="r", encoding="utf8") as file:
template = file.read()
return template
async def get_records_count(session, server) -> int:
"""Возвращает количество записей в базе."""
async with session.get(f"{server}/count") as resp:
resp = await resp.json()
count: int = int(resp["result"])
return count
async def get_tasks(cfg: Config, session: aiohttp.ClientSession, count, body) -> list:
"""Вернет список запросов к API.
Функция не ограничивает кол-во запросов, это нужно сделать до
вызова, чтобы передать корректный `session`.
"""
tasks: List[Task] = []
offset = 0
url = f"{cfg.central_host_url}/get"
for _ in range(count // cfg.request_portion + 1):
if offset < count:
print(f"{offset=}")
tasks.append(asyncio.create_task(session.get(url, json=body)))
offset += cfg.request_portion
body["offset"] = offset
print(f"{count=}")
return tasks
async def send_async_request(cfg: Config, json_body: dict) -> None:
"""Начать серию запросов."""
template: str = _get_template(cfg.template)
# ограничим одновременное число запросов
conn = aiohttp.TCPConnector(limit=cfg.requests_count)
async with aiohttp.ClientSession(connector=conn) as session:
# всего записей в базе
count = await get_records_count(session, cfg.central_host_url)
tasks = await get_tasks(cfg, session, count, json_body)
responses = await asyncio.gather(*tasks)
for response in responses:
resp = await response.json()
hosts = [i["hostname"] for i in resp.get("result")]
await write_config_files(hosts, cfg, template)
def read_config(path_to_conf_file: str, section: str = "Main") -> Config:
"""
Считать конфиг с помощью `configparser`.
Parameters
----------
path_to_conf_file: str
Путь к файлу конфига. Можно указывать относительный.
section: str
Секция в конфиге, которую нужно считывать. По-умолчанию секция [Main].
Raises
------
KeyError
Если в конфиге не найдено указанной секции.
Returns
-------
dict
Словарь, из значений указанной секции.
"""
config = configparser.ConfigParser()
config.read(os.path.abspath(path_to_conf_file))
if section not in config.sections():
raise KeyError(f"Section {section} not found in config file!")
conf = dict(config.items(section))
return Config(
template=conf["template"],
path_for_config=conf["path_for_config"],
frequency_sec=int(conf["frequency_sec"]),
central_host_url=conf["central_host_url"],
requests_count=int(conf["requests_count"]),
request_portion=int(conf["request_portion"]),
)
async def main() -> None:
"""Точка входа."""
cfg: Config = read_config("service.conf")
while True:
body = {"columns": "['hostname']", "limit": cfg.request_portion, "offset": 0}
await send_async_request(cfg, json_body=body)
await asyncio.sleep(cfg.frequency_sec)
if __name__ == "__main__":
asyncio.run(main())