Архитектура AI-лаборатории: Docker Swarm, FastAPI и 27 агентов — путь от одного скрипта
Кровь, Docker и 3 ночи без сна: как лаборатория врача стала AI-фабрикой
Этот скрипт начался с боли в спине. В 3:17 ночи, после двенадцатого часа разбора бумажных журналов регистрации анализов. 514 проб за день, 27 ручных записей в Excel, 8 звонков с криками «где мои результаты?!». Я — заведующий лабораторией, человек, который должен всё это контролировать. Но контроль превратился в рутину, которая съедает всё время.
Мой первый «агент» был не ИИ, а тупая строчка в Python. Она просто парсила CSV-файл выгрузки из нашего лабораторного анализатора и пыталась сопоставить номер пробы с фамилией из другой таблицы. Версия? Python 3.9.5. Пакеты? pandas и openpyxl.
# agent_zero.py — тот самый первый скрипт, который сломался на второй день
import pandas as pd
import openpyxl
def match_samples(csv_path, excel_path):
# Читаем данные из анализатора (CSV)
df_lab = pd.read_csv(csv_path, encoding='windows-1251') # потому что наш древний анализатор так выдает
# Читаем журнал регистрации (Excel)
df_reg = pd.read_excel(excel_path, sheet_name='Журнал')
# Сопоставляем по номеру пробы
merged = pd.merge(df_lab, df_reg, on='Номер пробы', how='left')
# Сохраняем результат
merged.to_excel('result.xlsx', index=False)
if __name__ == "__main__":
match_samples('data/2023-10-01_analyses.csv', 'data/registration.xlsx')
Он сломался на вторые сутки. Причина? В журнал регистрации кто-то вручную записал пробу №0157 как 157. Скрипт не нашёл соответствия, и 12 результатов ушли в никуда. Первый звонок от хирурга в 8 утра: «Почему нет данных по моему пациенту?». Это был момент, когда я понял — if-else и простые сопоставления не работают в хаосе реальной лаборатории. Нужно что-то, что может думать, предполагать, исправлять опечатки.
Так родилась идея первого настоящего агента — «Санитар». Его задача была не просто сопоставить, а найти возможные соответствия даже при ошибках в данных. Я пересел с pandas на rapidfuzz для нечёткого сравнения строк. Версия rapidfuzz==2.13.7.
# agent_sanitizer.py — первая попытка внедрить «интеллект»
from rapidfuzz import fuzz, process
import pandas as pd
def fuzzy_match_samples(df_lab, df_reg):
results = []
for idx, lab_row in df_lab.iterrows():
sample_id_lab = str(lab_row['Номер пробы']).strip()
# Получаем список всех номеров пробы из журнала
reg_ids = [str(id).strip() for id in df_reg['Номер пробы'].tolist()]
# Нечёткое сравнение: находим лучшее соответствие
best_match, score, index = process.extractOne(sample_id_lab, reg_ids, scorer=fuzz.ratio)
if score >= 85: # Если совпадение больше 85% — считаем, что это та же проба
matched_row = df_reg.iloc[index]
# Объединяем данные
merged_row = {**lab_row.to_dict(), **matched_row.to_dict()}
results.append(merged_row)
else:
# Если соответствие не найдено — помечаем для ручной проверки
lab_row['Статус'] = 'НЕ СОПОСТАВЛЕН'
results.append(lab_row.to_dict())
return pd.DataFrame(results)
Это уже работало лучше. 514 проб обрабатывались, 15 из них попадали в «НЕ СОПОСТАВЛЕН» из-за грубых ошибок в журнале, но остальные находили своего пациента. Время ручной проверки сократилось с 3 часов до 30 минут. Но скрипт жил на моём рабочем компьютере. Чтобы запустить его, мне нужно было открыть терминал, скопировать файлы в правильные папки, запустить команду. Это не было системой.
И тут появился Docker. Я не был экспертом. Моя первая docker-compose.yml выглядела как крик души.
# docker-compose.yml — версия 1.0, которая упала при первом запуске
version: '3.8'
services:
agent-sanitizer:
build: .
volumes:
- ./data:/app/data # Плохая идея, но я тогда не знал
- ./output:/app/output
command: python agent_sanitizer.py
Я запустил docker-compose up и получил ошибку: ModuleNotFoundError: No module named 'rapidfuzz'. Потому что забыл добавить его в requirements.txt. Первый урок: Docker не магия, он просто запускает то, что ты ему дал. Вторая ночь без сна ушла на чтение документации, настройку Dockerfile и борьбу с правами доступа к файлам на хосте.
Но когда он заработал, это было как первый анализ крови под микроскопом — волшебство. Команда docker-compose up -d запускала агента, который ждал новых файлов в папке data/incoming. Я мог поставить это на старый сервер в лаборатории и забыть. Но я не забыл. Я увидел потенциал: один агент решает одну проблему. А у нас проблем десятки: распределение проб по отделам, контроль сроков выполнения, проверка критических значений, генерация отчётов для врачей, предупреждение о недостатке реактивов…
Так лаборатория врача стала фабрикой по производству агентов. Не AI в академическом смысле, а маленьких, умных скриптов, которые берут на себя кусочки моей ежедневной боли. Каждый новый агент — это ответ на новый крик, на новую проблему, которая раньше заставляла меня листать бумаги в три часа ночи.
И началось.
Одна строчка кода и бардак на 500 пробирок в день
Вот она. Точка невозврата. Тот момент, когда ты думаешь: «Попробую одну строчку кода, просто посмотреть». А получается бардак на 500 пробирок в день и пожар в лог-файлах.
Я начал не с агентов, не с нейронок. Я начал с боли. В 3 часа ночи, после очередной дозы Excel-ада, я открыл VS Code и написал первую строчку. Она была простая, почти детская:
# initial_hell.py
import pandas as pd
# Читаем файл регистрации (CSV, который медсестра заполняет руками)
df = pd.read_csv('registration.csv', encoding='windows-1251')
print(df.head())
Выполнил. Получил ошибку:
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xff in position 0: invalid start byte
Пять минут гугла. Добавил параметр encoding='windows-1251'. Выполнил. Увидел:
Номер пробы Фамилия Имя Дата рождения Назначенный анализ
0 12345 Иванов И.И. 01.01.1980 Глюкоза
И это было моё первое поражение. Данные были не просто кривые. Они были биологически кривые. «Иванов И.И.» — это не имя, это рудимент советской медицины. «01.01.1980» — это не дата, это строка. «Глюкоза» — это не единственный анализ, в одной пробирке могло быть 15 разных тестов, записанных через запятую, иногда с точкой, иногда с двоеточием.
Первая попытка автоматизации привела к первому бардаку. Я решил «просто» сопоставить эти пробы с результатами из аналитических систем (у нас три разных аппарата, каждый выгружает свой формат CSV). Написал скрипт сопоставления:
# naive_matcher.py
import pandas as pd
registration_df = pd.read_csv('registration.csv', encoding='windows-1251')
results_df = pd.read_csv('results_biochemistry.csv')
# Пытаемся сопоставить по номеру пробы
merged_df = pd.merge(registration_df, results_df, on='Номер пробы', how='left')
print(f"Сопоставлено: {merged_df['Результат'].notna().sum()} из {len(merged_df)}")
Вывод был убийственным:
Сопоставлено: ••• 152 из 514
Почему? Потому что в регистрации номер пробы был строкой «12345», а в результатах от аппарата он был целым числом 12345. Потому что аппарат иногда выдавал номер 012345. Потому что медсестра могла написать «12345А» (если проба повторная). Итог: 362 пробы остались без результата. Это 362 звонка от врачей, 362 крика «где анализ?!».
Я попробовал «насильственно» привести к одному формату, добавив очистку:
def clean_sample_id(raw_id):
try:
# Убираем буквы, лишние пробелы
cleaned = str(raw_id).strip().upper()
if cleaned.endswith('А'):
cleaned = cleaned.replace('А', '')
return int(cleaned)
except:
return None
Это добавило ещё 100 сопоставленных проб. Но породило новую проблему: теперь пробы «12345» и «12345А» (первичная и повторная от одного пациента) стали одним номером и слились в одну запись, потеряв повторный результат.
Цифры бардака за первый день автоматизации: - 514 пробирок в исходном файле. - 152 сопоставлены автоматически (наивный merge). - +100 сопоставлены после агрессивной очистки. - 262 пробы остались «потерянными». - 3 пробы были некорректно объединены (потеря данных). - Время обработки: 40 секунд на скрипт + 2 часа моей ручной проверки и исправлений Excel.
Я понял, что «одна строчка» превратилась в лавину проблем. Каждая попытка исправить одно — создавала два новых бага. Это был классический лабораторный синдром: лечение одного показателя нарушает баланс всех остальных.
Но тут появилась идея. Нельзя сопоставить данные по одному жесткому правилу. Нужно пробовать несколько стратегий, оценивать их надежность и выбирать лучшую для каждой конкретной пробы. Это был момент, когда в голове щёлкнуло: «Это не скрипт. Это должен быть агент».
Пока что агентов нет. Но есть первый черновик алгоритма, который я назвал «Сопоставитель-кандидат»:
# candidate_matcher_v1.py
import pandas as pd
import re
def match_candidates(reg_row, results_df):
candidates = []
sample_id_raw = reg_row['Номер пробы']
# Стратегия 1: Чистый номер (int)
try:
clean_id = int(re.sub(r'[^0-9]', '', str(sample_id_raw)))
match = results_df[results_df['SampleID'] == clean_id]
if len(match) == 1:
candidates.append({'strategy': 'clean_int', 'data': match.iloc[0], 'confidence': 0.9})
except:
pass
# Стратегия 2: Поиск по части строки (если есть буква)
if any(char.isalpha() for char in str(sample_id_raw)):
for _, result_row in results_df.iterrows():
if str(sample_id_raw) in str(result_row['SampleID']) or str(result_row['SampleID']) in str(sample_id_raw):
candidates.append({'strategy': 'substring', 'data': result_row, 'confidence': 0.6})
# Стратегия 3: По имени и дате (если аппарат их передал)
# ... (этот код был длинным и жалким)
return candidates
# Применяем к каждой пробе
registration_df = pd.read_csv('registration.csv', encoding='windows-1251')
results_df = pd.read_csv('results_biochemistry.csv')
matched_count = 0
for index, row in registration_df.iterrows():
candidates = match_candidates(row, results_df)
if candidates:
# Выбираем кандидата с максимальной confidence
best_match = max(candidates, key=lambda x: x['confidence'])
# ... записываем результат
matched_count += 1
print(f"Сопоставлено кандидатами: {matched_count} из {len(registration_df)}")
Этот скрипт выдал Сопоставлено кандидатами: 410 из 514. Прогресс! Но он работал 12 минут на моём ночном рабочем компьютере. И он был монолитом. Если одна стратегия ломалась — падал весь процесс.
Именно тогда я увидел дорогу. Этот «кандидат» должен стать независимым микросервисом. Каждая стратегия сопоставления — отдельным маленьким агентом. А над ними должен быть главный агент-арбитр, который выбирает лучший результат.
Но это уже был следующий этап. На этом этапе я получил главный урок: автоматизация начинается не с решения проблемы, а с её детального обнажения. Одна строчка кода показала мне весь хаос, который мы годами поддерживали ручными обработками. Бардак на 500 пробирок стал не препятствием, а картой для построения новой системы.
От if-else адского кода к первому рабочему агенту-санитару
Вот что у меня было после «точки невозврата». Скрипт-паук, который ползал по папкам и пытался что-то найти. Он работал. Но его логика была ужаснее, чем мои бумажные журналы.
# agent_v0.1_hell.py - НЕ ДЕЛАЙТЕ ТАК
import os
import shutil
from datetime import datetime
def process_folder(base_path):
for root, dirs, files in os.walk(base_path):
for file in files:
if file.endswith('.pdf'):
# Это PDF-заявка от отделения?
if 'заявка' in file.lower():
if 'хирургия' in root.lower():
dest = '/archive/хирургия/заявки/'
elif 'терапия' in root.lower():
dest = '/archive/терапия/заявки/'
else:
dest = '/archive/разное/'
# А если файл старше 30 дней?
filepath = os.path.join(root, file)
file_time = os.path.getmtime(filepath)
if datetime.now().timestamp() - file_time > 2592000:
dest = dest.replace('заявки', 'старые_заявки')
# Но если это срочный анализ?
if 'cito' in file.lower():
dest = '/priority/cito/' + dest
# А может, это результат анализа?
elif 'результат' in file.lower():
# Проверить, есть ли ID пациента в имени?
# ... и так ещё 200 строк if-else
Это был монстр. 450 строк сплошных if, elif, вложенных условий. Он падал, если в имени файла была кириллическая «с» вместо английской «c». Он дублировал файлы. Однажды он отправил результаты кардиологии в папку «кардиология» (с опечаткой) и я полдня искал их.
Боль была в том, что любое изменение — новый тип анализа, новое отделение — требовало ковыряния в этом спагетти-коде. Я добавлял условие, ломал два других. Это был хрупкий карточный домик.
Первое прозрение: агент — это не скрипт, это состояние + поведение.
Я перечитал пару статей про агентные системы и понял свою ошибку. Мой «агент» был статичной функцией. Настоящий агент должен: 1. Знать своё состояние (что он уже сделал, что видит вокруг). 2. Иметь память (хотя бы контекст последних действий). 3. Принимать решения на основе этого, а не жёстких правил.
Версия 0.2 родилась в 5 утра. Я выпил третий кофе и сел переписывать.
# agent_v0.2_skeleton.py - уже теплее
import os
import json
from datetime import datetime
from typing import Dict, Any, Optional
from dataclasses import dataclass, asdict
@dataclass
class AgentState:
name: str
current_task: Optional[str] = None
processed_items: int = 0
last_error: Optional[str] = None
context: Dict[str, Any] = None
def to_dict(self):
return asdict(self)
def update_context(self, key: str, value: Any):
if self.context is None:
self.context = {}
self.context[key] = value
class BaseAgent:
def __init__(self, name: str, work_dir: str):
self.state = AgentState(name=name)
self.work_dir = work_dir
self._load_state() # Пытаемся восстановиться после падения
def _load_state(self):
state_file = os.path.join(self.work_dir, f"{self.state.name}_state.json")
if os.path.exists(state_file):
with open(state_file, 'r', encoding='utf-8') as f:
data = json.load(f)
self.state = AgentState(**data)
print(f"[{self.state.name}] State loaded. Processed: {self.state.processed_items}")
def _save_state(self):
state_file = os.path.join(self.work_dir, f"{self.state.name}_state.json")
with open(state_file, 'w', encoding='utf-8') as f:
json.dump(self.state.to_dict(), f, ensure_ascii=False, indent=2)
def run(self):
"""Основной цикл агента. Шаблонный метод."""
try:
self._pre_run()
self._process()
self._post_run()
except Exception as e:
self.state.last_error = str(e)
self._save_state()
raise
def _pre_run(self):
self.state.current_task = "starting"
print(f"[{self.state.name}] Starting at {datetime.now()}")
def _process(self):
"""Эту логику будут переопределять наследники."""
raise NotImplementedError
def _post_run(self):
self.state.current_task = None
self.state.last_error = None
self._save_state()
print(f"[{self.state.name}] Finished. Total processed: {self.state.processed_items}")
Это был каркас. Агент теперь умел сохранять своё состояние в JSON. Если скрипт падал (а он падал), при следующем запуске он говорил: «Привет, я Санитар. Я уже обработал 42 файла, упал на ошибке "диск переполнен", продолжаю». Это уже было похоже на жизнь.
Второе прозрение: первый агент должен решать самую грязную, рутинную задачу.
Я посмотрел на рабочий процесс. Самая нудная, частая и критичная операция — «санитарная обработка» входящих файлов. Сотни PDF, JPEG, Excel-файлов сыпятся из разных источников: сканеры, электронная почта, скидывают медсёстры с флешек. Всё в одну папку inbox/. Бардак.
Задача «санитара»:
1. Обнаружить новый файл в inbox/.
2. Определить его тип (заявка, результат, скан паспорта, чёрт-те что).
3. Переместить в соответствующую папку processed/{type}/.
4. Записать метаданные (откуда, куда, когда) в лог.
5. Не сломаться, если пришёл файл с именем РЕЗУЛЬТАТ_СРОЧНО!!.PDF.
Я создал SanitaryAgent, наследника BaseAgent.
# sanitary_agent_v1.py - первый боевой агент
import os
import shutil
import hashlib
from pathlib import Path
from .base_agent import BaseAgent
class SanitaryAgent(BaseAgent):
def __init__(self, work_dir: str, inbox_path: str):
super().__init__(name="SanitaryAgent_v1", work_dir=work_dir)
self.inbox_path = Path(inbox_path)
self.processed_base = Path(work_dir) / "processed"
self.processed_base.mkdir(parents=True, exist_ok=True)
# Простая, но расширяемая классификация по ключевым словам
self.rules = {
'application': ['заявка', 'направление', 'request', 'app'],
'result': ['результат', 'анализ', 'отчет', 'result', 'lab'],
'scan': ['скан', 'паспорт', 'страховка', 'scan', 'id'],
'image': ['.jpg', '.jpeg', '.png', '.tiff'] # для микроскопии
}
def _classify_file(self, file_path: Path) -> str:
"""Определяем тип файла. Пока по имени, потом добавим ML."""
name = file_path.name.lower()
suffix = file_path.suffix.lower()
for file_type, keywords in self.rules.items():
if any(keyword in name for keyword in keywords):
return file_type
# Если суффикс изображения, но не опознан
if suffix in self.rules['image']:
return 'image'
return 'unknown'
def _process(self):
self.state.current_task = "scanning_inbox"
files = list(self.inbox_path.iterdir())
print(f"[{self.state.name}] Found {len(files)} items in inbox.")
for item in files:
if item.is_file():
self._process_file(item)
def _process_file(self, file_path: Path):
file_type = self._classify_file(file_path)
dest_dir = self.processed_base / file_type / datetime.now().strftime("%Y-%m-%d")
dest_dir.mkdir(parents=True, exist_ok=True)
# Генерируем уникальное имя, чтобы избежать перезаписи
file_hash = hashlib.md5(file_path.read_bytes()).hexdigest()[:8]
new_name = f"{datetime.now().strftime('%H%M%S')}_{file_hash}{file_path.suffix}"
dest_path = dest_dir / new_name
try:
shutil.move(str(file_path), str(dest_path))
# Логируем действие в контекст агента и в отдельный файл
log_entry = {
"timestamp": datetime.now().isoformat(),
"original_name": file_path.name,
"new_path": str(dest_path),
"file_type": file_type
}
self.state.update_context(f"file_{self.state.processed_items}", log_entry)
self.state.processed_items += 1
print(f" -> Moved '{file_path.name}' to '{dest_path}' as '{file_type}'")
except Exception as e:
error_msg = f"Failed to process {file_path.name}: {e}"
print(f" [ERROR] {error_msg}")
self.state.last_error = error_msg
# Перемещаем проблемный файл в карантин
quarantine_dir = self.processed_base / "quarantine"
quarantine_dir.mkdir(exist_ok=True)
shutil.move(str(file_path), str(quarantine_dir / file_path.name))
Я запустил его впервые:
cd /opt/lab_ai
python -m agents.sanitary_agent_v1 --work-dir ./data --inbox ./hot_inbox
Он просканировал 127 файлов в hot_inbox. Сгенерировал папки processed/application/2023-10-27/, processed/result/.... Переместил, залогировал. В quarantine улетело 3 битых PDF, которые висели в системе неделями.
Это был момент истины. Один агент, 120 строк кода (плюс каркас), заменил мне утренний ритуал ручной сортировки, который отнимал 25-30 минут каждый день. Он не был идеален. Он путал заявки от терапии со сканами, если в имени было слово «анализ». Но он работал. И его состояние сохранялось.
Я поставил его на cron, чтобы он запускался каждые 5 минут.
# /etc/cron.d/lab_ai_sanitary
*/5 * * * * root cd /opt/lab_ai && /usr/bin/python3 -m agents.sanitary_agent_v1 --work-dir ./data --inbox ./hot_inbox >> /var/log/lab_ai/sanitary.log 2>&1
Через неделю лог показал: SanitaryAgent_v1 processed 2147 files. Quarantine: 12. Я сэкономил ~5 часов ручной работы. Но главное — я получил работающий паттерн. Теперь я знал, как рождаются агенты: не из сложных фреймворков, а из конкретной боли, простого каркаса и чёткой обязанности.
Этот «санитар» стал фундаментом. Он очищал поле боя для следующих, более умных агентов. И он же показал следующую проблему: как управлять этими ребятами, когда их станет больше одного? Как они будут общаться? Но это уже история для следующей секции — про FastAPI, Docker Swarm и архитектуру, которая не развалится от двух агентов одновременно.
Сердце системы: архитектура на FastAPI и Docker Swarm (плюс рабочий docker-compose.yml)
Сердце системы: архитектура на FastAPI и Docker Swarm (плюс рабочий docker-compose.yml)
Один скрипт-санитар был классным экспериментом. Но когда я понял, что хочу не одного, а 27 разных агентов — для классификации, валидации, генерации отчетов, общения с LIS — стало ясно: нужно сердце. Центр, который будет принимать запросы, распределять задачи, следить за здоровьем всех этих микросервисов.
Я выбрал FastAPI. Почему? Скорость (Starlette под капотом), автоматическая документация (Swagger UI — моя любовь), и простота. Как врач, я ценю простые и эффективные инструменты. Flask был слишком минималистичным, Django — слишком тяжёлым. FastAPI оказался тем золотым сечением: я могу быстро описать, что принимает API и что возвращает, а система сама построит маршруты и документацию.
Версии на тот момент (март 2023): Python 3.9.16, FastAPI 0.95.2, Uvicorn 0.22.0.
Сердце — это orchestrator. Его задача:
1. Принять задачу (например, "классифицировать 50 образцов крови по срочности").
2. Определить, какой агент (classifier) её должен выполнить.
3. Отправить задачу этому агент-сервису через его собственный API.
4. Получить ответ, обработать его, возможно, передать следующему агент-сервису (report_generator).
5. Вернуть финальный результат клиенту (моему основному скрипту или веб-интерфейсу).
Вот скелет этого оркестратора:
# orchestrator/main.py
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import List, Optional
import httpx
import logging
import asyncio
# Конфигурация агентов
AGENT_CONFIG = {
"classifier": {"url": "http://classifier:8001", "health_endpoint": "/health"},
"validator": {"url": "http://validator:8002", "health_endpoint": "/health"},
"sanitizer": {"url": "http://sanitizer:8003", "health_endpoint": "/health"},
"report_generator": {"url": "http://report_generator:8004", "health_endpoint": "/health"},
}
app = FastAPI(title="Lab AI Orchestrator", version="1.0.0")
logger = logging.getLogger("orchestrator")
class ProcessingRequest(BaseModel):
task_type: str # "classify", "validate", "sanitize_and_report"
data: List[dict] # Список образцов с метаданными
priority: Optional[str] = "normal"
@app.get("/health")
async def health():
"""Эндпоинт здоровья оркестратора."""
return {"status": "healthy", "service": "orchestrator"}
@app.post("/process")
async def process_task(request: ProcessingRequest, background_tasks: BackgroundTasks):
"""Основной эндпоинт для обработки задач."""
logger.info(f"Received task: {request.task_type} with {len(request.data)} samples.")
# Определяем целевого агента
target_agent = None
if request.task_type.startswith("classify"):
target_agent = "classifier"
elif request.task_type.startswith("validate"):
target_agent = "validator"
elif request.task_type.startswith("sanitize"):
# Комплексная задача: сначала санитар, потом классификатор, потом генератор отчетов
background_tasks.add_task(complex_sanitize_task, request.data)
return {"message": "Complex sanitization task started in background", "task_id": "async_123"}
if not target_agent:
raise HTTPException(status_code=400, detail=f"Unknown task type: {request.task_type}")
# Проверяем здоровье агента перед отправкой
health_url = AGENT_CONFIG[target_agent]["url"] + AGENT_CONFIG[target_agent]["health_endpoint"]
try:
async with httpx.AsyncClient(timeout=5.0) as client:
health_response = await client.get(health_url)
if health_response.status_code != 200:
raise HTTPException(status_code=503, detail=f"Agent {target_agent} is unhealthy.")
except httpx.RequestError as e:
logger.error(f"Health check failed for {target_agent}: {e}")
raise HTTPException(status_code=503, detail=f"Agent {target_agent} is unreachable.")
# Отправляем задачу агент-сервису
agent_url = AGENT_CONFIG[target_agent]["url"] + "/process"
try:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(agent_url, json={"samples": request.data})
result = response.json()
except httpx.RequestError as e:
logger.error(f"Task failed for {target_agent}: {e}")
raise HTTPException(status_code=500, detail=f"Agent {target_agent} processing error.")
logger.info(f"Task completed by {target_agent}. Results: {len(result.get('classified_samples', []))}")
return {"processed_by": target_agent, "results": result}
async def complex_sanitize_task(samples):
"""Пример сложной задачи, выполняемой фоном."""
# Это упрощённый пример. В реальности тут цепочка вызовов.
logger.info("Starting complex sanitize task in background...")
# ... вызовы к sanitizer, затем classifier, затем report_generator
await asyncio.sleep(1)
logger.info("Complex sanitize task completed.")
Но оркестратор — это лишь один контейнер. Все 27 агентов тоже должны быть контейнерами. И они должны быть связаны, управляемы и масштабируемы. На тот момент Kubernetes казался космическим кораблем, а я был на уровне деревянной лодки. Поэтому я выбрал Docker Swarm — более простой, но достаточно мощный для моих нужд.
Вот рабочий docker-compose.yml, который стал основой для Swarm (docker stack deploy). Это не псевдокод, это реальный файл, который я запускал сотни раз.
# docker-compose.yml для Docker Swarm
version: '3.8'
services:
orchestrator:
image: lab-ai-orchestrator:1.0.0
build:
context: ./orchestrator
dockerfile: Dockerfile
ports:
- "8000:8000"
environment:
- LOG_LEVEL=INFO
- AGENT_TIMEOUT=30
networks:
- lab-ai-net
deploy:
replicas: 2
restart_policy:
condition: on-failure
delay: 10s
max_attempts: 3
placement:
constraints:
- node.role == manager # Оркестратор только на manager ноде
classifier:
image: lab-ai-classifier:1.0.2
build:
context: ./agents/classifier
dockerfile: Dockerfile
ports:
- "8001:8001"
environment:
- MODEL_PATH=/app/models/blood_classifier_v3.onnx
- THRESHOLD_URGENT=0.85
volumes:
- classifier_models:/app/models
networks:
- lab-ai-net
deploy:
replicas: 3
restart_policy:
condition: any
resources:
limits:
memory: 1G
reservations:
memory: 512M
validator:
image: lab-ai-validator:1.0.1
build:
context: ./agents/validator
dockerfile: Dockerfile
ports:
- "8002:8002"
environment:
- ALLOWED_TESTS=WBC,RBC,HGB,PLT
- MAX_SAMPLE_AGE_HOURS=72
networks:
- lab-ai-net
deploy:
replicas: 2
sanitizer:
image: lab-ai-sanitizer:1.0.3
build:
context: ./agents/sanitizer
dockerfile: Dockerfile
ports:
- "8003:8003"
environment:
- SCAN_INTERVAL_SECONDS=300
- QUARANTINE_PATH=/app/quarantine
volumes:
- lab_data:/data/raw
- sanitizer_quarantine:/app/quarantine
networks:
- lab-ai-net
deploy:
replicas: 2
report_generator:
image: lab-ai-report-generator:1.0.0
build:
context: ./agents/report_generator
dockerfile: Dockerfile
ports:
- "8004:8004"
environment:
- TEMPLATE_DIR=/app/templates
- OUTPUT_DIR=/app/generated_reports
volumes:
- report_templates:/app/templates
- generated_reports:/app/generated_reports
networks:
- lab-ai-net
deploy:
replicas: 2
# Мониторинг (простейший)
monitor:
image: lab-ai-monitor:0.1.0
build:
context: ./monitor
dockerfile: Dockerfile
ports:
- "8080:8080"
environment:
- ORCHESTRATOR_URL=http://orchestrator:8000/health
- AGENTS_URLS=http://classifier:8001/health,http://validator:8002/health
networks:
- lab-ai-net
deploy:
replicas: 1
placement:
constraints:
- node.role == manager
networks:
lab-ai-net:
driver: overlay # Ключевое для Swarm: overlay сеть для связи между нодами
volumes:
lab_data:
driver: local
classifier_models:
driver: local
sanitizer_quarantine:
driver: local
report_templates:
driver: local
generated_reports:
driver: local
Команды, которые превращали этот файл в живую систему:
# Сборка всех образов (долго, но необходимо)
docker-compose build --parallel
# Инициализация Swarm на основной ноде (мой ноутбук стал manager)
docker swarm init
# Деплой стека в Swarm
docker stack deploy -c docker-compose.yml lab-ai-stack
# Проверка статуса
docker stack services lab-ai-stack
# Логи конкретного сервиса (когда что-то падало)
docker service logs lab-ai-stack_classifier --tail 50 --follow
Почему Swarm? Потому что он дал мне:
1. Overlay сеть: агенты на разных физических нодах (мой ноутбук и два старых сервера в лаборатории) могли общаться друг с другом как будто они локально.
2. Реплики и масштабирование: replicas: 3 для классификатора означало, что три контейнера будут обслуживать запросы параллельно. Swarm распределял их между нодами автоматически.
3. Отказоустойчивость: restart_policy: any и Swarm сам перезапускал контейнеры при падении. Это спало мне ночь, когда санитар умирал каждые 4 часа из-за памяти.
4. Простота управления: одна команда docker stack deploy обновляла весь стек.
Но первая неделя работы этой архитектуры была адом. Overlay сеть иногда теряла пакеты между нодами, health-чек оркестратора возвращал 503, хотя агент был жив. Логи были разбросаны по трём машинам. Я понял, что сердце системы — это не только FastAPI и Docker. Это ещё и мониторинг, логирование, и тонкая настройка сетевых таймаутов.
Ключевые цифры на тот момент: - 5 сервисов в стеке (orchestrator + 4 агента). - 10 контейнеров суммарно (с учётом реплик). - Среднее время ответа оркестратора на простую задачу: 1.2 секунды (без учёта времени агентов). - Первый полный цикл обработки 100 проб: 4 минуты 37 секунд (от санитара до генератора отчетов).
Это было сердце. Но оно было молодое, аритмичное, и требовало постоянного наблюдения. Следующим шагом было научить его не просто биться, но и эффективно снабжать кровью (данными) все 27 органов (агентов). И следить за их здоровьем, чтобы не умирали в 3 часа ночи, когда я пытался спать.
Когда один контейнер — мало: масштабирование, мониторинг и 27 голодных агентов
Когда я развернул первый контейнер с агентом-классификатором, я почувствовал себя богом. FastAPI, Docker Swarm, всё крутится. Через неделю этот бог с ужасом смотрел на график потребления RAM, который напоминал кардиограмму пациента в терминальной стадии. Один контейнер — это тишина. 27 — это гулкий рой голодных ос, каждая из которых хочет GPU, память и твою душу.
Проблема №1: Агенты пожирают всё, как только просыпаются.
Мой docker-compose.yml из прошлой секции запускал сервисы в режиме replicated. Swarm равномерно раскидывал их по нодам. Но он не знал главного: агент-генератор отчётов (report_agent) жрёт 4 ГБ RAM при старте и успокаивается до 1.5 ГБ, а агент-валидатор (validator_agent) тихий (512 МБ), но просыпается каждые 5 секунд и дергает CPU. В итоге: одна нода в коме, другие простаивают.
Решение — ограничения и резервы в композе. Я учился на крови (логи лаборатории — почти как системные логи, только пахнут иначе).
# docker-compose.v2.yml - фрагмент с лимитами
services:
report_agent:
image: lab/report-agent:1.2
deploy:
replicas: 2
resources:
limits:
cpus: '1.0'
memory: 2G
reservations:
cpus: '0.5'
memory: 1G
environment:
- AGENT_SLEEP=0.1 # Агрессивность, от 0.1 до 1.0
validator_agent:
image: lab/validator:1.1
deploy:
replicas: 5
resources:
limits:
cpus: '0.5'
memory: 768M
reservations:
cpus: '0.1'
memory: 256M
environment:
- VALIDATION_BATCH_SIZE=10
Команда развертывания стала сложнее:
docker stack deploy -c docker-compose.v2.yml -c docker-compose.monitor.yml lab_ai --with-registry-auth
Проблема №2: Как узнать, кто из 27 агентов сдох?
Docker Swarm говорит «service healthy», но агент внутри контейнера мог упасть в депрессию (зависнуть, зациклиться, потерять связь с Redis). Мне нужен был мониторинг не контейнеров, а функциональности. Я сделал эндпоинт /health для каждого агента, который проверяет: 1) связь с БД, 2) связь с Redis, 3) загрузку CPU > 95% за последнюю минуту (это крик о помощи).
# health_check.py в каждом агенте (FastAPI)
from fastapi import APIRouter, HTTPException
import psutil, redis, asyncpg
from datetime import datetime, timedelta
router = APIRouter()
@router.get("/health")
async def deep_health():
errors = []
# 1. Redis
try:
r = redis.Redis(host="redis", port=6379, socket_connect_timeout=2)
r.ping()
except Exception as e:
errors.append(f"Redis: {e}")
# 2. Postgres
try:
conn = await asyncpg.connect("postgresql://user:pass@postgres:5432/lab")
await conn.close()
except Exception as e:
errors.append(f"Postgres: {e}")
# 3. CPU sanity check
cpu_percent = psutil.cpu_percent(interval=1)
if cpu_percent > 95.0:
errors.append(f"CPU overload: {cpu_percent}%")
# 4. Memory check (более 90% от лимита Docker)
mem = psutil.virtual_memory()
if mem.percent > 90:
errors.append(f"Memory high: {mem.percent}%")
if errors:
raise HTTPException(status_code=503, detail={"status": "unhealthy", "errors": errors})
return {"status": "healthy", "timestamp": datetime.utcnow().isoformat()}
Но 27 эндпоинтов — это 27 точек, которые нужно опрашивать. Вручную? Я же не сумасшедший (хотя...). Поднял Prometheus + Grafana в отдельном стеке.
# docker-compose.monitor.yml
version: '3.8'
services:
prometheus:
image: prom/prometheus:v2.45.0
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
- prom_data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.retention.time=30d'
ports:
- "9090:9090"
deploy:
placement:
constraints:
- node.role == manager
grafana:
image: grafana/grafana:10.0.0
volumes:
- grafana_data:/var/lib/grafana
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin123 # Да, я знаю. Не в продакшен.
ports:
- "3000:3000"
deploy:
placement:
constraints:
- node.role == manager
volumes:
prom_data:
grafana_data:
Конфиг Prometheus для опроса здоровья агентов (через docker swarm network):
# monitoring/prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: 'ai-agents'
static_configs:
- targets:
- 'report_agent:8000'
- 'validator_agent:8000'
- 'classifier_agent:8000'
- 'lis_connector:8000'
# ... и ещё 23 цели
metrics_path: /metrics
relabel_configs:
- source_labels: [__address__]
target_label: __param_target
- source_labels: [__param_target]
target_label: instance
- target_label: __address__
replacement: prometheus:9090 # Blackbox exporter, если бы он был
- job_name: 'agent-health'
metrics_path: /health
static_configs:
- targets:
- 'report_agent:8000'
- 'validator_agent:8000'
# ... все агены
relabel_configs:
- source_labels: [__address__]
regex: '(.*):8000'
target_label: agent
replacement: '$1'
Проблема №3: Логи. Их было больше, чем проб в лаборатории.
27 контейнеров → 27 потоков логов в docker service logs. Искать ошибку — это как искать иголку в стоге сена, который ещё и кричит. Решение: централизованный сбор в Loki + отображение в Grafana.
Добавил в docker-compose.monitor.yml:
loki:
image: grafana/loki:2.9.0
command: -config.file=/etc/loki/local-config.yaml
ports:
- "3100:3100"
promtail:
image: grafana/promtail:2.9.0
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- ./monitoring/promtail-config.yml:/etc/promtail/config.yml
command: -config.file=/etc/promtail/config.yml
Конфиг Promtail для парсинга Docker-логов:
# monitoring/promtail-config.yml
server:
http_listen_port: 9080
grpc_listen_port: 0
positions:
filename: /tmp/positions.yaml
clients:
- url: http://loki:3100/loki/api/v1/push
scrape_configs:
- job_name: docker
docker_sd_configs:
- host: unix:///var/run/docker.sock
refresh_interval: 5s
relabel_configs:
- source_labels: ['__meta_docker_container_name']
regex: '/(.*)'
target_label: 'container'
- source_labels: ['__meta_docker_service_name']
target_label: 'service'
- source_labels: ['__meta_docker_service_task_slot']
target_label: 'task_slot'
Теперь в Grafana я мог одним запросом {service="report_agent"} |= "ERROR" найти все ошибки по всем репликам агента-генератора.
Итоговая картина спустя месяц:
На дашборде Grafana висит 12 графиков. Потребление памяти плавает между 24 и 32 ГБ на кластере из 3 нод. CPU в среднем 40%. Каждую ночь в 3:00 (да, ирония) cron в контейнере-менеджере запускает стресс-тест: симулирует пиковую нагрузку (500+ задач в очередь Redis). Мы падали дважды: когда Redis исчерпал maxmemory (исправил политикой allkeys-lru) и когда сетевая привязка Swarm не выдержала 10 тыс. RPS между агентами (помогло увеличение сетевых пулов и --opt encrypted).
27 агентов живут. Они едят, спят, работают. Система мониторинга дышит мне в спину. Каждый день я получаю алерт в Telegram, если здоровье любого агента ниже 95% за последний час. Это уже не скрипт. Это организм. И иногда он болеет. Но теперь у меня есть его карта крови, температура и давление в реальном времени. Почти как в моей лаборатории, только пациент — мой собственный код.
Выводы: что я понял и чего не повторю
Итак, мы прошли путь от одного скрипта в 3 часа ночи до 27 голодных контейнеров в Swarm. Если бы я делал это по учебнику, всё было бы гладко. Но я делал это по боли. И вот что я понял, глядя на этот лабиринт из Python, Docker, Prometheus и невыносимой надежды.
Урок 1: Никогда не начинай с агентов. Начинай с интерфейса к твоему конкретному кошмару.
Моя первая ошибка: я начал думать о «классификаторах», «аналитиках», «репортёрах». Это абстракции. Реальность была в папке C:\Lab\Incoming\2024-05-15\, в файлах blood_*.csv с кривыми колонками. Если бы я сразу написал не агента, а простой, смертельный интерфейс к этой папке, я бы сэкономил месяц.
Что я сделал бы сейчас:
# В самом начале проекта. core_interface.py
import pandas as pd
from pathlib import Path
import json
class LabDataInterface:
def __init__(self, incoming_root: str):
self.root = Path(incoming_root)
# ВСЕ агенты потом будут работать через этот класс
# Он знает ВСЮ специфику моих файлов
def list_new_samples(self) -> list[dict]:
# Не просто список файлов, а сразу dict с метаданными
# Например: {'path': '...', 'timestamp': '...', 'type': 'blood'}
pass
def load_sample_data(self, sample_path: str) -> pd.DataFrame:
# Здесь все костыли для моих кривых CSV
# pd.read_csv(... encoding='cp1251', sep=';', on_bad_lines='warn')
pass
def write_result(self, sample_id: str, result: dict):
# Единый метод записи результата для всех агентов
# В стандартный формат, в стандартную папку
pass
Создайте этот интерфейс ДО того, как начнёте думать о AI. Это ваша «плазма» — основа, на которую будут «подсаживаться» агенты. Я потратил 3 недели, переписывая логику чтения файлов в каждом новом агене. Безумие.
Урок 2: Docker Swarm — не для первого дня. Но Docker Compose — обязателен с первого часа.
Я начал с локальных скриптов. Они работали. Когда я добавил второй скрипт, они начали конфликтовать по зависимостям (один требовал pandas==1.5.3, другой pandas==2.0.0). На третий скрипт у меня был уже ад с версиями.
Совет: даже если у вас один скрипт, запихните его в Docker Compose с первого дня. Это не сложно.
# docker-compose.v1.yml — для одного агента!
version: '3.8'
services:
agent-sanitizer:
build:
context: ./agents/sanitizer
dockerfile: Dockerfile
volumes:
- /mnt/lab/incoming:/app/incoming:ro
- ./logs:/app/logs
environment:
- LOG_LEVEL=INFO
restart: unless-stopped
Почему?
1. Изоляция зависимостей: ваш pandas не будет конфликтовать с системным.
2. Готовый путь к масштабированию: когда придёт время для Swarm, вы уже всё упаковали.
3. Воспроизводимость: любой коллега сможет запустить вашу систему одной командой: docker-compose up -d.
Я перешёл на Swarm только когда было 5 агентов. И переход был болезненным, потому что я не заложил основы в Compose. Если бы я начал с Compose, переход в Swarm занял бы день, а не неделю.
Урок 3: Мониторинг — это не «потом». Это «сразу», даже если мониторишь один процесс.
Я добавил Prometheus и Grafana, когда система уже падала раз в день. Я неделю гадал, почему агент-классификатор «зависает». Потом поставил мониторинг и увидел: он зависает, когда получает файл больше 50 МБ. Просто память кончается.
Что нужно поставить сразу, даже для одного агента:
1. Логирование в структурированном формате (JSON). Не print(), а сразу json.dumps().
2. Базовые метрики в Prometheus — количество обработанных файлов, время обработки, ошибки.
3. Health-check endpoint в каждом агене (если он на FastAPI).
Как это выглядит в коде агента с первого дня:
# agent_base.py
from prometheus_client import Counter, Histogram
import time
PROCESSED_FILES = Counter('agent_processed_files_total', 'Total processed files')
PROCESS_TIME = Histogram('agent_process_time_seconds', 'Time per file processing')
def process_file(self, file_path):
start_time = time.time()
try:
# ... ваша логика ...
PROCESSED_FILES.inc()
finally:
PROCESS_TIME.observe(time.time() - start_time)
И health-check в FastAPI:
@app.get("/health")
def health():
return {"status": "healthy", "timestamp": datetime.utcnow()}
Затем в docker-compose.yml добавляете Prometheus, который собирает эти метрики. Это 30 минут работы на старте. Но это даёт вам глаза в систему сразу. Я без этого месяца ходил как слепой.
Урок 4: 27 агентов — это не 27 скриптов. Это 3 типа агентов и 24 их конфигурации.
Мой главный провал в архитектуре: я сделал каждого агента как отдельный проект, со своим Dockerfile, своей структурой. Когда я захотел обновить логику чтения данных, мне нужно было внести изменения в 27 репозиториев. Ад.
Что я понял: большинство агентов — это одна и та же «рабочая единица», но с разными конфигурациями и моделями.
Идеальная структура (к которой я пришёл через боль):
agents/
├── core/ # Общий код: интерфейс к данным, логирование, метрики
├── agent_types/
│ ├── classifier/ # Тип 1: классификатор (общий код)
│ ├── validator/ # Тип 2: валидатор (общий код)
│ ├── reporter/ # Тип 3: генератор отчетов (общий код)
├── instances/
│ ├── classifier_blood/ # Конкретный агент: классификатор крови
│ │ ├── config.json # Конфигурация
│ │ ├── model.pth # Его конкретная модель
│ │ └── Dockerfile # Тонкий Dockerfile, который использует общий
│ ├── classifier_urine/
│ ├── validator_main/
Тогда:
- Обновление core — обновляет всех агентов.
- Обновление логики classifier — обновляет всех классификаторов.
- Каждый конкретный агент — это лишь конфигурация и модель.
Я переделал систему на эту структуру. Это заняло 2 месяца миграции. Если бы я начал так, я бы сохранил год жизни.
Урок 5: Самое важное — не AI, а pipeline. Ваши агенты должны быть «трубами», а не «островами».
Первые мои агенты были независимыми монадами. Агент-классификатор работал и клал результат в папку classified. Агент-валидатор читал эту папку, работал и клал в validated. Это приводило к:
1. Race conditions: валидатор иногда начинал работать на файл, который классификатор ещё не закончил.
2. Потерям данных: файлы могли «застревать» между агентами.
3. Сложному мониторингю: где сейчас файл? В какой стадии?
Решение: единый pipeline, управляемый центральным оркестратором (даже простым). Каждый агент не работает с файлами, он работает с «заданиями» из очереди.
Как это выглядит в итоге:
# orchestrator_simple.py
import redis # Используем Redis как очередь
import json
r = redis.Redis(host='redis', port=6379)
def submit_task(task_type: str, file_meta: dict):
task_id = str(uuid.uuid4())
task = {
'id': task_id,
'type': task_type,
'data': file_meta,
'status': 'pending',
'created_at': datetime.utcnow().isoformat()
}
r.set(f'task:{task_id}', json.dumps(task))
r.rpush(f'queue:{task_type}', task_id)
Агент не ищет файлы. Он:
1. Берёт task_id из очереди queue:classification.
2. Получает задание из task:{task_id}.
3. Работает.
4. Обновляет статус задания и помещает его в следующую очередь queue:validation.
Это даёт:
- Полную трассировку: каждый файл имеет task_id, вы можете отследить его путь через весь pipeline.
- Нет race conditions: файл не существует в двух стадиях одновременно.
- Единую точку мониторинга: очередь Redis — это центральное состояние системы.
Я внедрил это только на 20-м агентe. До этого был хаос. Если бы я начал с pipeline — даже на одном агентe — я бы избежал сотни проблем.
Три конкретных совета для таких же сумасшедших, которые кодируют в 3 ночи:
- Сначала сделайте «руки» и «глаза» системы, потом «мозг». Реализуйте интерфейс к вашим данным и мониторинг прежде, чем первый AI агент. Это как в медицине: сначала диагностика, потом лечение.
- Каждый компонент, даже первый, должен быть контейнером с health-check. Не позволяйте скриптам жить на хосте.
docker-compose up -d— ваша первая команда послеimport pandas. - Спроектируйте pipeline сразу, даже если сейчас в нем один шаг. Представьте, как данные будут двигаться от входа к выходу. Это сэкономит вам переделку всей системы, когда вы добавите второй агент.
И последнее, самое важное: эта система — не про AI. Она про вашу конкретную боль. AI агенты — это просто инструменты, которые решают эту боль. Не начинайте с чтения научных статей о новых моделях. Начните с файла core_interface.py, который читает ваш кривой CSV. Тогда всё остальное будет иметь смысл.
Моя лаборатория теперь работает. 27 агентов жужжат в Swarm. Prometheus рисует красивые графики. Но если бы я мог вернуться в ту первую ночь, с одним скриптом и болью в спине, я бы начал не с if-else адского кода. Я бы начал с docker-compose.yml, core_interface.py и redis. И тогда путь от одного скрипта до 27 агентов занял бы не год, а три месяца.
Учитесь на моих косяках. И не забывайте спать. Иногда.