Перейти к содержанию

brain.core — Ядро системы

Базовые компоненты: EventBus, Scheduler, Protocol-контракты, утилиты.


Contracts (Protocol DI)

Все зависимости инъектируются через typing.Protocol (@runtime_checkable).

contracts

brain/core/contracts.py

Общие сквозные контракты (dataclass/enum/protocol) для взаимодействия слоёв. Цель: единый типовой "язык" между perception, encoders, cognition, output, logging.

Правила совместимости (A.2): - Не переименовывать поля без миграции (добавлять новые поля с default). - Все "действия" несут trace_id / session_id / cycle_id. - Сериализация: to_dict() / from_dict() на каждом контракте. - Enum-поля сериализуются как строки (.value).

ContractMixin

Mixin для dataclass-контрактов. Предоставляет to_dict() и from_dict() с поддержкой Enum-полей.

to_dict
to_dict() -> Dict[str, Any]

Рекурсивно конвертирует dataclass в dict (Enum → str).

from_dict classmethod
from_dict(data: Dict[str, Any]) -> T

Создаёт экземпляр из dict.

Рекурсивно восстанавливает
  • Enum-поля из строковых значений
  • Вложенные dataclass-поля из dict (если тип — ContractMixin)
  • List[dataclass] из списка dict
  • Optional[dataclass/Enum] — разворачивает обёртку

Неизвестные ключи игнорируются (forward-compatibility).

Modality

Bases: str, Enum

Поддерживаемые модальности входа/представления.

TaskStatus

Bases: str, Enum

Статус выполнения задачи в scheduler/cognition.

ResourceState dataclass

ResourceState(cpu_pct: float = 0.0, ram_pct: float = 0.0, ram_used_mb: float = 0.0, ram_total_mb: float = 0.0, available_threads: int = 1, ring2_allowed: bool = True, soft_blocked: bool = False)

Bases: ContractMixin

Снимок ресурсного состояния системы. Используется для degradation policy и принятия решения о тяжёлых ветках.

Task dataclass

Task(task_id: str, task_type: str, payload: Dict[str, Any] = dict(), priority: float = 0.5, status: TaskStatus = TaskStatus.PENDING, trace_id: str = '', session_id: str = '', cycle_id: str = '')

Bases: ContractMixin

Унифицированная задача для scheduler/event loop.

EncodedPercept dataclass

EncodedPercept(percept_id: str, modality: Modality, vector: List[float] = list(), text: str = '', quality: float = 0.0, source: str = '', language: str = '', message_type: str = 'unknown', encoder_model: str = '', vector_dim: int = 0, metadata: Dict[str, Any] = dict(), trace_id: str = '', session_id: str = '', cycle_id: str = '')

Bases: ContractMixin

Результат кодирования единичного перцепта одной модальности.

Top-level поля — только стабильные, используемые downstream-слоями. Всё остальное (keywords, encoding_time_ms, warnings, sentiment) → metadata.

FusedPercept dataclass

FusedPercept(fused_id: str, inputs: List[EncodedPercept] = list(), fused_vector: List[float] = list(), entities: List[str] = list(), confidence: float = 0.0, metadata: Dict[str, Any] = dict(), trace_id: str = '', session_id: str = '', cycle_id: str = '')

Bases: ContractMixin

Кросс-модальное объединение нескольких EncodedPercept.

TraceRef dataclass

TraceRef(ref_type: str, ref_id: str, note: str = '')

Bases: ContractMixin

Ссылка на источник в цепочке объяснимости. Например: memory_ref, hypothesis_ref, source_ref.

TraceStep dataclass

TraceStep(step_id: str, module: str, action: str, confidence: float = 0.0, details: Dict[str, Any] = dict(), refs: List[TraceRef] = list())

Bases: ContractMixin

Один шаг reasoning/decision trace.

TraceChain dataclass

TraceChain(trace_id: str, session_id: str = '', cycle_id: str = '', input_refs: List[TraceRef] = list(), steps: List[TraceStep] = list(), output_refs: List[TraceRef] = list(), summary: str = '')

Bases: ContractMixin

Полная цепочка причинности для одного решения.

CognitiveResult dataclass

