diff --git a/alembic/versions/039_app_settings_id_to_uuid.py b/alembic/versions/039_app_settings_id_to_uuid.py new file mode 100644 index 00000000..5c8be9dd --- /dev/null +++ b/alembic/versions/039_app_settings_id_to_uuid.py @@ -0,0 +1,47 @@ +"""Convert app_settings.id from VARCHAR(36) to native UUID. + +Migration 026 created the column as String(36), but the ORM UUIDMixin +declares it as UUID(as_uuid=False). The type mismatch causes asyncpg +to emit ``WHERE app_settings.id = $1::UUID`` which Postgres rejects +with "operator does not exist: character varying = uuid". + +Revision ID: 039_app_settings_id_to_uuid +Revises: 038_fix_proposalstatus_enum_case +Create Date: 2026-03-14 +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import postgresql + +if TYPE_CHECKING: + from collections.abc import Sequence + +revision: str = "039_app_settings_id_to_uuid" +down_revision: str | None = "038_fix_proposalstatus_enum_case" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + op.alter_column( + "app_settings", + "id", + type_=postgresql.UUID(as_uuid=False), + existing_type=sa.String(36), + postgresql_using="id::uuid", + ) + + +def downgrade() -> None: + op.alter_column( + "app_settings", + "id", + type_=sa.String(36), + existing_type=postgresql.UUID(as_uuid=False), + postgresql_using="id::text", + ) diff --git a/alembic/versions/040_normalize_insightstatus_data.py b/alembic/versions/040_normalize_insightstatus_data.py new file mode 100644 index 00000000..0d121706 --- /dev/null +++ b/alembic/versions/040_normalize_insightstatus_data.py @@ -0,0 +1,109 @@ +"""Normalize insight enum values to match ORM entity definitions. + +The insightstatus PG enum contains UPPERCASE labels (PENDING, REVIEWED, +ACTIONED, DISMISSED) and some lowercase duplicates (pending, actioned), +but the ORM expects the .value side of the Python enum: generated, +reviewed, acted_upon, dismissed. + +Similarly, insighttype has UPPERCASE labels (ENERGY_OPTIMIZATION, etc.) +but the ORM expects lowercase .value strings (energy_optimization, etc.). + +This migration: + 1. Adds missing canonical enum labels + 2. Converts all existing row data to canonical lowercase values + 3. Leaves orphan enum labels in place (PG can't drop enum values) + +Revision ID: 040_normalize_insightstatus_data +Revises: 039_app_settings_id_to_uuid +Create Date: 2026-03-15 +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from alembic import op + +if TYPE_CHECKING: + from collections.abc import Sequence + +revision: str = "040_normalize_insightstatus_data" +down_revision: str | None = "039_app_settings_id_to_uuid" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + +# ── insightstatus ────────────────────────────────────────────────────── +# ORM values: generated, reviewed, acted_upon, dismissed +_STATUS_LABELS_TO_ADD = ["generated", "reviewed", "acted_upon", "dismissed"] +_STATUS_DATA_MAP = { + "PENDING": "generated", + "pending": "generated", + "REVIEWED": "reviewed", + "ACTIONED": "acted_upon", + "actioned": "acted_upon", + "DISMISSED": "dismissed", +} + +# ── insighttype ──────────────────────────────────────────────────────── +# ORM values (from InsightType.value): energy_optimization, anomaly, +# pattern, recommendation, maintenance_prediction, automation_gap, etc. +_TYPE_LABELS_TO_ADD = [ + "energy_optimization", + "anomaly", + "pattern", + "recommendation", + "maintenance_prediction", +] +_TYPE_DATA_MAP = { + "ENERGY_OPTIMIZATION": "energy_optimization", + "ANOMALY_DETECTION": "anomaly", + "USAGE_PATTERN": "pattern", + "COST_SAVING": "recommendation", + "MAINTENANCE_PREDICTION": "maintenance_prediction", +} + + +def upgrade() -> None: + # 1. Add missing canonical labels to the PG enums + for label in _STATUS_LABELS_TO_ADD: + op.execute(f"ALTER TYPE insightstatus ADD VALUE IF NOT EXISTS '{label}'") + + for label in _TYPE_LABELS_TO_ADD: + op.execute(f"ALTER TYPE insighttype ADD VALUE IF NOT EXISTS '{label}'") + + # ADD VALUE must commit before the values can be used in DML, + # so we need a separate transaction for the UPDATEs. + # Alembic runs each migration in its own transaction by default. + # We force a commit here, then run the UPDATEs. + op.execute("COMMIT") + + # 2. Convert stale data rows to canonical values + for old, new in _STATUS_DATA_MAP.items(): + op.execute(f"UPDATE insights SET status = '{new}' WHERE status = '{old}'") + + for old, new in _TYPE_DATA_MAP.items(): + op.execute(f"UPDATE insights SET type = '{new}' WHERE type = '{old}'") + + # Re-open a transaction for Alembic's version-table update + op.execute("BEGIN") + + +def downgrade() -> None: + # Best-effort reverse: map canonical values back to UPPERCASE + _STATUS_REVERSE = { + "generated": "PENDING", + "reviewed": "REVIEWED", + "acted_upon": "ACTIONED", + "dismissed": "DISMISSED", + } + _TYPE_REVERSE = { + "energy_optimization": "ENERGY_OPTIMIZATION", + "anomaly": "ANOMALY_DETECTION", + "pattern": "USAGE_PATTERN", + "recommendation": "COST_SAVING", + "maintenance_prediction": "MAINTENANCE_PREDICTION", + } + for old, new in _STATUS_REVERSE.items(): + op.execute(f"UPDATE insights SET status = '{new}' WHERE status = '{old}'") + for old, new in _TYPE_REVERSE.items(): + op.execute(f"UPDATE insights SET type = '{new}' WHERE type = '{old}'") diff --git a/alembic/versions/041_schema_alignment.py b/alembic/versions/041_schema_alignment.py new file mode 100644 index 00000000..07f8d324 --- /dev/null +++ b/alembic/versions/041_schema_alignment.py @@ -0,0 +1,132 @@ +"""Align database column types with ORM entity definitions. + +Fixes schema drift accumulated across early migrations where column +types diverged from the ORM: + +Part A — Convert VARCHAR(36) primary keys to native UUID: + - insights.id + - workflow_definition.id + - tool_group.id + +Part B — Fix insights column types: + - impact: varchar(50) -> insightimpact enum + - evidence: json -> jsonb + - entities: json -> jsonb (HA entity ID strings, not UUIDs) + - script_output: json -> jsonb + +Part C — Fix conversation.status: + - varchar(20) -> conversationstatus enum + +Revision ID: 041_schema_alignment +Revises: 040_normalize_insightstatus_data +Create Date: 2026-03-15 +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import postgresql + +if TYPE_CHECKING: + from collections.abc import Sequence + +revision: str = "041_schema_alignment" +down_revision: str | None = "040_normalize_insightstatus_data" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + # ── Part A: VARCHAR(36) PKs → UUID ──────────────────────────────── + for table in ("insights", "workflow_definition", "tool_group"): + op.alter_column( + table, + "id", + type_=postgresql.UUID(as_uuid=False), + existing_type=sa.String(36), + postgresql_using="id::uuid", + ) + + # ── Part B: insights column types ───────────────────────────────── + # impact: varchar(50) → insightimpact enum + op.execute( + "ALTER TABLE insights ALTER COLUMN impact TYPE insightimpact USING impact::insightimpact" + ) + + # evidence: json → jsonb + op.alter_column( + "insights", + "evidence", + type_=postgresql.JSONB(), + existing_type=sa.JSON(), + postgresql_using="evidence::jsonb", + ) + + # entities: json → jsonb (stores HA entity ID strings, not UUIDs) + op.alter_column( + "insights", + "entities", + type_=postgresql.JSONB(), + existing_type=sa.JSON(), + postgresql_using="entities::jsonb", + server_default=sa.text("'[]'::jsonb"), + ) + + # script_output: json → jsonb + op.alter_column( + "insights", + "script_output", + type_=postgresql.JSONB(), + existing_type=sa.JSON(), + postgresql_using="script_output::jsonb", + ) + + # ── Part C: conversation.status → conversationstatus enum ───────── + op.execute( + "ALTER TABLE conversation " + "ALTER COLUMN status TYPE conversationstatus " + "USING status::conversationstatus" + ) + + +def downgrade() -> None: + # ── Part C reverse ──────────────────────────────────────────────── + op.execute("ALTER TABLE conversation ALTER COLUMN status TYPE varchar(20) USING status::text") + + # ── Part B reverse ──────────────────────────────────────────────── + op.alter_column( + "insights", + "script_output", + type_=sa.JSON(), + existing_type=postgresql.JSONB(), + ) + + op.alter_column( + "insights", + "entities", + type_=sa.JSON(), + existing_type=postgresql.JSONB(), + server_default=None, + ) + + op.alter_column( + "insights", + "evidence", + type_=sa.JSON(), + existing_type=postgresql.JSONB(), + ) + + op.execute("ALTER TABLE insights ALTER COLUMN impact TYPE varchar(50) USING impact::text") + + # ── Part A reverse ──────────────────────────────────────────────── + for table in ("insights", "workflow_definition", "tool_group"): + op.alter_column( + table, + "id", + type_=sa.String(36), + existing_type=postgresql.UUID(as_uuid=False), + postgresql_using="id::text", + ) diff --git a/specs/001-project-aether/features/41-code-audit/plan.md b/specs/001-project-aether/features/41-code-audit/plan.md new file mode 100644 index 00000000..3e0a8250 --- /dev/null +++ b/specs/001-project-aether/features/41-code-audit/plan.md @@ -0,0 +1,48 @@ +# Feature 41: Code Audit — Implementation Plan + +## Phase 1: Quick Wins (P0 — this session) + +### T1. Fix mutable default in proposals route +- File: `src/api/routes/proposals.py:444` +- Change: Replace `body: dict[str, Any] = {}` with `body: dict[str, Any] = Body(default={})` + +### T2. Narrow error handling in tool functions +- Files: `src/tools/agent_tools.py`, `src/tools/diagnostic_tools.py` +- Change: Replace `except Exception as e: return f"..."` with specific exceptions + and structured error responses that preserve diagnostic information. + +### T3. Fix N+1 query in list_proposals +- File: `src/api/routes/proposals.py:116-120` +- Change: Replace per-status loop with single `repo.list_all()` query + +### T4. Extract model_context boilerplate +- File: `src/tools/agent_tools.py` +- Change: Create reusable `_run_with_model_context()` helper to eliminate 3× duplication + +## Phase 2: Performance (P1 — this session) + +### T5. Use fast model for orchestrator classification +- File: `src/agents/orchestrator.py` +- Change: Override model selection in `_get_classification_llm()` to use a fast/cheap + model regardless of user-selected model + +### T6. Concurrent automation config fetches in discovery sync +- File: `src/dal/sync.py:_sync_automation_entities()` +- Change: Use `asyncio.gather()` with semaphore for concurrent config fetches + +### T7. Fix orchestrator session management +- File: `src/agents/orchestrator.py:173-189` +- Change: Use `async with get_session()` context manager + +## Phase 3: Modularity (P2 — future session) + +### T8. Split proposals.py into subpackage +### T9. Split handlers.py streaming logic +### T10. Add public method for HA entity state lookup +### T11. Split dal/agents.py into per-repository files + +## Phase 4: Polish (P3 — future session) + +### T12. Remove redundant logging imports +### T13. Split checkpoints.py model/saver +### T14. Extract scheduler job definitions diff --git a/specs/001-project-aether/features/41-code-audit/spec.md b/specs/001-project-aether/features/41-code-audit/spec.md new file mode 100644 index 00000000..83c9af8f --- /dev/null +++ b/specs/001-project-aether/features/41-code-audit/spec.md @@ -0,0 +1,183 @@ +# Feature 41: Code Audit & Health Check + +## Overview + +Comprehensive code quality audit covering code smells, error handling, +modularity, and agent response time optimization across the Aether codebase +(~28,500 lines across 270+ Python files in `src/`). + +## Audit Findings + +### 1. Files That Need Modularization (Too Long) + +| File | Lines | Issue | Action | +|------|-------|-------|--------| +| `src/api/routes/proposals.py` | 1,287 | Single file with 15+ route handlers AND 3 deployment strategies AND YAML generation | Split into `proposals/routes.py`, `proposals/deploy.py`, `proposals/yaml.py` | +| `src/tools/agent_tools.py` | 781 | 6 tool functions each with identical model_context boilerplate (lines 67-85 pattern repeated 3×) | Extract `_with_model_context()` helper; split formatters to `agent_tools_format.py` | +| `src/dal/agents.py` | 727 | 3 large repository classes in one file | Split into `agents/agent_repo.py`, `agents/config_version_repo.py`, `agents/prompt_version_repo.py` | +| `src/api/routes/openai_compat/handlers.py` | 682 | Single streaming handler is 490 lines with deeply nested event processing | Extract `_handle_distributed_stream()` and `_handle_monolith_stream()` into separate functions or a handler class | +| `src/dal/sync.py` | 624 | Discovery sync service with 4 sync methods + delta sync + convenience functions | Already well-structured; extract `_sync_automation_entities` to separate file | +| `src/storage/checkpoints.py` | 594 | Checkpoint model + saver + write model + write saver in one file | Split model from saver: `checkpoint_models.py` / `checkpoint_saver.py` | +| `src/scheduler/service.py` | 595 | Scheduler with many job definitions inline | Extract job definitions to `scheduler/jobs.py` | + +### 2. Error Handling / Swallowing Issues + +#### Critical: Bare `except Exception` That Swallows Errors + +**33 instances** of `except Exception:` across `src/` — many silently swallow errors: + +- `src/api/routes/proposals.py` (5×): Lines 87, 768, 821, 931, 1144 — HA client failures logged at debug/warning but user gets no feedback +- `src/tools/agent_tools.py` (6×): Every tool returns `f"I wasn't able to..."` on ANY exception — hides timeouts, auth failures, DB errors behind a generic string +- `src/tools/diagnostic_tools.py` (5×): Same pattern — returns error string that swallows the root cause +- `src/api/routes/openai_compat/handlers.py` (3×): Line 215 swallows stream timeout resolution; line 373 swallows distributed streaming failure; line 680 catches all streaming errors +- `src/hitl/insight_notifier.py` (2×): Silent swallow of notification failures +- `src/ha/event_handler.py` (2×): Event processing failures silently dropped + +**Fix priority**: HIGH — These mask real failures (auth token expired, DB connection lost, HA unreachable) behind generic messages. + +#### Mutable Default Argument + +```python +# src/api/routes/proposals.py:444 +body: dict[str, Any] = {}, # noqa: B006 +``` + +The `# noqa` suppresses a legitimate Ruff B006 (mutable default). Should use `Body(default={})` from FastAPI. + +### 3. Code Smells + +#### 3a. Duplicated Model Context Boilerplate + +`src/tools/agent_tools.py` repeats this 15-line block 3 times (lines 68-85, 442-459, 598-615): + +```python +from src.agents.model_context import get_model_context, model_context +ctx = get_model_context() +parent_span_id = None +try: + from src.tracing import get_active_span + active_span = get_active_span() + if active_span and hasattr(active_span, "span_id"): + parent_span_id = active_span.span_id +except (AttributeError, LookupError): + logger.debug(...) +with model_context(model_name=..., temperature=..., parent_span_id=...): +``` + +Should be a single `@with_inherited_model_context` decorator. + +#### 3b. Redundant Import of `logging` Inside Methods + +`src/sandbox/runner.py:348` and `src/dal/sync.py:183` both do: +```python +import logging +logging.getLogger(__name__).warning(...) +``` +despite `logger = logging.getLogger(__name__)` already being defined at module level. + +#### 3c. Sequential I/O in Discovery Sync + +`src/dal/sync.py:_sync_automation_entities()` fetches automation configs sequentially: +```python +for entity in automations: + config = await self.ha.get_automation_config(ha_automation_id) +``` +With 50+ automations, this is 50 sequential HTTP calls. Should use `asyncio.gather()` with concurrency limiting. + +#### 3d. Sequential I/O in `list_proposals` (N+1 Query Pattern) + +`src/api/routes/proposals.py:116-120`: +```python +for s in ProposalStatus: + proposals.extend(await repo.list_by_status(s, limit=limit)) +``` +Makes N separate DB queries (one per status). Should be a single `repo.list_all()` query. + +#### 3e. Private API Usage + +`src/api/routes/proposals.py:83` and `:911`: +```python +state_data = await ha._request("GET", f"/api/states/{entity_id}") +``` +Routes calling `_request` (private method) directly instead of using a public API method. + +### 4. Agent Response Time Optimization + +#### 4a. LLM Factory — Good: Already Cached + +`src/llm/factory.py` caches instances per `(provider, model, temperature)`. No action needed. + +#### 4b. Orchestrator — Extra LLM Call on Every Request + +`src/api/routes/openai_compat/handlers.py:387-391`: +```python +orchestrator = OrchestratorAgent(model_name=request.model) +classification = await orchestrator.classify_intent(...) +plan = await orchestrator.plan_response(...) +``` +When `needs_orchestrator=True`, this makes 1-2 extra LLM calls (classify + optionally generate clarification options) before the actual agent even starts. The orchestrator uses the **same model** as the target agent for classification — expensive for frontier models. + +**Fix**: Use a fast/cheap model for classification regardless of request model. Cache agent routing for identical recent messages. + +#### 4c. Orchestrator DB Query Without Session Context Manager + +`src/agents/orchestrator.py:173-189`: +```python +factory = get_session_factory() +session = factory() +try: + repo = AgentRepository(session) + agents = await repo.list_all() +finally: + await session.close() +``` +Manually managing session instead of using `async with get_session()`. Also queries DB on every classify call — should be cached with TTL. + +#### 4d. Resilient LLM Retry Delays + +`src/llm/circuit_breaker.py` defines `RETRY_DELAYS` — need to verify these are reasonable for user-facing latency (imported but not shown in the read). The current retry mechanism adds latency on transient failures. + +#### 4e. Discovery Sync — Sequential Automation Config Fetches + +As noted in 3c, discovery sync fetches automation configs one at a time. With concurrent fetching, sync time could drop from O(n × latency) to O(latency + n/concurrency × latency). + +### 5. Security Issues + +#### 5a. Mutable Default in Route Handler + +Already noted — `body: dict[str, Any] = {}` in proposals YAML update. + +#### 5b. No Timeout on `connect()` in BaseHAClient + +`src/ha/base.py:169` uses `timeout=5` for connect check, which is good. But `_request` inherits from `self.config.timeout` (default 30s) — appropriate. + +### 6. Positive Findings (No Action Needed) + +- **No f-string logging**: Zero instances found. All logging uses lazy `%s` formatting. +- **No TODO/FIXME/HACK comments**: Clean codebase. +- **Good exception hierarchy**: `AetherError` base with proper subclasses. +- **Correlation IDs**: Consistently propagated via middleware. +- **Security headers**: Comprehensive middleware coverage. +- **Connection pooling**: `BaseHAClient._get_http_client()` properly pools connections. +- **LLM cache**: Factory caches instances per config tuple. +- **Rate limiting**: Applied to all mutation endpoints. +- **Batch operations**: `BaseRepository.upsert_many()` is well-implemented. +- **`asyncio.sleep` usage**: Only in legitimate places (retry backoff, event batching). + +## Priority Matrix + +| Priority | Finding | Impact | Effort | +|----------|---------|--------|--------| +| P0 | Mutable default in proposals route | Security/correctness | 5 min | +| P0 | Error swallowing in tools (agent_tools, diagnostic_tools) | Reliability | 1 hr | +| P1 | Orchestrator uses expensive model for classification | Response time | 30 min | +| P1 | Sequential automation config fetches in sync | Sync performance | 30 min | +| P1 | N+1 query in list_proposals | API latency | 15 min | +| P1 | Duplicated model_context boilerplate | Maintainability | 30 min | +| P2 | Split proposals.py (1287 lines) | Maintainability | 1 hr | +| P2 | Split handlers.py streaming logic | Maintainability | 1 hr | +| P2 | Private _request usage in proposals routes | API hygiene | 15 min | +| P2 | Orchestrator session management | Correctness | 15 min | +| P3 | Split dal/agents.py | Maintainability | 30 min | +| P3 | Split checkpoints.py | Maintainability | 30 min | +| P3 | Redundant logging imports | Clean code | 5 min | diff --git a/specs/001-project-aether/features/42-agent-memory/spec.md b/specs/001-project-aether/features/42-agent-memory/spec.md new file mode 100644 index 00000000..7a57b010 --- /dev/null +++ b/specs/001-project-aether/features/42-agent-memory/spec.md @@ -0,0 +1,312 @@ +# Feature: Agent Memory Layer + +**Status**: Draft +**Priority**: P1 +**Depends on**: 23-agent-configuration, 32-prompt-registry +**Created**: 2026-03-12 + +## Goal + +Give Aether persistent, cross-session memory so the agent can recall user +preferences, past decisions, device usage patterns, and corrections — and +reason over the relationships between them — without relying solely on the +20-message sliding window within a single conversation. + +## Problem Statement + +Today the Architect agent is stateless across conversations. Every new chat +session starts from zero — the agent does not know that the user prefers lights +at 60%, rejected a particular automation last week, or has solar panels. Context +is limited to: + +1. **Sliding window** — last 20 messages within the current conversation. +2. **Conversation.context** — JSONB blob, not searchable, not shared across + sessions. +3. **LangGraph checkpoints** — workflow state, not semantic knowledge. + +Additionally, the domain has rich relationships that are currently invisible to +the agent: devices belong to areas, automations reference entities, insights +span multiple devices, and proposals link conversations to automations. Many of +these are soft references (JSONB arrays, string IDs) that cannot be traversed +via SQL joins. When a user asks "what automations affect the bedroom?" the agent +has no way to answer without re-discovering everything from scratch. + +## User Experience + +### Scenario 1 — Preference Recall + +1. User says: "I always want the hallway lights at 40% after sunset." +2. Architect stores a `preference` memory linked to the hallway light entities. +3. Two weeks later, user says: "Create an automation for the hallway lights." +4. Architect retrieves the stored preference **and** traverses the graph to find + which entities are hallway lights, proposing an automation with 40% brightness + after sunset — without the user repeating anything. +5. User says: "Actually, make it 30% now." +6. Architect updates the existing memory (not a duplicate). + +### Scenario 2 — Cross-Domain Reasoning + +1. User asks: "What did we decide about the bedroom?" +2. Agent queries the memory graph for all nodes linked to the "Bedroom" area: + preferences about bedroom lights, a rejected automation proposal for the + bedroom thermostat, an insight about bedroom humidity. +3. Agent synthesizes a coherent answer spanning devices, automations, and + past analyses — something impossible with flat memory alone. + +### Scenario 3 — Memory Management + +1. User visits Settings → Memories and sees a list of what Aether remembers. +2. Each memory shows its type, content, linked entities, source conversation, + and timestamps. +3. User can view, correct, or delete any memory. + +## Core Capabilities + +### Explicit Memory Storage via Agent Tool + +Memories are created through a dedicated `store_memory` tool available to the +Architect (and other agents). The agent decides when something is worth +remembering — there is no background extraction pipeline. + +This keeps memory creation observable, traceable, and deterministic: +- Every memory write is a tool call visible in the MLflow trace. +- The user sees "Aether remembered: hallway lights → 40% after sunset" in the + chat activity panel. +- No hidden LLM calls deciding what to remember behind the scenes. + +### Hybrid Retrieval: Vector Search + Graph Traversal + +Memory retrieval combines two strategies: + +1. **Semantic search** — embed the user's query and find the most relevant + memory nodes via pgvector cosine similarity. +2. **Graph expansion** — from the matched nodes, traverse edges to pull in + related context (the area a device is in, other preferences for that area, + past decisions about linked automations). + +Both run on the same PostgreSQL instance. Semantic search finds the entry +point; graph traversal enriches it with relational context. + +### Memory Types + +| Type | Description | Example | +|------|-------------|---------| +| `preference` | User preference or habitual request | "Lights at 40% after sunset" | +| `decision` | A choice the user made (approved/rejected) | "Rejected motion-sensor automation for bathroom" | +| `correction` | User corrected the agent's assumption | "House has 3 bedrooms, not 2" | +| `device_pattern` | Observed device usage or configuration | "Solar panels on south roof, 6kW system" | +| `instruction` | Standing instruction for agent behaviour | "Always ask before changing thermostat" | + +### Graph Relationships + +Memory nodes connect to each other and to domain entities via typed edges: + +| Edge Type | From | To | Example | +|-----------|------|----|---------| +| `about_entity` | Memory | HAEntity | Preference → `light.hallway` | +| `about_area` | Memory | Area | Preference → "Hallway" | +| `about_automation` | Memory | HAAutomation | Decision → rejected automation | +| `supersedes` | Memory | Memory | Updated preference → old preference | +| `related_to` | Memory | Memory | "30% lights" related to "energy saving mode" | +| `derived_from` | Memory | Insight | Device pattern → energy analysis insight | +| `from_conversation` | Memory | Conversation | Provenance link | + +Edges are lightweight rows in a `memory_edges` table — not a separate graph +database. Traversal uses recursive CTEs, bounded by depth (default 2 hops) +to keep queries fast. + +### Deterministic Conflict Resolution + +When storing a memory that overlaps with an existing one (same user, same +type, high vector similarity), the agent explicitly updates rather than +duplicates. The update is a tool call, not a background LLM decision: + +1. Agent calls `store_memory` with content and entity links. +2. System finds existing memory above similarity threshold. +3. System returns the match to the agent with a prompt to confirm update. +4. Agent calls `update_memory` with the revised content. +5. Old content is preserved in a `previous_content` field for audit. +6. A `supersedes` edge is created from the new version to the old. + +### User Memory Management + +Users can view and manage their memories through the UI and API: + +- **View**: List all memories with type, content, linked entities, source + conversation, and timestamps. +- **Edit**: Correct a memory's content (triggers re-embedding). +- **Delete**: Remove a memory permanently. +- **Search**: Semantic search across memories ("what does Aether know about + my lights?"). + +## Components + +### Database Entities + +**`MemoryNode`** — core memory table with embeddings + +| Column | Type | Description | +|--------|------|-------------| +| `id` | UUID | PK | +| `user_id` | String(100) | User scope, indexed | +| `agent_id` | UUID | Agent that created this memory, FK to `agent` | +| `memory_type` | Enum | `preference`, `decision`, `correction`, `device_pattern`, `instruction` | +| `content` | Text | Human-readable memory content | +| `embedding` | Vector(dim) | pgvector embedding for semantic search | +| `source_conversation_id` | UUID | FK to `conversation` (nullable) | +| `source_message_id` | UUID | FK to `message` (nullable) | +| `previous_content` | Text | Prior content before last update (audit trail) | +| `metadata` | JSONB | Structured data (extracted entities, confidence, etc.) | +| `is_active` | Boolean | Soft-delete flag, default true | +| `created_at` | Timestamp | Creation time | +| `updated_at` | Timestamp | Last update time | + +**`MemoryEdge`** — typed relationships between memory nodes and domain entities + +| Column | Type | Description | +|--------|------|-------------| +| `id` | UUID | PK | +| `edge_type` | String | Relationship type (see table above) | +| `from_node_id` | UUID | FK to `memory_nodes`, indexed | +| `to_node_id` | UUID | FK to `memory_nodes` (nullable, for memory↔memory edges) | +| `to_entity_id` | UUID | Target domain entity ID (nullable, for memory↔domain edges) | +| `to_entity_type` | String | Target entity table name (e.g. `ha_entity`, `area`, `conversation`) | +| `properties` | JSONB | Edge metadata (e.g. relationship strength, context) | +| `created_at` | Timestamp | Creation time | + +Constraint: exactly one of `to_node_id` or `to_entity_id` must be non-null +(CHECK constraint). This allows edges to point either to other memory nodes or +to existing domain entities without requiring FKs to every table. + +Unique constraint on `(from_node_id, to_node_id, edge_type)` and +`(from_node_id, to_entity_id, to_entity_type, edge_type)` to prevent +duplicate edges. + +### Data Access Layer + +**`MemoryRepository`** in `src/dal/memory.py` + +- `store(user_id, content, memory_type, embedding, edges, ...)` — insert node + edges +- `search(user_id, query_embedding, top_k, threshold)` — pgvector cosine similarity +- `search_with_graph(user_id, query_embedding, top_k, threshold, depth)` — vector + search + recursive CTE expansion of edges up to `depth` hops +- `find_similar(user_id, embedding, threshold)` — find potential duplicates +- `update(memory_id, content, new_embedding)` — update with audit trail +- `delete(memory_id)` — soft-delete node (edges preserved for audit) +- `hard_delete(memory_id)` — permanent removal of node + edges +- `list_by_user(user_id, memory_type?, limit, offset)` — paginated listing +- `get_related(node_id, edge_types?, depth)` — graph traversal from a node +- `get_by_entity(entity_id, entity_type)` — all memories linked to a domain entity + +### Agent Tools + +| Tool | Agent | Description | +|------|-------|-------------| +| `store_memory` | Architect, Analysts | Store a memory with optional entity links | +| `update_memory` | Architect, Analysts | Update an existing memory (with audit trail + supersedes edge) | +| `recall_memories` | Architect, Analysts | Hybrid retrieval: vector search + graph expansion | +| `list_memories` | Architect | List user's memories (for chat-based management) | +| `delete_memory` | Architect | Soft-delete a memory | + +The `store_memory` tool accepts optional `entity_ids` (HA entity IDs) and +`area_ids` which are automatically resolved to `about_entity` and `about_area` +edges. The agent provides these when the memory is about specific devices or +rooms. + +### Embedding Service + +**`EmbeddingService`** in `src/services/embedding.py` + +- Wraps the configured embedding model (e.g. `text-embedding-3-small`). +- Configurable via `AppSettings` (model name, dimension, batch size). +- Async interface with retry logic. +- Shared across memory storage, retrieval, and re-embedding on edit. + +### Backend API + +| Endpoint | Method | Description | +|----------|--------|-------------| +| `GET /api/v1/memories` | GET | List memories for authenticated user | +| `GET /api/v1/memories/{id}` | GET | Get single memory with edges | +| `PUT /api/v1/memories/{id}` | PUT | Update memory content | +| `DELETE /api/v1/memories/{id}` | DELETE | Delete memory | +| `POST /api/v1/memories/search` | POST | Semantic search across memories | +| `GET /api/v1/memories/entity/{entity_id}` | GET | Memories linked to a domain entity | + +### UI + +- **Settings → Memories** page: list, search, edit, delete memories. Each + memory shows linked entities as clickable chips. +- **Chat activity panel**: "Aether remembered: ..." when a memory is stored. +- **Memory indicator**: subtle badge when the agent uses recalled memories in + a response. +- **Entity detail**: "Memories about this entity" section on device/area pages + (future enhancement). + +### Context Injection + +The memory retrieval is wired into `_build_messages` on the Architect (and +other agents). Before generating a response: + +1. Embed the latest user message. +2. Call `MemoryRepository.search_with_graph()` — vector search for top-k + memories, then expand 1-2 hops via graph edges to pull in related context. +3. Format retrieved memories as a `SystemMessage` block injected after the + main system prompt and before conversation history. Includes relationship + context (e.g. "This preference is about light.hallway in the Hallway area"). +4. Track which memories were used in the response metadata for observability. + +## Constitution Check + +- **Safety First**: Memory storage is an explicit agent tool call — no + autonomous background extraction. Users can view, edit, and delete all + memories. No memory influences mutating actions without HITL approval. +- **Isolation**: No new external services. pgvector and graph tables run + within existing PostgreSQL. Embedding calls go through the same LLM + provider pipeline. +- **Observability**: Every memory write/read is a traced tool call in MLflow. + Retrieved memories and graph expansions are logged in response metadata. +- **State**: Memories and edges are durable in PostgreSQL. No in-memory-only + state. Aligns with "Postgres for durable checkpointing." +- **Security**: Memories are scoped per `user_id` — no cross-user leakage. + Graph traversal is user-scoped (edges can only reach the user's own memory + nodes; domain entity edges are read-only references). API endpoints require + authentication. Memory content is validated via Pydantic. + +## Acceptance Criteria + +- **Given** the user states a preference in chat, **when** the Architect + decides to remember it, **then** a `store_memory` tool call appears in the + trace and the memory is persisted with a valid embedding and entity edges. +- **Given** stored memories exist, **when** the user starts a new conversation + on a related topic, **then** the agent's response reflects recalled + memories without the user repeating themselves. +- **Given** a memory already exists for "hallway lights at 40%", **when** the + user says "make it 30%", **then** the agent updates the existing memory + (not a duplicate), the `previous_content` field preserves "40%", and a + `supersedes` edge links the versions. +- **Given** the user asks "what do you know about the bedroom?", **then** the + agent traverses graph edges to find all memories linked to the Bedroom area + and its child entities, returning a coherent summary. +- **Given** the user visits Settings → Memories, **then** they see all active + memories with type, content, linked entities, and timestamps. +- **Given** the user deletes a memory via UI, **then** it no longer appears + in retrieval results for any future conversation. +- **Given** a conversation with no relevant memories, **then** the agent + behaves identically to today (no performance regression). +- **Given** the embedding service is unavailable, **then** the agent falls + back to operating without memory retrieval (graceful degradation, not a + hard failure). +- **Given** a graph traversal query, **then** it completes within 100ms for + graphs with up to 10,000 nodes and 2-hop depth. + +## Out of Scope + +- Automatic background memory extraction (no hidden LLM calls). +- External graph database (Neo4j, Memgraph, etc.). +- Memory sharing across users / multi-tenant memory isolation. +- Memory-based proactive suggestions (e.g. "you usually turn on lights now"). +- Embedding model fine-tuning or custom training. +- Memory import/export. +- Apache AGE or SQL/PGQ (may adopt in future if PostgreSQL adds native graph + query support). diff --git a/src/agents/orchestrator.py b/src/agents/orchestrator.py index d69ad077..fcbb28b4 100644 --- a/src/agents/orchestrator.py +++ b/src/agents/orchestrator.py @@ -74,9 +74,19 @@ def __init__(self, model_name: str | None = None): self._llm: BaseChatModel | None = None def _get_classification_llm(self) -> Any: - """Get or create the LLM used for intent classification.""" + """Get or create the LLM used for intent classification. + + Always uses a fast-tier model for classification regardless of the + user's selected model — classification is a simple task and using + frontier models here adds unnecessary latency and cost. + """ if self._llm is None: - self._llm = get_llm(model=self.model_name, temperature=0.0) + from src.llm.model_tiers import get_default_model_for_tier, get_model_tier + + model = self.model_name + if model and get_model_tier(model) != "fast": + model = get_default_model_for_tier("fast") + self._llm = get_llm(model=model, temperature=0.0) return self._llm async def classify_intent( @@ -168,13 +178,11 @@ async def _get_available_agents(self) -> list[dict[str, Any]]: """ try: from src.dal.agents import AgentRepository - from src.storage import get_session_factory + from src.storage import get_session - factory = get_session_factory() - session = factory() - try: + async with get_session() as session: repo = AgentRepository(session) - agents = await repo.list_all() + agents = await repo.list_routable() return [ { "name": a.name, @@ -184,10 +192,7 @@ async def _get_available_agents(self) -> list[dict[str, Any]]: "capabilities": a.capabilities or [], } for a in agents - if a.is_routable ] - finally: - await session.close() except SQLAlchemyError: logger.warning("Failed to fetch available agents", exc_info=True) return [] diff --git a/src/api/routes/proposals.py b/src/api/routes/proposals.py index 725c7f2e..50fed1df 100644 --- a/src/api/routes/proposals.py +++ b/src/api/routes/proposals.py @@ -8,7 +8,7 @@ from datetime import UTC, datetime from typing import Any, cast -from fastapi import APIRouter, HTTPException, Request +from fastapi import APIRouter, Body, HTTPException, Request logger = logging.getLogger(__name__) @@ -113,11 +113,7 @@ async def list_proposals( if status_filter: proposals = await repo.list_by_status(status_filter, limit=limit) else: - # Get all proposals (combine multiple statuses) - proposals = [] - for s in ProposalStatus: - proposals.extend(await repo.list_by_status(s, limit=limit)) - proposals = sorted(proposals, key=lambda p: p.created_at, reverse=True)[:limit] + proposals = await repo.list_recent(limit=limit, offset=offset) total = await repo.count(status=status_filter) @@ -441,7 +437,7 @@ def _build_messages(s: ConversationState) -> list: async def update_proposal_yaml( request: Request, proposal_id: str, - body: dict[str, Any] = {}, # noqa: B006 + body: dict[str, Any] = Body(default={}), ) -> dict[str, Any]: """Update the YAML content of a proposal. diff --git a/src/dal/conversations.py b/src/dal/conversations.py index 6bc2910b..0c265c54 100644 --- a/src/dal/conversations.py +++ b/src/dal/conversations.py @@ -498,6 +498,24 @@ async def find_by_ha_automation_id(self, ha_automation_id: str) -> AutomationPro ) return result.scalar_one_or_none() + async def list_recent( + self, + limit: int = 50, + offset: int = 0, + ) -> list[AutomationProposal]: + """List all proposals ordered by creation date (newest first). + + Single query replacement for the per-status loop pattern. + """ + query = ( + select(AutomationProposal) + .order_by(AutomationProposal.created_at.desc()) + .limit(limit) + .offset(offset) + ) + result = await self.session.execute(query) + return list(result.scalars().all()) + async def list_by_status( self, status: ProposalStatus, diff --git a/src/dal/sync.py b/src/dal/sync.py index 1ed22e07..0db54545 100644 --- a/src/dal/sync.py +++ b/src/dal/sync.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio import logging import time from datetime import UTC, datetime @@ -349,24 +350,31 @@ async def _sync_automation_entities(self, entities: list[Any]) -> dict[str, int] scripts = [e for e in entities if e.domain == "script"] scenes = [e for e in entities if e.domain == "scene"] - # --- Automations --- + # --- Automations (concurrent config fetches) --- seen_automation_ids: set[str] = set() + _FETCH_CONCURRENCY = 10 + sem = asyncio.Semaphore(_FETCH_CONCURRENCY) + + async def _fetch_config(aid: str) -> tuple[str, dict[str, Any] | None]: + async with sem: + try: + return aid, await self.ha.get_automation_config(aid) + except (httpx.HTTPError, TimeoutError, ConnectionError) as exc: + logger.warning("Failed to fetch config for automation %s: %s", aid, exc) + return aid, None + + automation_meta: list[tuple[Any, str]] = [] for entity in automations: attrs = entity.attributes or {} ha_automation_id = attrs.get("id", entity.entity_id.split(".", 1)[-1]) seen_automation_ids.add(ha_automation_id) + automation_meta.append((entity, ha_automation_id)) - # Fetch full config from HA (trigger/condition/action) - config: dict[str, Any] | None = None - try: - config = await self.ha.get_automation_config(ha_automation_id) - except (httpx.HTTPError, TimeoutError, ConnectionError) as exc: - logger.warning( - "Failed to fetch config for automation %s: %s", - ha_automation_id, - exc, - ) + config_results = await asyncio.gather(*(_fetch_config(aid) for _, aid in automation_meta)) + config_map: dict[str, dict[str, Any] | None] = dict(config_results) + for entity, ha_automation_id in automation_meta: + attrs = entity.attributes or {} await self.automation_repo.upsert( { "ha_automation_id": ha_automation_id, @@ -375,7 +383,7 @@ async def _sync_automation_entities(self, entities: list[Any]) -> dict[str, int] "state": entity.state or "off", "mode": attrs.get("mode", "single"), "last_triggered": attrs.get("last_triggered"), - "config": config, + "config": config_map.get(ha_automation_id), } ) stats["automations_synced"] += 1 @@ -385,28 +393,31 @@ async def _sync_automation_entities(self, entities: list[Any]) -> dict[str, int] for stale_id in existing_automation_ids - seen_automation_ids: await self.automation_repo.delete(stale_id) - # --- Scripts --- + # --- Scripts (concurrent config fetches) --- seen_script_ids: set[str] = set() + + async def _fetch_script_config(sid: str) -> tuple[str, dict[str, Any] | None]: + async with sem: + try: + return sid, await self.ha.get_script_config(sid) + except (httpx.HTTPError, TimeoutError, ConnectionError) as exc: + logger.warning("Failed to fetch config for script %s: %s", sid, exc) + return sid, None + + script_meta: list[tuple[Any, str]] = [] for entity in scripts: - attrs = entity.attributes or {} seen_script_ids.add(entity.entity_id) - - # Fetch full config from HA (sequence/fields) script_id = entity.entity_id.split(".", 1)[-1] - sequence: list[Any] | None = None - fields: dict[str, Any] | None = None - try: - script_config = await self.ha.get_script_config(script_id) - if script_config: - sequence = script_config.get("sequence") - fields = script_config.get("fields") - except (httpx.HTTPError, TimeoutError, ConnectionError) as exc: - logger.warning( - "Failed to fetch config for script %s: %s", - script_id, - exc, - ) + script_meta.append((entity, script_id)) + + script_results = await asyncio.gather( + *(_fetch_script_config(sid) for _, sid in script_meta) + ) + script_config_map: dict[str, dict[str, Any] | None] = dict(script_results) + for entity, script_id in script_meta: + attrs = entity.attributes or {} + sc = script_config_map.get(script_id) await self.script_repo.upsert( { "entity_id": entity.entity_id, @@ -415,8 +426,8 @@ async def _sync_automation_entities(self, entities: list[Any]) -> dict[str, int] "mode": attrs.get("mode", "single"), "icon": attrs.get("icon"), "last_triggered": attrs.get("last_triggered"), - "sequence": sequence, - "fields": fields, + "sequence": sc.get("sequence") if sc else None, + "fields": sc.get("fields") if sc else None, } ) stats["scripts_synced"] += 1 diff --git a/src/sandbox/runner.py b/src/sandbox/runner.py index a044f5f2..32c5d9d5 100644 --- a/src/sandbox/runner.py +++ b/src/sandbox/runner.py @@ -345,9 +345,7 @@ async def _build_command( if policy.use_gvisor and not await self._is_gvisor_available(): # Create a copy of the policy with gVisor disabled # Also disable seccomp as it may not be available on all platforms (e.g., macOS) - import logging - - logging.getLogger(__name__).warning( + logger.warning( "gVisor (runsc) not available - running with standard container isolation" ) policy = policy.model_copy( @@ -394,7 +392,6 @@ async def _build_command( # The DS Team agent parses JSON from stdout; stray warnings # (e.g. pandas pyarrow DeprecationWarning) break extraction. cmd.extend(["--env", "PYTHONWARNINGS=ignore::DeprecationWarning"]) - cmd.extend(["--env", "MPLCONFIGDIR=/tmp"]) # Matplotlib needs a writable config dir; the sandbox has no home dir. cmd.extend(["--env", "MPLCONFIGDIR=/tmp/matplotlib"]) diff --git a/src/storage/entities/insight.py b/src/storage/entities/insight.py index dc83adab..c023e957 100644 --- a/src/storage/entities/insight.py +++ b/src/storage/entities/insight.py @@ -10,9 +10,9 @@ from datetime import UTC, datetime from typing import Any -from sqlalchemy import JSON, DateTime, Float, String, Text, Uuid, func -from sqlalchemy.dialects.postgresql import ARRAY +from sqlalchemy import DateTime, Float, String, Text, Uuid, func from sqlalchemy.dialects.postgresql import ENUM as PgENUM +from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import Mapped, mapped_column from src.storage.models import Base @@ -86,7 +86,7 @@ class Insight(Base): # Analysis data evidence: Mapped[dict[str, Any]] = mapped_column( - JSON, + JSONB, nullable=False, default=dict, doc="Supporting data for the insight", @@ -107,12 +107,13 @@ class Insight(Base): doc="Impact level: low, medium, high, critical", ) - # Related entities + # Related HA entities (string IDs like "light.lampy", not UUIDs) entities: Mapped[list[str]] = mapped_column( - ARRAY(Uuid(as_uuid=False)), + JSONB, nullable=False, default=list, - doc="Entity IDs related to this insight", + server_default="'[]'::jsonb", + doc="HA entity IDs related to this insight", ) # Script execution (if applicable) @@ -122,7 +123,7 @@ class Insight(Base): doc="Path to the analysis script", ) script_output: Mapped[dict[str, Any] | None] = mapped_column( - JSON, + JSONB, nullable=True, doc="Output from script execution", ) diff --git a/src/tools/agent_tools.py b/src/tools/agent_tools.py index b41660cb..062fc431 100644 --- a/src/tools/agent_tools.py +++ b/src/tools/agent_tools.py @@ -10,8 +10,10 @@ from __future__ import annotations import logging +from contextlib import contextmanager from typing import Any +import httpx from langchain_core.tools import tool from src.tracing import trace_with_uri @@ -19,6 +21,34 @@ logger = logging.getLogger(__name__) +@contextmanager +def _inherited_model_context(): + """Inherit model context from the calling agent and propagate parent span. + + Centralises the repeated model-context + parent-span boilerplate that + was duplicated across every analysis tool. + """ + from src.agents.model_context import get_model_context, model_context + + ctx = get_model_context() + parent_span_id = None + try: + from src.tracing import get_active_span + + active_span = get_active_span() + if active_span and hasattr(active_span, "span_id"): + parent_span_id = active_span.span_id + except (AttributeError, LookupError): + logger.debug("Failed to get active span for parent span ID", exc_info=True) + + with model_context( + model_name=ctx.model_name if ctx else None, + temperature=ctx.temperature if ctx else None, + parent_span_id=parent_span_id, + ): + yield + + @tool("analyze_energy") @trace_with_uri(name="agent.analyze_energy", span_type="TOOL") async def analyze_energy( @@ -65,24 +95,7 @@ async def analyze_energy( hours = min(hours, 168) # Max 1 week try: - from src.agents.model_context import get_model_context, model_context - - ctx = get_model_context() - parent_span_id = None - try: - from src.tracing import get_active_span - - active_span = get_active_span() - if active_span and hasattr(active_span, "span_id"): - parent_span_id = active_span.span_id - except (AttributeError, LookupError): - logger.debug("Failed to get active span for parent span ID", exc_info=True) - - with model_context( - model_name=ctx.model_name if ctx else None, - temperature=ctx.temperature if ctx else None, - parent_span_id=parent_span_id, - ): + with _inherited_model_context(): workflow = DataScientistWorkflow() async with get_session() as session: @@ -94,11 +107,14 @@ async def analyze_energy( ) await session.commit() - # Generate conversational summary return _format_energy_analysis(state, analysis_type, hours) - except Exception as e: - return f"I wasn't able to complete the energy analysis: {e}" + except (httpx.HTTPError, TimeoutError, ConnectionError) as e: + logger.warning("Energy analysis network error: %s", e, exc_info=True) + return f"I wasn't able to complete the energy analysis due to a connection issue: {e}" + except Exception: + logger.exception("Unexpected error in energy analysis") + return "I wasn't able to complete the energy analysis. Check server logs for details." def _format_energy_analysis(state: Any, analysis_type: str, hours: int) -> str: @@ -193,8 +209,12 @@ async def discover_entities(domain_filter: str | None = None) -> str: # Format response return _format_discovery_results(state, domain_filter) - except Exception as e: - return f"I wasn't able to complete the entity discovery: {e}" + except (httpx.HTTPError, TimeoutError, ConnectionError) as e: + logger.warning("Entity discovery network error: %s", e, exc_info=True) + return f"I wasn't able to complete the entity discovery due to a connection issue: {e}" + except Exception: + logger.exception("Unexpected error in entity discovery") + return "I wasn't able to complete the entity discovery. Check server logs for details." def _format_discovery_results(state: Any, domain_filter: str | None) -> str: @@ -304,8 +324,12 @@ async def get_entity_history( return "\n".join(parts) - except Exception as e: - return f"Couldn't retrieve history for {entity_id}: {e}" + except (httpx.HTTPError, TimeoutError, ConnectionError) as e: + logger.warning("History fetch network error for %s: %s", entity_id, e, exc_info=True) + return f"Couldn't retrieve history for {entity_id} due to a connection issue: {e}" + except Exception: + logger.exception("Unexpected error fetching history for %s", entity_id) + return f"Couldn't retrieve history for {entity_id}. Check server logs for details." def _format_detailed_history( @@ -439,24 +463,7 @@ async def diagnose_issue( hours = min(hours, 168) try: - from src.agents.model_context import get_model_context, model_context - - ctx = get_model_context() - parent_span_id = None - try: - from src.tracing import get_active_span - - active_span = get_active_span() - if active_span and hasattr(active_span, "span_id"): - parent_span_id = active_span.span_id - except (AttributeError, LookupError): - logger.debug("Failed to get active span for parent span ID", exc_info=True) - - with model_context( - model_name=ctx.model_name if ctx else None, - temperature=ctx.temperature if ctx else None, - parent_span_id=parent_span_id, - ): + with _inherited_model_context(): workflow = DataScientistWorkflow() async with get_session() as session: @@ -472,8 +479,12 @@ async def diagnose_issue( return _format_diagnostic_results(state, entity_ids, hours) - except Exception as e: - return f"Diagnostic analysis failed: {e}" + except (httpx.HTTPError, TimeoutError, ConnectionError) as e: + logger.warning("Diagnostic analysis network error: %s", e, exc_info=True) + return f"Diagnostic analysis failed due to a connection issue: {e}" + except Exception: + logger.exception("Unexpected error in diagnostic analysis") + return "Diagnostic analysis failed. Check server logs for details." def _format_diagnostic_results(state: Any, entity_ids: list[str], hours: int) -> str: @@ -596,24 +607,7 @@ async def analyze_behavior( hours = min(hours, 720) # Max 30 days try: - from src.agents.model_context import get_model_context, model_context - - ctx = get_model_context() - parent_span_id = None - try: - from src.tracing import get_active_span - - active_span = get_active_span() - if active_span and hasattr(active_span, "span_id"): - parent_span_id = active_span.span_id - except (AttributeError, LookupError): - logger.debug("Failed to get active span for parent span ID", exc_info=True) - - with model_context( - model_name=ctx.model_name if ctx else None, - temperature=ctx.temperature if ctx else None, - parent_span_id=parent_span_id, - ): + with _inherited_model_context(): workflow = DataScientistWorkflow() async with get_session() as session: @@ -627,8 +621,12 @@ async def analyze_behavior( return _format_behavioral_analysis(state, analysis_type, hours) - except Exception as e: - return f"I wasn't able to complete the behavioral analysis: {e}" + except (httpx.HTTPError, TimeoutError, ConnectionError) as e: + logger.warning("Behavioral analysis network error: %s", e, exc_info=True) + return f"I wasn't able to complete the behavioral analysis due to a connection issue: {e}" + except Exception: + logger.exception("Unexpected error in behavioral analysis") + return "I wasn't able to complete the behavioral analysis. Check server logs for details." def _format_behavioral_analysis(state: Any, analysis_type: str, hours: int) -> str: @@ -755,8 +753,12 @@ async def propose_automation_from_insight( return "\n".join(response_parts) - except Exception as e: - return f"I wasn't able to create a proposal from this suggestion: {e}" + except (httpx.HTTPError, TimeoutError, ConnectionError) as e: + logger.warning("Proposal creation network error: %s", e, exc_info=True) + return f"I wasn't able to create a proposal from this suggestion due to a connection issue: {e}" + except Exception: + logger.exception("Unexpected error creating proposal from suggestion") + return "I wasn't able to create a proposal from this suggestion. Check server logs for details." def get_agent_tools() -> list[Any]: diff --git a/tests/unit/test_api_proposals.py b/tests/unit/test_api_proposals.py index e7e57c51..fac78658 100644 --- a/tests/unit/test_api_proposals.py +++ b/tests/unit/test_api_proposals.py @@ -244,6 +244,7 @@ def mock_proposal_repo(mock_proposal, mock_proposal_approved, mock_proposal_depl """Create mock ProposalRepository.""" repo = MagicMock() repo.list_by_status = AsyncMock(return_value=[mock_proposal]) + repo.list_recent = AsyncMock(return_value=[mock_proposal]) repo.list_pending_approval = AsyncMock(return_value=[mock_proposal]) repo.get_by_id = AsyncMock(return_value=mock_proposal) repo.count = AsyncMock(return_value=1) @@ -308,8 +309,8 @@ async def test_list_proposals_with_status_filter( async def test_list_proposals_with_invalid_status( self, proposal_client, mock_proposal_repo, mock_proposal, mock_get_session ): - """Should ignore invalid status and return all proposals.""" - mock_proposal_repo.list_by_status = AsyncMock(return_value=[mock_proposal]) + """Should ignore invalid status and return all proposals via list_recent.""" + mock_proposal_repo.list_recent = AsyncMock(return_value=[mock_proposal]) mock_proposal_repo.count = AsyncMock(return_value=1) with ( @@ -319,14 +320,13 @@ async def test_list_proposals_with_invalid_status( response = await proposal_client.get("/api/v1/proposals?status=invalid") assert response.status_code == 200 - # Should call list_by_status for all statuses - assert mock_proposal_repo.list_by_status.call_count > 0 + mock_proposal_repo.list_recent.assert_called_once() async def test_list_proposals_with_limit_and_offset( self, proposal_client, mock_proposal_repo, mock_proposal, mock_get_session ): """Should respect limit and offset parameters.""" - mock_proposal_repo.list_by_status = AsyncMock(return_value=[mock_proposal]) + mock_proposal_repo.list_recent = AsyncMock(return_value=[mock_proposal]) mock_proposal_repo.count = AsyncMock(return_value=1) with ( @@ -344,6 +344,7 @@ async def test_list_proposals_empty(self, proposal_client, mock_get_session): """Should return empty list when no proposals exist.""" repo = MagicMock() repo.list_by_status = AsyncMock(return_value=[]) + repo.list_recent = AsyncMock(return_value=[]) repo.count = AsyncMock(return_value=0) with (