brain.core — Ядро системы¶
Базовые компоненты: EventBus, Scheduler, Protocol-контракты, утилиты.
Contracts (Protocol DI)¶
Все зависимости инъектируются через typing.Protocol (@runtime_checkable).
contracts
¶
brain/core/contracts.py
Общие сквозные контракты (dataclass/enum/protocol) для взаимодействия слоёв. Цель: единый типовой "язык" между perception, encoders, cognition, output, logging.
Правила совместимости (A.2): - Не переименовывать поля без миграции (добавлять новые поля с default). - Все "действия" несут trace_id / session_id / cycle_id. - Сериализация: to_dict() / from_dict() на каждом контракте. - Enum-поля сериализуются как строки (.value).
ContractMixin
¶
Mixin для dataclass-контрактов. Предоставляет to_dict() и from_dict() с поддержкой Enum-полей.
from_dict
classmethod
¶
Создаёт экземпляр из dict.
Рекурсивно восстанавливает
- Enum-поля из строковых значений
- Вложенные dataclass-поля из dict (если тип — ContractMixin)
- List[dataclass] из списка dict
- Optional[dataclass/Enum] — разворачивает обёртку
Неизвестные ключи игнорируются (forward-compatibility).
Modality
¶
Bases: str, Enum
Поддерживаемые модальности входа/представления.
TaskStatus
¶
Bases: str, Enum
Статус выполнения задачи в scheduler/cognition.
ResourceState
dataclass
¶
ResourceState(cpu_pct: float = 0.0, ram_pct: float = 0.0, ram_used_mb: float = 0.0, ram_total_mb: float = 0.0, available_threads: int = 1, ring2_allowed: bool = True, soft_blocked: bool = False)
Bases: ContractMixin
Снимок ресурсного состояния системы. Используется для degradation policy и принятия решения о тяжёлых ветках.
Task
dataclass
¶
EncodedPercept
dataclass
¶
EncodedPercept(percept_id: str, modality: Modality, vector: List[float] = list(), text: str = '', quality: float = 0.0, source: str = '', language: str = '', message_type: str = 'unknown', encoder_model: str = '', vector_dim: int = 0, metadata: Dict[str, Any] = dict(), trace_id: str = '', session_id: str = '', cycle_id: str = '')
Bases: ContractMixin
Результат кодирования единичного перцепта одной модальности.
Top-level поля — только стабильные, используемые downstream-слоями. Всё остальное (keywords, encoding_time_ms, warnings, sentiment) → metadata.
FusedPercept
dataclass
¶
TraceRef
dataclass
¶
Bases: ContractMixin
Ссылка на источник в цепочке объяснимости. Например: memory_ref, hypothesis_ref, source_ref.
TraceStep
dataclass
¶
TraceChain
dataclass
¶
CognitiveResult
dataclass
¶
CognitiveResult(action: str, response: str, confidence: float, trace: TraceChain, goal: str = '', trace_id: str = '', session_id: str = '', cycle_id: str = '', contradictions: List[str] = list(), uncertainty: float = 0.0, salience: float = 0.0, memory_refs: List[TraceRef] = list(), source_refs: List[TraceRef] = list(), metadata: Dict[str, Any] = dict())
BrainOutput
dataclass
¶
TextEncoderProtocol
¶
Bases: Protocol
Формальный интерфейс TextEncoder для dependency injection.
Любой объект с методом encode(text, ...) → EncodedPercept удовлетворяет этому протоколу (structural subtyping).
encode
¶
encode(text: str, source: str = 'user_input', quality: float = 1.0, trace_id: str = '', session_id: str = '', cycle_id: str = '') -> Any
Кодировать текст → EncodedPercept.
MemoryManagerProtocol
¶
Bases: Protocol
Формальный интерфейс MemoryManager для dependency injection.
Любой объект с методами store(), retrieve(), store_fact(), save_all() удовлетворяет этому протоколу (structural subtyping).
EventBusProtocol
¶
Bases: Protocol
Формальный интерфейс EventBus для dependency injection.
Любой объект с методами publish(), subscribe(), unsubscribe() удовлетворяет этому протоколу.
Сигнатура publish() синхронизирована с реальным EventBus: - payload (не data) — единый naming convention - trace_id — для трассировки - возвращает int (количество вызванных handlers)
publish
¶
Опубликовать событие. Возвращает количество вызванных handlers.
Ключевые протоколы¶
| Протокол | Реализация | Используется в |
|---|---|---|
MemoryManagerProtocol |
MemoryManager |
CognitiveCore, CognitivePipeline |
TextEncoderProtocol |
TextEncoder |
CognitiveCore, CognitivePipeline |
EventBusProtocol |
EventBus, ThreadPoolEventBus |
CognitiveCore, CognitivePipeline |
ResourceMonitorProtocol |
ResourceMonitor |
CognitiveCore, CognitivePipeline |
EventBus¶
Синхронная шина событий с поддержкой снапшотов.
EventBus
¶
Синхронная typed pub/sub шина событий.
Использование
bus = EventBus()
def on_percept(event_type, payload, trace_id): print(f"[{trace_id}] {event_type}: {payload}")
bus.subscribe("percept", on_percept) bus.publish("percept", {"text": "hello"}, trace_id="t-001") bus.unsubscribe("percept", on_percept)
Wildcard
bus.subscribe("*", debug_handler) # получает ВСЕ события
subscribe
¶
Подписать handler на event_type. Повторная подписка одного и того же handler игнорируется.
unsubscribe
¶
Отписать handler от event_type. Если handler не был подписан — молча игнорируется.
publish
¶
Опубликовать событие.
Вызывает все handlers подписанные на event_type, а затем wildcard-handlers ("*"). Ошибка одного handler логируется и не прерывает остальных.
Snapshot pattern: копируем список handlers под lock, вызываем вне lock — предотвращает deadlock при re-entrant publish.
Returns:
| Type | Description |
|---|---|
int
|
Количество успешно вызванных handlers. |
ThreadPoolEventBus¶
Асинхронная шина событий на основе ThreadPoolExecutor (P3-9).
ThreadPoolEventBus
¶
Bases: EventBus
EventBus с диспетчеризацией handlers через ThreadPoolExecutor (P3-9).
Handlers вызываются в отдельных потоках пула — publish() не блокирует вызывающий поток. Ошибки handlers логируются асинхронно.
Дополнительные методы
publish_sync() — синхронная публикация (как в базовом EventBus) wait_all() — дождаться завершения всех pending futures shutdown() — завершить ThreadPoolExecutor
Использование
bus = ThreadPoolEventBus(max_workers=4) bus.subscribe("tick_end", on_tick) bus.publish("tick_end", {"cycle": 1}) # async bus.publish_sync("tick_end", {"cycle": 2}) # sync bus.wait_all() bus.shutdown(wait=True)
publish
¶
Опубликовать событие через ThreadPoolExecutor.
Handlers вызываются в отдельных потоках пула. Ошибки логируются в _call_handler (не прерывают остальных).
Returns:
| Type | Description |
|---|---|
int
|
Количество отправленных задач (futures submitted). |
shutdown
¶
Завершить ThreadPoolExecutor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
wait
|
bool
|
True = дождаться завершения всех running tasks. |
True
|
Scheduler¶
Планировщик задач с приоритетами для автономного цикла (P3-11).
Scheduler
¶
Scheduler(event_bus: EventBus, config: Optional[SchedulerConfig] = None, brain_logger: Optional[BrainLogger] = None)
Тик-планировщик автономного цикла мозга.
Использование (MVP): bus = EventBus() scheduler = Scheduler(bus)
def handle_think(task: Task) -> Any:
return {"result": "thought"}
scheduler.register_handler("think", handle_think)
scheduler.enqueue(Task(task_id="t1", task_type="think"), TaskPriority.NORMAL)
# Один тик вручную:
info = scheduler.tick()
# Или основной цикл (блокирующий):
scheduler.run(max_ticks=10)
SchedulerConfig
dataclass
¶
SchedulerConfig(tick_normal_ms: int = 100, tick_degraded_ms: int = 500, tick_critical_ms: int = 2000, tick_emergency_ms: int = 5000, max_queue_size: int = 256, max_tasks_per_tick: int = 1, cpu_degraded_threshold: float = 70.0, cpu_critical_threshold: float = 85.0, ram_degraded_gb: float = 22.0, ram_critical_gb: float = 28.0, ram_emergency_gb: float = 30.0, session_id: str = '')
Параметры планировщика.
TaskPriority
¶
Bases: IntEnum
Приоритет задачи в очереди планировщика. Меньшее значение = выше приоритет (heapq — min-heap).
SchedulerStats
dataclass
¶
SchedulerStats(ticks: int = 0, tasks_enqueued: int = 0, tasks_executed: int = 0, tasks_failed: int = 0, tasks_dropped: int = 0, idle_ticks: int = 0)
Накопленная статистика планировщика.
ResourceMonitor¶
Мониторинг ресурсов: CPU, RAM, политика ограничений.
ResourceMonitor
¶
Фоновый монитор CPU/RAM с graceful degradation.
Использование
bus = EventBus() monitor = ResourceMonitor(bus) monitor.start()
В основном цикле:¶
state = monitor.check() interval = scheduler.get_tick_interval(state)
monitor.stop()
Тестирование (без реального psutil): monitor.inject_state(ResourceState(cpu_pct=90.0)) # эмуляция high load
Events¶
Типы событий системы.
events
¶
events.py — Типизированные события для шины сообщений мозга.
Все модули общаются через события — никаких прямых зависимостей. Каждое событие сериализуется в JSON для логирования и трассировки.
BaseEvent
dataclass
¶
PerceptEvent
dataclass
¶
PerceptEvent(event_type: str = 'percept', ts: str = _now_iso(), trace_id: str = (lambda: _new_id('trace_'))(), session_id: str = '', cycle_id: str = '', source: str = '', modality: str = 'text', content: Any = None, quality: float = 1.0, language: str = 'unknown', metadata: Dict[str, Any] = dict())
Bases: BaseEvent
Событие восприятия — входящий сигнал из внешнего мира.
Поля
source — источник (путь к файлу, URL, "user_input", ...) modality — тип данных ('text', 'image', 'audio', 'video', 'mixed') content — содержимое (текст, путь к файлу, байты в base64) quality — оценка качества входных данных (0.0 — 1.0) language — язык ('ru', 'en', 'mixed', 'unknown') metadata — дополнительные метаданные (страница, временная метка, ...)
MemoryEvent
dataclass
¶
MemoryEvent(event_type: str = 'memory', ts: str = _now_iso(), trace_id: str = (lambda: _new_id('trace_'))(), session_id: str = '', cycle_id: str = '', operation: str = 'store', memory_type: str = 'working', key: str = '', value: Any = None, importance: float = 0.5, confidence: float = 1.0, source_ref: str = '', latency_ms: float = 0.0)
Bases: BaseEvent
Событие памяти — операция с любым видом памяти.
Поля
operation — тип операции ('store', 'retrieve', 'update', 'delete', 'consolidate', 'decay') memory_type — вид памяти ('working', 'episodic', 'semantic', 'procedural', 'source') key — ключ/концепт value — значение (факт, эпизод, ...) importance — важность записи (0.0 — 1.0) confidence — уверенность в факте (0.0 — 1.0) source_ref — ссылка на источник latency_ms — время операции в мс
CognitiveEvent
dataclass
¶
CognitiveEvent(event_type: str = 'cognitive', ts: str = _now_iso(), trace_id: str = (lambda: _new_id('trace_'))(), session_id: str = '', cycle_id: str = '', goal: str = '', step: str = '', confidence: float = 1.0, decision: str = '', reasoning: List[str] = list(), input_refs: List[str] = list(), memory_refs: List[str] = list(), cpu_pct: float = 0.0, ram_mb: float = 0.0)
Bases: BaseEvent
Когнитивное событие — шаг мышления/планирования.
Поля
goal — текущая цель step — название шага confidence — уверенность в решении (0.0 — 1.0) decision — принятое решение reasoning — цепочка рассуждений input_refs — ссылки на входные данные memory_refs — ссылки на использованные факты памяти cpu_pct — загрузка CPU в момент события ram_mb — использование RAM в МБ
LearningEvent
dataclass
¶
LearningEvent(event_type: str = 'learning', ts: str = _now_iso(), trace_id: str = (lambda: _new_id('trace_'))(), session_id: str = '', cycle_id: str = '', trigger: str = 'online', affected_module: str = '', delta: float = 0.0, before: Any = None, after: Any = None, notes: str = '')
Bases: BaseEvent
Событие обучения — изменение весов/оценок/ассоциаций.
Поля
trigger — что вызвало обучение ('online', 'replay', 'self_supervised') affected_module — какой модуль обновился delta — величина изменения before — значение до after — значение после notes — комментарий
SystemEvent
dataclass
¶
SystemEvent(event_type: str = 'system', ts: str = _now_iso(), trace_id: str = (lambda: _new_id('trace_'))(), session_id: str = '', cycle_id: str = '', level: str = 'INFO', module: str = '', message: str = '', cpu_pct: float = 0.0, ram_mb: float = 0.0, ram_total_mb: float = 0.0, error: str = '')
Bases: BaseEvent
Системное событие — запуск, остановка, ресурсы, ошибки.
Поля
level — уровень ('DEBUG', 'INFO', 'WARN', 'ERROR', 'CRITICAL') module — модуль-источник message — сообщение cpu_pct — загрузка CPU ram_mb — использование RAM в МБ ram_total_mb — всего RAM в МБ error — текст ошибки (если есть)
EventFactory
¶
Удобные фабричные методы для создания событий.
Утилиты¶
text_utils¶
text_utils
¶
brain/core/text_utils.py — Канонические текстовые утилиты.
Critical DRY — единственный источник истины для:
- detect_language(text) — определение языка по соотношению кириллицы/латиницы
- parse_fact_pattern(text) — структурный парсинг факта из текста ("X это Y")
Эти функции ранее дублировались в
- brain/perception/metadata_extractor.py
- brain/encoders/text_encoder.py
- brain/output/dialogue_responder.py
- brain/output/response_validator.py
- brain/memory/consolidation_engine.py
detect_language
¶
Определить язык текста по соотношению кириллических и латинских символов.
Пороги
60% кириллицы → 'ru' 60% латиницы → 'en' 10 букв, но ни одна группа не доминирует → 'mixed' нет букв или текст пустой → 'unknown'
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
text
|
str
|
входной текст |
required |
Returns:
| Type | Description |
|---|---|
str
|
'ru' | 'en' | 'mixed' | 'unknown' |
parse_fact_pattern
¶
Извлечь структурированный факт из текста.
Ищет паттерны
- "X это Y"
- "X — Y"
- "X - Y"
- "X: Y"
- "X is Y"
- "X are Y"
- "X means Y"
Ограничения
- concept: 2–50 символов
- description: >= 5 символов
- text: 5–500 символов
Функция отвечает только за структурный парсинг, без побочных эффектов и без нормализации сверх необходимого минимума.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
text
|
str
|
входной текст |
required |
Returns:
| Type | Description |
|---|---|
Optional[Tuple[str, str]]
|
(concept, description) или None если паттерн не найден |
hash_utils¶
hash_utils
¶
brain/core/hash_utils.py — Канонические хеш-утилиты.
Critical DRY — единственный источник истины для:
- sha256_text(text, truncate) — SHA256 хеш строки
- sha256_file(path, truncate) — SHA256 хеш файла
Эти функции ранее дублировались в
- brain/encoders/text_encoder.py (_sha256 — полный hex)
- brain/perception/input_router.py (_sha256 — truncate=16, _sha256_file)
sha256_text
¶
SHA256 хеш текстовой строки.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
text
|
str
|
входной текст (кодируется в UTF-8) |
required |
truncate
|
Optional[int]
|
если задано — обрезать hex-digest до N символов. None → полный hex (64 символа). |
None
|
Returns:
| Type | Description |
|---|---|
str
|
hex-digest строка (полная или обрезанная) |
sha256_file
¶
SHA256 хеш файла (читает блоками по 64KB).
При ошибке чтения (OSError) — fallback на хеш от пути файла. Это сохраняет семантику дедупликации: даже если файл недоступен, один и тот же путь всегда даёт один и тот же хеш.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path
|
str
|
путь к файлу |
required |
truncate
|
int
|
обрезать hex-digest до N символов (default: 16) |
16
|
Returns:
| Type | Description |
|---|---|
str
|
hex-digest строка (обрезанная до truncate символов) |