CognitiveResult(action: str, response: str, confidence: float, trace: TraceChain, goal: str = '', trace_id: str = '', session_id: str = '', cycle_id: str = '', contradictions: List[str] = list(), uncertainty: float = 0.0, salience: float = 0.0, memory_refs: List[TraceRef] = list(), source_refs: List[TraceRef] = list(), metadata: Dict[str, Any] = dict())

Bases: ContractMixin

Результат когнитивного цикла (reasoning + action selection).

BrainOutput dataclass

BrainOutput(text: str, confidence: float, trace_id: str, session_id: str = '', cycle_id: str = '', digest: str = '', action: Optional[str] = None, metadata: Dict[str, Any] = dict())

Bases: ContractMixin

Финальный внешний вывод системы (для dialogue/action API).

TextEncoderProtocol

Bases: Protocol

Формальный интерфейс TextEncoder для dependency injection.

Любой объект с методом encode(text, ...) → EncodedPercept удовлетворяет этому протоколу (structural subtyping).

encode
encode(text: str, source: str = 'user_input', quality: float = 1.0, trace_id: str = '', session_id: str = '', cycle_id: str = '') -> Any

Кодировать текст → EncodedPercept.

MemoryManagerProtocol

Bases: Protocol

Формальный интерфейс MemoryManager для dependency injection.

Любой объект с методами store(), retrieve(), store_fact(), save_all() удовлетворяет этому протоколу (structural subtyping).

store
store(content: str, importance: float = 0.5, source_ref: str = '', tags: Optional[List[str]] = None) -> Any

Сохранить контент в память (working + episodic + semantic).

retrieve
retrieve(query: str, top_n: int = 5) -> Any

Поиск по всем видам памяти.

store_fact
store_fact(concept: str, description: str, importance: float = 0.5) -> Any

Явное сохранение факта в семантическую память.

save_all
save_all() -> None

Сохранить все виды памяти на диск.

EventBusProtocol

Bases: Protocol

Формальный интерфейс EventBus для dependency injection.

Любой объект с методами publish(), subscribe(), unsubscribe() удовлетворяет этому протоколу.

Сигнатура publish() синхронизирована с реальным EventBus: - payload (не data) — единый naming convention - trace_id — для трассировки - возвращает int (количество вызванных handlers)

publish
publish(event_type: str, payload: Any = None, trace_id: str = '') -> int

Опубликовать событие. Возвращает количество вызванных handlers.

subscribe
subscribe(event_type: str, handler: Any) -> None

Подписаться на тип события.

unsubscribe
unsubscribe(event_type: str, handler: Any) -> None

Отписаться от типа события.

ResourceMonitorProtocol

Bases: Protocol

Формальный интерфейс ResourceMonitor для dependency injection.

Любой объект с методом snapshot() удовлетворяет этому протоколу.

snapshot
snapshot() -> Any

Получить текущий снимок ресурсов (ResourceState или dict).

Ключевые протоколы

Протокол Реализация Используется в
MemoryManagerProtocol MemoryManager CognitiveCore, CognitivePipeline
TextEncoderProtocol TextEncoder CognitiveCore, CognitivePipeline
EventBusProtocol EventBus, ThreadPoolEventBus CognitiveCore, CognitivePipeline
ResourceMonitorProtocol ResourceMonitor CognitiveCore, CognitivePipeline

EventBus

Синхронная шина событий с поддержкой снапшотов.

EventBus

EventBus(brain_logger: Optional[BrainLogger] = None)

Синхронная typed pub/sub шина событий.

Использование

bus = EventBus()

def on_percept(event_type, payload, trace_id): print(f"[{trace_id}] {event_type}: {payload}")

bus.subscribe("percept", on_percept) bus.publish("percept", {"text": "hello"}, trace_id="t-001") bus.unsubscribe("percept", on_percept)

Wildcard

bus.subscribe("*", debug_handler) # получает ВСЕ события

subscribe

subscribe(event_type: str, handler: Handler) -> None

Подписать handler на event_type. Повторная подписка одного и того же handler игнорируется.

unsubscribe

unsubscribe(event_type: str, handler: Handler) -> None

Отписать handler от event_type. Если handler не был подписан — молча игнорируется.

publish

publish(event_type: str, payload: Any = None, trace_id: str = '') -> int

Опубликовать событие.

Вызывает все handlers подписанные на event_type, а затем wildcard-handlers ("*"). Ошибка одного handler логируется и не прерывает остальных.

Snapshot pattern: копируем список handlers под lock, вызываем вне lock — предотвращает deadlock при re-entrant publish.

Returns:

Type Description
int

Количество успешно вызванных handlers.

status

status() -> Dict[str, Any]

Словарь для логирования/observability.

ThreadPoolEventBus

Асинхронная шина событий на основе ThreadPoolExecutor (P3-9).

ThreadPoolEventBus

ThreadPoolEventBus(max_workers: int = 4, brain_logger: Optional[BrainLogger] = None)

Bases: EventBus

EventBus с диспетчеризацией handlers через ThreadPoolExecutor (P3-9).

Handlers вызываются в отдельных потоках пула — publish() не блокирует вызывающий поток. Ошибки handlers логируются асинхронно.

Дополнительные методы

publish_sync() — синхронная публикация (как в базовом EventBus) wait_all() — дождаться завершения всех pending futures shutdown() — завершить ThreadPoolExecutor

Использование

bus = ThreadPoolEventBus(max_workers=4) bus.subscribe("tick_end", on_tick) bus.publish("tick_end", {"cycle": 1}) # async bus.publish_sync("tick_end", {"cycle": 2}) # sync bus.wait_all() bus.shutdown(wait=True)

publish

publish(event_type: str, payload: Any = None, trace_id: str = '') -> int

Опубликовать событие через ThreadPoolExecutor.

Handlers вызываются в отдельных потоках пула. Ошибки логируются в _call_handler (не прерывают остальных).

Returns:

Type Description
int

Количество отправленных задач (futures submitted).

shutdown

shutdown(wait: bool = True) -> None

Завершить ThreadPoolExecutor.

Parameters:

Name Type Description Default
wait bool

True = дождаться завершения всех running tasks.

True

Scheduler

Планировщик задач с приоритетами для автономного цикла (P3-11).

Scheduler

Scheduler(event_bus: EventBus, config: Optional[SchedulerConfig] = None, brain_logger: Optional[BrainLogger] = None)

Тик-планировщик автономного цикла мозга.

Использование (MVP): bus = EventBus() scheduler = Scheduler(bus)

def handle_think(task: Task) -> Any:
    return {"result": "thought"}

scheduler.register_handler("think", handle_think)
scheduler.enqueue(Task(task_id="t1", task_type="think"), TaskPriority.NORMAL)

# Один тик вручную:
info = scheduler.tick()

# Или основной цикл (блокирующий):
scheduler.run(max_ticks=10)

stop

stop() -> None

Остановить основной цикл после завершения текущего тика.

status

status() -> Dict[str, Any]

Словарь для логирования/observability.

SchedulerConfig dataclass

SchedulerConfig(tick_normal_ms: int = 100, tick_degraded_ms: int = 500, tick_critical_ms: int = 2000, tick_emergency_ms: int = 5000, max_queue_size: int = 256, max_tasks_per_tick: int = 1, cpu_degraded_threshold: float = 70.0, cpu_critical_threshold: float = 85.0, ram_degraded_gb: float = 22.0, ram_critical_gb: float = 28.0, ram_emergency_gb: float = 30.0, session_id: str = '')

Параметры планировщика.

TaskPriority

Bases: IntEnum

Приоритет задачи в очереди планировщика. Меньшее значение = выше приоритет (heapq — min-heap).

SchedulerStats dataclass

SchedulerStats(ticks: int = 0, tasks_enqueued: int = 0, tasks_executed: int = 0, tasks_failed: int = 0, tasks_dropped: int = 0, idle_ticks: int = 0)

Накопленная статистика планировщика.


ResourceMonitor

Мониторинг ресурсов: CPU, RAM, политика ограничений.

ResourceMonitor

ResourceMonitor(event_bus: EventBus, config: Optional[ResourceMonitorConfig] = None)

Фоновый монитор CPU/RAM с graceful degradation.

Использование

bus = EventBus() monitor = ResourceMonitor(bus) monitor.start()

В основном цикле:

state = monitor.check() interval = scheduler.get_tick_interval(state)

monitor.stop()

Тестирование (без реального psutil): monitor.inject_state(ResourceState(cpu_pct=90.0)) # эмуляция high load

status

status() -> Dict[str, Any]

Словарь для логирования/observability.


Events

Типы событий системы.

events

events.py — Типизированные события для шины сообщений мозга.

Все модули общаются через события — никаких прямых зависимостей. Каждое событие сериализуется в JSON для логирования и трассировки.

BaseEvent dataclass

BaseEvent(event_type: str = 'base', ts: str = _now_iso(), trace_id: str = (lambda: _new_id('trace_'))(), session_id: str = '', cycle_id: str = '')

Базовый класс для всех событий мозга.

to_dict
to_dict() -> Dict[str, Any]

Сериализация в словарь (для JSON-логирования).

to_json_line
to_json_line() -> str

Сериализация в одну JSON-строку (JSONL формат).

PerceptEvent dataclass

PerceptEvent(event_type: str = 'percept', ts: str = _now_iso(), trace_id: str = (lambda: _new_id('trace_'))(), session_id: str = '', cycle_id: str = '', source: str = '', modality: str = 'text', content: Any = None, quality: float = 1.0, language: str = 'unknown', metadata: Dict[str, Any] = dict())

Bases: BaseEvent

Событие восприятия — входящий сигнал из внешнего мира.

Поля

source — источник (путь к файлу, URL, "user_input", ...) modality — тип данных ('text', 'image', 'audio', 'video', 'mixed') content — содержимое (текст, путь к файлу, байты в base64) quality — оценка качества входных данных (0.0 — 1.0) language — язык ('ru', 'en', 'mixed', 'unknown') metadata — дополнительные метаданные (страница, временная метка, ...)

MemoryEvent dataclass

MemoryEvent(event_type: str = 'memory', ts: str = _now_iso(), trace_id: str = (lambda: _new_id('trace_'))(), session_id: str = '', cycle_id: str = '', operation: str = 'store', memory_type: str = 'working', key: str = '', value: Any = None, importance: float = 0.5, confidence: float = 1.0, source_ref: str = '', latency_ms: float = 0.0)

Bases: BaseEvent

Событие памяти — операция с любым видом памяти.

Поля

operation — тип операции ('store', 'retrieve', 'update', 'delete', 'consolidate', 'decay') memory_type — вид памяти ('working', 'episodic', 'semantic', 'procedural', 'source') key — ключ/концепт value — значение (факт, эпизод, ...) importance — важность записи (0.0 — 1.0) confidence — уверенность в факте (0.0 — 1.0) source_ref — ссылка на источник latency_ms — время операции в мс

CognitiveEvent dataclass

CognitiveEvent(event_type: str = 'cognitive', ts: str = _now_iso(), trace_id: str = (lambda: _new_id('trace_'))(), session_id: str = '', cycle_id: str = '', goal: str = '', step: str = '', confidence: float = 1.0, decision: str = '', reasoning: List[str] = list(), input_refs: List[str] = list(), memory_refs: List[str] = list(), cpu_pct: float = 0.0, ram_mb: float = 0.0)

Bases: BaseEvent

Когнитивное событие — шаг мышления/планирования.

Поля

goal — текущая цель step — название шага confidence — уверенность в решении (0.0 — 1.0) decision — принятое решение reasoning — цепочка рассуждений input_refs — ссылки на входные данные memory_refs — ссылки на использованные факты памяти cpu_pct — загрузка CPU в момент события ram_mb — использование RAM в МБ

LearningEvent dataclass

LearningEvent(event_type: str = 'learning', ts: str = _now_iso(), trace_id: str = (lambda: _new_id('trace_'))(), session_id: str = '', cycle_id: str = '', trigger: str = 'online', affected_module: str = '', delta: float = 0.0, before: Any = None, after: Any = None, notes: str = '')

Bases: BaseEvent

Событие обучения — изменение весов/оценок/ассоциаций.

Поля

trigger — что вызвало обучение ('online', 'replay', 'self_supervised') affected_module — какой модуль обновился delta — величина изменения before — значение до after — значение после notes — комментарий

SystemEvent dataclass

SystemEvent(event_type: str = 'system', ts: str = _now_iso(), trace_id: str = (lambda: _new_id('trace_'))(), session_id: str = '', cycle_id: str = '', level: str = 'INFO', module: str = '', message: str = '', cpu_pct: float = 0.0, ram_mb: float = 0.0, ram_total_mb: float = 0.0, error: str = '')

Bases: BaseEvent

Системное событие — запуск, остановка, ресурсы, ошибки.

Поля

level — уровень ('DEBUG', 'INFO', 'WARN', 'ERROR', 'CRITICAL') module — модуль-источник message — сообщение cpu_pct — загрузка CPU ram_mb — использование RAM в МБ ram_total_mb — всего RAM в МБ error — текст ошибки (если есть)

EventFactory

Удобные фабричные методы для создания событий.


Утилиты

text_utils

text_utils

brain/core/text_utils.py — Канонические текстовые утилиты.

Critical DRY — единственный источник истины для:
  • detect_language(text) — определение языка по соотношению кириллицы/латиницы
  • parse_fact_pattern(text) — структурный парсинг факта из текста ("X это Y")
Эти функции ранее дублировались в
  • brain/perception/metadata_extractor.py
  • brain/encoders/text_encoder.py
  • brain/output/dialogue_responder.py
  • brain/output/response_validator.py
  • brain/memory/consolidation_engine.py

detect_language

detect_language(text: str) -> str

Определить язык текста по соотношению кириллических и латинских символов.

Пороги

60% кириллицы → 'ru' 60% латиницы → 'en' 10 букв, но ни одна группа не доминирует → 'mixed' нет букв или текст пустой → 'unknown'

Parameters:

Name Type Description Default
text str

входной текст

required

Returns:

Type Description
str

'ru' | 'en' | 'mixed' | 'unknown'

parse_fact_pattern

parse_fact_pattern(text: str) -> Optional[Tuple[str, str]]

Извлечь структурированный факт из текста.

Ищет паттерны
  • "X это Y"
  • "X — Y"
  • "X - Y"
  • "X: Y"
  • "X is Y"
  • "X are Y"
  • "X means Y"
Ограничения
  • concept: 2–50 символов
  • description: >= 5 символов
  • text: 5–500 символов

Функция отвечает только за структурный парсинг, без побочных эффектов и без нормализации сверх необходимого минимума.

Parameters:

Name Type Description Default
text str

входной текст

required

Returns:

Type Description
Optional[Tuple[str, str]]

(concept, description) или None если паттерн не найден

hash_utils

hash_utils

brain/core/hash_utils.py — Канонические хеш-утилиты.

Critical DRY — единственный источник истины для:
  • sha256_text(text, truncate) — SHA256 хеш строки
  • sha256_file(path, truncate) — SHA256 хеш файла
Эти функции ранее дублировались в
  • brain/encoders/text_encoder.py (_sha256 — полный hex)
  • brain/perception/input_router.py (_sha256 — truncate=16, _sha256_file)

sha256_text

sha256_text(text: str, truncate: Optional[int] = None) -> str

SHA256 хеш текстовой строки.

Parameters:

Name Type Description Default
text str

входной текст (кодируется в UTF-8)

required
truncate Optional[int]

если задано — обрезать hex-digest до N символов. None → полный hex (64 символа).

None

Returns:

Type Description
str

hex-digest строка (полная или обрезанная)

sha256_file

sha256_file(path: str, truncate: int = 16) -> str

SHA256 хеш файла (читает блоками по 64KB).

При ошибке чтения (OSError) — fallback на хеш от пути файла. Это сохраняет семантику дедупликации: даже если файл недоступен, один и тот же путь всегда даёт один и тот же хеш.

Parameters:

Name Type Description Default
path str

путь к файлу

required
truncate int

обрезать hex-digest до N символов (default: 16)

16

Returns:

Type Description
str

hex-digest строка (обрезанная до truncate символов)