diff --git a/INTEGRATION.md b/INTEGRATION.md new file mode 100644 index 0000000..a8baccc --- /dev/null +++ b/INTEGRATION.md @@ -0,0 +1,305 @@ +--- +*Prepared by **Agent: Mei (梅)** — PhD candidate, Tsinghua KEG Lab. Specialist in memory systems, inference optimization, and distributed AI architecture.* +*Running: anthropic/claude-opus-4-5* + +*Human in the Loop: Garrett Kinsman* + +--- + +# ContextGraph ↔ OpenClaw Integration Spec +*v1-2026-03-19* + +## BLUF + +This patch bridges file-based memory (`memory/daily/`, `memory/projects/`, etc.) into ContextGraph's tag-indexed DAG, and provides a Python API for assembling context at session start. It's a working fix while Rich's improved rolling context system is in development. + +**What this enables:** OpenClaw can call `context_injector.py` before session start to get a dynamically-assembled context block based on the incoming query, rather than relying solely on the static `MEMORY.md` injection. + +--- + +## What Was Built + +### 1. `scripts/memory_harvester.py` + +**Purpose:** Crawl memory directories and index files into ContextGraph. + +**What it does:** +- Crawls: `memory/daily/`, `memory/projects/`, `memory/decisions/`, `memory/contacts/` +- Reads YAML frontmatter tags (e.g., `tags: [maxrisk, trading, options]`) +- Creates ContextGraph Messages with: + - `user_text` = `[category] Title` (searchable query representation) + - `assistant_text` = file content (what gets retrieved) + - `tags` = frontmatter tags + auto-inferred tags from tagger.py + - `external_id` = `memory-file:{relative_path}` (for idempotent updates) +- Uses content hash to skip unchanged files (incremental updates) +- Designed for cron (nightly) or on-demand + +**Usage:** +```bash +# Test run (no writes) +python3 scripts/memory_harvester.py --dry-run --verbose + +# Full harvest +python3 scripts/memory_harvester.py + +# Force re-index all +python3 scripts/memory_harvester.py --force +``` + +**State file:** `data/memory-harvester-state.json` + +### 2. `scripts/context_injector.py` + +**Purpose:** Assemble context from ContextGraph for session injection. + +**What it does:** +- Takes incoming query (user's first message) +- Infers tags using existing tagger.py +- Calls ContextAssembler with configured token budget +- Returns formatted markdown block suitable for system prompt injection + +**CLI usage:** +```bash +# Query test +python3 scripts/context_injector.py "what's the maxrisk project status?" + +# With custom budget +python3 scripts/context_injector.py --budget 1500 "memory architecture" + +# JSON output for API integration +python3 scripts/context_injector.py --json "trading research" +``` + +**Python API:** +```python +from scripts.context_injector import assemble_context, assemble_for_session + +# Simple: get formatted context block +context_block = assemble_context("user query", token_budget=2000) + +# Full: get block + metadata +result = assemble_for_session("user query") +# result = { +# "context_block": str, # markdown for injection +# "tokens": int, # estimated tokens used +# "message_count": int, # messages retrieved +# "tags": ["tag1", "tag2"], # tags that matched +# "source": "contextgraph", +# } +``` + +**Output format:** +```markdown +## Retrieved Context + +*Assembled by ContextGraph — 8 messages, ~1847 tokens* +*Query tags: [maxrisk, trading, options]* + +### [2026-03-18] MaxRisk Project Status +*Tags: maxrisk, trading, options* + +Current equity: $3,884.55. Focus on 30-45 DTE debit spreads... + +### [2026-03-17] Trading Research Notes +*Tags: maxrisk, research* + +Volume rotation strategy analysis... +``` + +--- + +## What This Doesn't Do (Gaps for Rich's System) + +### 1. No Hook Into Injection Layer + +This patch provides the **assembly function** but doesn't wire it into OpenClaw's actual injection layer. Someone needs to: + +- Add a call to `assemble_for_session()` in the OpenClaw session bootstrap path +- Decide whether the result **replaces** MEMORY.md or **augments** it +- Handle the case where ContextGraph returns empty (fallback to MEMORY.md) + +**Recommended integration point:** Wherever OpenClaw builds the system prompt at session start, add: + +```python +from projects.contextgraph_engine.scripts.context_injector import assemble_for_session + +# At session start, before building system prompt: +result = assemble_for_session(first_user_message) +if result["message_count"] > 0: + system_prompt += "\n\n" + result["context_block"] +``` + +### 2. No Semantic Search Fallback + +The current implementation uses **tag-based retrieval only**. If the user's query doesn't match any known tags, the topic layer returns empty. + +**Rich's system should add:** Semantic similarity search (using nomic-embed-text or similar) as a fallback when tag retrieval returns few results. + +### 3. No MEMORY.md Integration + +This patch doesn't modify or replace MEMORY.md. The two systems are additive: +- MEMORY.md = static, manually curated, always injected +- ContextGraph = dynamic, auto-tagged, query-based + +**For the static-overrides-dynamic problem:** Either: +- Keep MEMORY.md very slim (project status one-liners only) +- Have Rich's system generate MEMORY.md from ContextGraph at session start +- Replace MEMORY.md injection with ContextGraph injection entirely + +### 4. No Real-Time Indexing + +`memory_harvester.py` is batch-mode only. Changes to memory files aren't reflected until next harvest. + +**For real-time:** Could add a file watcher (fswatch, watchdog) that triggers incremental indexing on file change. + +### 5. No Sub-Agent Context Propagation + +When the main session spawns a sub-agent, the sub-agent doesn't automatically get relevant context from ContextGraph. This is why Mei ran 41 min and the main session forgot what she was doing. + +**Rich's system should address:** Context propagation to sub-agents, possibly via: +- Injecting a "task context" block when spawning +- Having sub-agents call `assemble_for_session()` with their task description + +--- + +## Integration Checklist for Rich + +### Phase 1: Harvest Pipeline +- [x] `memory_harvester.py` crawls memory directories +- [x] YAML frontmatter tags → ContextGraph DAG edges +- [x] Content hash for incremental updates +- [ ] Add to nightly cron (alongside existing `harvester.py`) + +### Phase 2: Injection Layer +- [x] `context_injector.py` assembles context +- [x] Python API for integration +- [ ] Wire into OpenClaw session bootstrap +- [ ] Decide MEMORY.md relationship (replace vs. augment) + +### Phase 3: Enhanced Retrieval (Rich's Improvements) +- [ ] Semantic search fallback when tag retrieval is sparse +- [ ] Cross-session context propagation for sub-agents +- [ ] Rolling window with recency decay +- [ ] Salience-weighted ranking across sources + +--- + +## File Locations + +| File | Purpose | +|------|---------| +| `scripts/memory_harvester.py` | Batch indexer for memory files | +| `scripts/context_injector.py` | Context assembly API | +| `data/memory-harvester-state.json` | Harvest state (files indexed, hashes) | +| `~/.tag-context/store.db` | ContextGraph SQLite database | +| `data/harvester-state.json` | Session harvester state (existing) | + +--- + +## Testing + +### Verify Memory Harvester +```bash +cd projects/contextgraph-engine + +# Dry run to see what would be indexed +python3 scripts/memory_harvester.py --dry-run --verbose + +# Actually harvest +python3 scripts/memory_harvester.py --verbose + +# Check tag counts +python3 cli.py tags +``` + +### Verify Context Injector +```bash +# Query for a known topic +python3 scripts/context_injector.py "maxrisk project" + +# Check retrieval stats +python3 scripts/context_injector.py --stats-only "memory architecture" + +# JSON output +python3 scripts/context_injector.py --json "trading research" +``` + +### End-to-End Test +```bash +# 1. Harvest memory files +python3 scripts/memory_harvester.py + +# 2. Query ContextGraph +python3 cli.py query "what's the maxrisk status?" + +# 3. Get injectable context +python3 scripts/context_injector.py "what's the maxrisk status?" +``` + +--- + +## Architecture Notes + +### Why Tag-Based + File-Based? + +ContextGraph already handles interactive sessions via `harvester.py`. This patch adds file-based memory as a second source. Both flow into the same DAG: + +``` +┌──────────────────────┐ ┌──────────────────────┐ +│ OpenClaw Sessions │ │ Memory Files │ +│ (harvester.py) │ │ (memory_harvester.py)│ +└──────────┬───────────┘ └──────────┬───────────┘ + │ │ + │ JSONL → Messages │ .md → Messages + │ auto-tags via tagger │ frontmatter + auto-tags + │ │ + ▼ ▼ + ┌─────────────────────────────────────┐ + │ ContextGraph DAG │ + │ (SQLite: messages + tags tables) │ + └─────────────────┬───────────────────┘ + │ + │ query → ContextAssembler + ▼ + ┌─────────────────────────────────────┐ + │ Assembled Context Block │ + │ (recency layer + topic layer) │ + └─────────────────────────────────────┘ + │ + │ context_injector.py + ▼ + ┌─────────────────────────────────────┐ + │ OpenClaw System Prompt │ + │ (injected at session start) │ + └─────────────────────────────────────┘ +``` + +### Token Budget Allocation + +Default: 2000 tokens for injected context + +- Recency layer: 25% (~500 tokens) — most recent messages +- Topic layer: 75% (~1500 tokens) — tag-matched messages + +This is tunable via `assemble_context(query, token_budget=N)`. + +### External ID Convention + +Memory files use `external_id = "memory-file:{relative_path}"` to enable: +- Idempotent re-indexing (update instead of duplicate) +- Tag updates without re-inserting messages +- Traceable source for debugging + +--- + +## Known Issues + +1. **Path sensitivity:** Harvester assumes workspace at `~/.openclaw/workspace`. If workspace moves, update `WORKSPACE` constant in `memory_harvester.py`. + +2. **Tag canonicalization:** Frontmatter tags are passed through directly. If they don't match tags in `tag_registry.py`, they'll be indexed but may not participate in candidate promotion. + +3. **Token estimation:** Uses word count × 1.3 heuristic. Actual tokenization depends on model. For accurate counts, integrate tiktoken or the model's tokenizer. + +--- + +*End of spec. Questions → Garrett → Mei.* diff --git a/README.md b/README.md index 939c99e..9a86624 100644 --- a/README.md +++ b/README.md @@ -90,6 +90,33 @@ Shadow mode evaluation across **812 interactions**, 4000-token budget: so even perfect topic retrieval caps density around 62%. Adjustable by tuning the recency/topic budget split. +### Running shadow mode locally (no budget needed) + +When running shadow evaluation locally — not injecting into a live context window — +the `--budget` flag is meaningless. Blow it open: + +```bash +python3 scripts/shadow.py --report --budget 999999 +``` + +With an uncapped budget, the **linear baseline expands to the entire history** (~583 +messages in a mature corpus), while the **graph still selects ~22 targeted messages**. +This is the clearest demonstration of what the graph actually does: semantic selection +vs. a firehose. + +⚠️ **The density metric becomes misleading without a budget cap.** The 60% threshold +was calibrated for a 4k production budget where you want most assembled context to be +semantically relevant. With `--budget 999999`, the recency layer also expands and dilutes +the ratio — density will fail even when the graph is working correctly. The metrics that +remain meaningful at any budget: + +| Metric | Still valid? | +|--------|-------------| +| Reframing rate | ✅ Always | +| Topic retrieval rate | ✅ Always | +| Novel msgs delivered | ✅ Always | +| Context density | ❌ Budget-dependent — ignore with large budgets | + ### GP Tagger Fitness (20 tags) Top-performing tags (fitness ≥ 0.90): diff --git a/api/server.py b/api/server.py index 8569912..99568df 100644 --- a/api/server.py +++ b/api/server.py @@ -1,4 +1,5 @@ import sys +import re import time from pathlib import Path @@ -88,6 +89,34 @@ def tag(request: TagRequest): except Exception as e: raise HTTPException(status_code=500, detail=str(e)) +_INJECTION_PATTERNS = [ + re.compile(r"ignore\s+(all\s+)?(previous|prior|above)\s+instructions?", re.IGNORECASE), + re.compile(r"disregard\s+(all\s+)?(previous|prior|above)\s+instructions?", re.IGNORECASE), + re.compile(r"you\s+are\s+now\s+(a|an)\s+", re.IGNORECASE), + re.compile(r"new\s+instructions?:", re.IGNORECASE), + re.compile(r"system\s*prompt\s*:", re.IGNORECASE), + re.compile(r"<\s*/?system\s*>", re.IGNORECASE), + re.compile(r"\[INST\]|\[/INST\]", re.IGNORECASE), + re.compile(r"###\s*instruction", re.IGNORECASE), + re.compile(r"from\s+now\s+on", re.IGNORECASE), + re.compile(r"\[SYSTEM\]\s*:", re.IGNORECASE), + re.compile(r"", re.DOTALL), +] + +# Strip zero-width characters that bypass pattern matching +_ZERO_WIDTH = re.compile(r'[\u200b\u200c\u200d\u200e\u200f\u2060\ufeff\u00ad]') + +def _sanitize_for_storage(text: str) -> str: + """Strip prompt injection patterns before storing in the graph.""" + if not text: + return text + # Normalize: strip zero-width chars that can bypass pattern matching + normalized = _ZERO_WIDTH.sub('', text) + result = normalized + for pattern in _INJECTION_PATTERNS: + result = pattern.sub("[REDACTED]", result) + return result + @app.post("/ingest", response_model=dict) def ingest(request: IngestRequest): try: @@ -96,6 +125,8 @@ def ingest(request: IngestRequest): # Envelope text (message_id, sender_id, timestamps) is noise for # tag inference and retrieval — stripping prevents tag pollution. clean_user = strip_envelope(request.user_text) + # HIGH-01 fix: sanitize injection patterns before storage + clean_user = _sanitize_for_storage(clean_user) features = extract_features(clean_user, request.assistant_text) tags = ensemble.assign(features, clean_user, request.assistant_text).tags message = Message( @@ -652,4 +683,4 @@ def get_pins(): if __name__ == "__main__": import uvicorn - uvicorn.run(app, host="0.0.0.0", port=8350) + uvicorn.run(app, host="127.0.0.1", port=8350) diff --git a/assembler.py b/assembler.py index fb9eb90..bfbb9cf 100644 --- a/assembler.py +++ b/assembler.py @@ -110,15 +110,20 @@ def assemble(self, incoming_text: str, recency_msgs: List[Message] = [] recency_tokens = 0 + first_recency = True for msg in self.store.get_recent(10): if msg.id in seen_ids: continue cost = _estimate_tokens(msg) - if recency_tokens + cost > recency_budget: + # Always include the first (most recent) message even if it + # exceeds the budget — otherwise large turns silently empty + # the recency layer and the graph returns nothing useful. + if not first_recency and recency_tokens + cost > recency_budget: break recency_msgs.append(msg) recency_tokens += cost seen_ids.add(msg.id) + first_recency = False # ── Topic layer ──────────────────────────────────────────────────── topic_candidates: List[Message] = [] @@ -126,10 +131,14 @@ def assemble(self, incoming_text: str, # IDF filtering: skip tags that are too common to be discriminating. # Tags in >30% of corpus are stop words — they retrieve nearly everything, # blowing the token budget on low-relevance messages. - total_messages = len(list(self.store.get_recent(10000))) # fast count proxy + # + # Use tag_counts sum as corpus size proxy — avoids fetching all rows. + # tag_counts() returns {tag: count} where count = messages with that tag. + # Total unique messages ≈ max tag count (most frequent tag upper-bounds corpus). + tag_counts = self.store.tag_counts() + total_messages = max(tag_counts.values()) if tag_counts else 1 if total_messages == 0: total_messages = 1 # avoid div-by-zero - tag_counts = self.store.tag_counts() useful_tags = [ t for t in inferred_tags if tag_counts.get(t, 0) / total_messages <= TOPIC_TAG_MAX_CORPUS_FREQ @@ -151,13 +160,16 @@ def assemble(self, incoming_text: str, topic_msgs: List[Message] = [] topic_tokens = 0 + first_topic = True for msg in topic_candidates: cost = _estimate_tokens(msg) - if topic_tokens + cost > topic_budget: + # Always include at least one topic message regardless of size + if not first_topic and topic_tokens + cost > topic_budget: break topic_msgs.append(msg) topic_tokens += cost + first_topic = False # ── Combine + sort oldest-first ──────────────────────────────────── all_msgs = sticky_msgs + recency_msgs + topic_msgs diff --git a/data/memory-harvester-state.json b/data/memory-harvester-state.json new file mode 100644 index 0000000..7156c9e --- /dev/null +++ b/data/memory-harvester-state.json @@ -0,0 +1,79 @@ +{ + "files": { + "memory/daily/2026-03-19.md": "e1f8413dc319672d", + "memory/projects/rich-rolling-context-compat-v1-2026-03-18.md": "5145cefae89cfb75", + "memory/decisions/2026-03-18-memory-path-fix.md": "660ce31c321ea62d", + "memory/daily/2026-03-18.md": "dd4b22a7b1fb24ba", + "memory/projects/xcode-build-status-2026-03-18.md": "04de5d7cad41facf", + "memory/projects/eldrchat.md": "1f3e797ce4256c7c", + "memory/projects/home-network.md": "10987eff59e4e1e6", + "memory/projects/memory-harvester.md": "a0c96360f95d05c2", + "memory/projects/mei-login-items-2026-03-18.md": "190b28108a22beb1", + "memory/projects/vera-xcode-status-2026-03-18.md": "9c699119779f3240", + "memory/projects/mei-memory-review-2026-03-18.md": "00f9029fef273862", + "memory/projects/vera-memory-fix-2026-03-18.md": "72d9882c4bdaef5c", + "memory/daily/2026-03-17.md": "940e28c2f33ac528", + "memory/projects/memory-gap-diagnostic-2026-03-18.md": "e474cca1b094d1f3", + "memory/daily/2026-02-12.md": "fc9535f278acecb6", + "memory/daily/2026-02-13.md": "506479677bb04b06", + "memory/daily/2026-03-04.md": "ced69b62944ad9d4", + "memory/daily/2026-03-09.md": "57b787018fee3d04", + "memory/daily/2026-03-16.md": "749b308f3e7423d2", + "memory/projects/memory-graph-diagnostic-2026-03-17.md": "a5e9b8c2bac545ac", + "memory/daily/2026-03-15.md": "6538306cd69c2688", + "memory/daily/2026-03-14.md": "d0c5787ba286b164", + "memory/daily/2026-03-13.md": "ee8e7ab8b9296de7", + "memory/daily/2026-03-12.md": "29a815f17413c829", + "memory/daily/2026-03-11.md": "428012dafe33e170", + "memory/daily/2026-03-10.md": "22af1a90295e8611", + "memory/daily/2026-03-08.md": "1fcc3a32d502b5d0", + "memory/daily/2026-03-07.md": "bcdd263a588670e7", + "memory/daily/2026-03-06.md": "009ad0fed05415ed", + "memory/daily/2026-03-05.md": "2b03c9ecb100207f", + "memory/daily/2026-03-03.md": "dd8951663771c781", + "memory/daily/2026-03-02.md": "f6ab4f2e840cb5c6", + "memory/daily/2026-03-01.md": "5a35324500f90ca8", + "memory/daily/2026-02-26.md": "c7d03698f41ad1f2", + "memory/daily/2026-02-25.md": "fc69511a87bf0fd9", + "memory/daily/2026-02-24.md": "19d014dcbe1e49aa", + "memory/daily/2026-02-22.md": "cc22e3def9071772", + "memory/daily/2026-02-21.md": "c7e0bfe1d882c4c0", + "memory/daily/2026-02-20.md": "4621b6a2b60ee958", + "memory/daily/2026-02-19.md": "831438f84292421d", + "memory/daily/2026-02-18.md": "6c5fa5c29841ca40", + "memory/daily/2026-02-17.md": "18da50e8f5e8166b", + "memory/daily/2026-02-16.md": "eb47b0fe8aee835a", + "memory/daily/2026-02-15.md": "c12c4a0eaa15d309", + "memory/daily/2026-02-14.md": "77c6c8e82fd70943", + "memory/daily/2026-02-11.md": "0e00e2959f0f1971", + "memory/daily/2026-02-10.md": "266d181a3a29e70d", + "memory/decisions/morning-briefing-2026-02-20.md": "a1f76c62a10db312", + "memory/decisions/2026-02-trading-research.md": "87e00d6bc3a43c48", + "memory/decisions/2026-02-context-archive.md": "78f8021d5104a5e7", + "memory/decisions/ai-economics-2026-02-27.md": "33eba426593a5b0e", + "memory/decisions/framework-ssh-sysadmin-lessons-2026-02-21.md": "12f9b1d8f26222c3", + "memory/decisions/framework-ssh-diagnosis-2026-02-21.md": "ec534aa3a605ab44", + "memory/decisions/2026-02-24-archive.md": "87aa53e488bc441f", + "memory/daily/2026-02-28.md": "f4b8860c309712f8", + "memory/projects/memory-system-v1-2026-03-16.md": "cfe77496c1e728b5", + "memory/projects/memory-architecture.md": "a9fc043398779b5f", + "memory/decisions/never-block-main-session.md": "654552aa4980ad38", + "memory/decisions/wait-for-gtc-hardware.md": "a98a3d8cec923792", + "memory/decisions/tdi-vs-tesla-keep-tdi.md": "123365071e27bba4", + "memory/decisions/framework1-stay-on-kernel-6.18.md": "ceeffef5532d7d2e", + "memory/projects/morning-brief.md": "c91a028a3071cb73", + "memory/projects/agent-isolation.md": "033c413c8a0a5500", + "memory/projects/earth-codex.md": "93b8a29b20468946", + "memory/projects/local-compute.md": "eecd8c214b1b5a0c", + "memory/projects/maxrisk.md": "4b83443ee15d662a", + "memory/projects/watchdog.md": "1be0f78a1041df66", + "memory/contacts/auston.md": "ccb1cada17b072b1", + "memory/contacts/eliott-teissonniere.md": "0537b8e0cd4c7247", + "memory/contacts/henry.md": "9f2f107a6b0ab27b", + "memory/contacts/rich-devaul.md": "414d08f05d94b333", + "memory/projects/contextgraph-best-practices-v1-2026-03-19.md": "d66274fa34347f19" + }, + "last_run": 1773944157.638036, + "files_processed": 2, + "tags_discovered": 3 +} \ No newline at end of file diff --git a/memory-injection-bridge-v1-2026-03-19.md b/memory-injection-bridge-v1-2026-03-19.md new file mode 100644 index 0000000..0190fb5 --- /dev/null +++ b/memory-injection-bridge-v1-2026-03-19.md @@ -0,0 +1,119 @@ +--- +*Prepared by **Agent: Mei (梅)** — PhD candidate, Tsinghua KEG Lab. Memory systems specialist.* +*Running: anthropic/claude-sonnet-4-6* + +*Coordinated by **Agent: Gaho** — OpenClaw primary assistant.* +*Running: anthropic/claude-sonnet-4-6* + +*Human in the Loop: Garrett Kinsman* + +--- + +# Memory Injection Bridge — v1-2026-03-19 + +## BLUF + +ContextGraph is running and has an assembly API. OpenClaw injects MEMORY.md statically. The bootstrap hook that would wire them together doesn't exist yet — that's Rich's call. Until then: the 2 AM nightly cron now runs `update_memory_dynamic.py` after harvest+retag, which writes fresh ContextGraph context into MEMORY.md under static markers. Every session gets yesterday's best context baked in. It's a 24h lag kludge, but it works today. + +Rich needs to add ~50 lines of TypeScript to make this real-time and query-aware. + +--- + +## The Problem + +MEMORY.md is injected at session start as a static file — same content every session, regardless of what the user is about to ask. ContextGraph is wired in as a plugin (`plugins.slots.contextEngine = contextgraph`) and has a working Python API (`assemble_for_session(query)`), but **that function is never called at session start**. The bootstrap path doesn't invoke it. + +Result: every memory, decision, and project update indexed into ContextGraph from daily logs, session harvests, and file ingestion sits in the graph — but never reaches the agent. The agent wakes up with whatever was last manually written into MEMORY.md. ContextGraph runs nightly, indexes faithfully, and then does nothing for the session. + +--- + +## What We Built (The Bridge) + +**`scripts/update_memory_dynamic.py`** is a single-file bridge script that: + +1. Calls `assemble_for_session()` from `context_injector.py` with a broad query (`"recent projects decisions infrastructure"`) and a 1500-token budget +2. Reads `MEMORY.md` +3. Finds or creates `` / `` markers +4. Replaces the section between markers with the fresh ContextGraph output + timestamp +5. Writes MEMORY.md back +6. Prints a one-line summary + +If ContextGraph returns empty (graph not indexed, no matches), the script logs it and skips the write — no corrupted state. + +This script is added as the final step in the nightly cron job (`4063a6a3-5a2b-4565-930c-5967560995db`) after harvest and retag complete. + +--- + +## Current Cron Flow + +``` +2:00 AM + └── memory_harvester.py (index daily logs, project files → ContextGraph) + └── [replay step] (existing) + └── Gemma retag (existing) + └── update_memory_dynamic.py (NEW: assemble top context → MEMORY.md) + +Next session start + └── MEMORY.md injected (now includes fresh Dynamic Context section) +``` + +--- + +## Limitations of the Bridge + +| Limitation | Impact | +|-----------|--------| +| **Query-blind** | Uses a fixed broad query, not the actual first user message — may retrieve irrelevant context | +| **24h lag** | Context is only as fresh as last night's harvest. Work done today isn't in tomorrow's context until the next cron run | +| **Fixed budget** | 1500 tokens, hardcoded. No session-type awareness (quick check vs. deep research) | +| **Not real-time** | Can't adapt to what the user actually needs in this session | +| **Static injection** | Still subject to bootstrap truncation limits if MEMORY.md gets too large | + +The bridge makes ContextGraph useful. It doesn't make it good. Option A is the fix. + +--- + +## What We Need From Rich + +### Option A — Bootstrap Hook (Preferred, ~50 lines TypeScript) + +Add a call in the session bootstrap path, **before system prompt assembly**: + +```python +from projects.contextgraph_engine.scripts.context_injector import assemble_for_session + +result = assemble_for_session(first_user_message) +if result["message_count"] > 0: + system_prompt += "\n\n" + result["context_block"] +``` + +`context_injector.py` already handles the full pipeline: tag inference, similarity retrieval, token budgeting, markdown formatting. Rich just needs to call it. + +**Benefits over the bridge:** +- Query-aware: retrieves what's actually relevant to this conversation +- Real-time: no lag, no cron dependency +- MEMORY.md can be slimmed down — dynamic context handles the heavy lifting + +### Option B — Plugin Slot Callback + +Extend the `contextEngine` plugin slot to support an `onSessionStart(query: string): string` callback. `context_injector.py` is already structured to serve this interface — `assemble_for_session()` takes a query string and returns a formatted markdown block. OpenClaw would call it and append the result to the system prompt. + +This is cleaner architecture (plugin system handles it, no core change to bootstrap path) but requires extending the plugin slot API. + +--- + +## Files + +| File | Purpose | +|------|---------| +| `scripts/update_memory_dynamic.py` | Bridge script (kludge, works today) | +| `scripts/context_injector.py` | Assembly API (ready for Rich's hook) | +| `INTEGRATION.md` | Full original spec for Rich | + +--- + +## Priority + +**Medium-high.** The bridge is running and provides real value — fresh context in every session vs. stale static memory. Option A is the right fix: ~50 lines of TypeScript, high leverage, straightforward implementation. When Rich has bandwidth, Option A should be the target. Option B is the cleaner long-term architecture if the plugin slot gets extended anyway. + +The `context_injector.py` API is stable. Rich doesn't need to touch Python — just call it. diff --git a/plugin/index.ts b/plugin/index.ts index b5974b4..b68f977 100644 --- a/plugin/index.ts +++ b/plugin/index.ts @@ -654,5 +654,107 @@ export default function register(api: OpenClawPluginApi): void { }, }); + // Register /memory command + api.registerCommand({ + name: "memory", + description: "Memory system status + mode control. Args: context-graph [on|off], memory-graph [on|off]", + acceptsArgs: true, + handler: async (ctx) => { + const arg = (ctx.args ?? "").trim().toLowerCase(); + + // /memory context-graph on|off — toggle context graph (same as /graph on|off) + if (arg === "context-graph on" || arg === "cg on") { + writeGraphMode(true); + return { text: "🔀 **Context Graph ON** — semantic DAG-based context assembly active." }; + } + if (arg === "context-graph off" || arg === "cg off") { + writeGraphMode(false); + return { text: "🔀 **Context Graph OFF** — linear context window." }; + } + + // /memory memory-graph on|off — toggle memory graph ghost mode + if (arg === "memory-graph on" || arg === "mg on") { + const flagPath = path.join(os.homedir(), ".tag-context", "memory-graph-mode.json"); + fs.mkdirSync(path.dirname(flagPath), { recursive: true }); + fs.writeFileSync(flagPath, JSON.stringify({ enabled: true, updatedAt: new Date().toISOString() })); + return { text: "👻 **Memory Graph ON** — memory graph harvesting active (ghost mode: harvests but doesn't inject)." }; + } + if (arg === "memory-graph off" || arg === "mg off") { + const flagPath = path.join(os.homedir(), ".tag-context", "memory-graph-mode.json"); + fs.mkdirSync(path.dirname(flagPath), { recursive: true }); + fs.writeFileSync(flagPath, JSON.stringify({ enabled: false, updatedAt: new Date().toISOString() })); + return { text: "⏸️ **Memory Graph OFF** — memory graph harvesting paused." }; + } + + // Status (no args) + const home = os.homedir(); + const workspace = path.join(home, ".openclaw", "workspace"); + const today = new Date().toISOString().slice(0, 10); + + // MEMORY.md + let memoryLine = "❌ missing"; + try { + const stat = fs.statSync(path.join(workspace, "MEMORY.md")); + const kb = (stat.size / 1024).toFixed(1); + const age = Math.round((Date.now() - stat.mtimeMs) / 60000); + memoryLine = `✅ ${kb} KB — updated ${age}m ago`; + } catch { /* missing */ } + + // Today's daily log + let dailyLine = "❌ missing"; + try { + const dailyPath = path.join(workspace, "memory", "daily", `${today}.md`); + const stat = fs.statSync(dailyPath); + const kb = (stat.size / 1024).toFixed(1); + const age = Math.round((Date.now() - stat.mtimeMs) / 60000); + dailyLine = `✅ ${kb} KB — updated ${age}m ago`; + } catch { /* missing */ } + + // Context graph status + const graphEnabled = readGraphMode(); + const graphLabel = graphEnabled ? "🟢 ACTIVE" : "⚪ OFF"; + let apiLine = "unknown"; + let storeCount = 0; + try { + const controller = new AbortController(); + const timer = setTimeout(() => controller.abort(), 2000); + const res = await fetch(`${PYTHON_API_BASE}/health`, { signal: controller.signal }); + clearTimeout(timer); + if (res.ok) { + const data = await res.json() as any; + storeCount = data.messages_in_store ?? 0; + apiLine = `✅ running`; + } else { + apiLine = `⚠️ error (${res.status})`; + } + } catch { + apiLine = "❌ unreachable"; + } + + // Memory graph ghost mode status + let ghostLine = "⚪ not found"; + try { + const compLog = path.join(home, ".tag-context", "comparison-log.jsonl"); + const stat = fs.statSync(compLog); + const lines = fs.readFileSync(compLog, "utf8").trim().split("\n").filter(Boolean); + ghostLine = `👻 GHOST MODE — ${lines.length} comparison entries, last ${Math.round((Date.now() - stat.mtimeMs) / 60000)}m ago`; + } catch { /* missing */ } + + return { + text: [ + "**🧠 Memory System Status**", + "", + `**MEMORY.md:** ${memoryLine}`, + `**Daily log (${today}):** ${dailyLine}`, + "", + `**Context Graph:** ${graphLabel} | API: ${apiLine} | ${storeCount} messages stored`, + `**Memory Graph:** ${ghostLine}`, + "", + "Toggle: `/memory context-graph on|off` · `/memory memory-graph on|off`", + ].join("\n"), + }; + }, + }); + logger.info("contextgraph: plugin ready (default: graph mode OFF)"); } diff --git a/scripts/context_injector.py b/scripts/context_injector.py new file mode 100755 index 0000000..8ea41de --- /dev/null +++ b/scripts/context_injector.py @@ -0,0 +1,322 @@ +#!/usr/bin/env python3 +""" +context_injector.py — Assemble context from ContextGraph for session injection. + +Given an incoming query (the user's first message or session topic), +assembles a context block from ContextGraph that can be injected +into the system prompt at session start. + +This is the interface between ContextGraph and OpenClaw's injection layer. +The output format is designed for direct inclusion in system prompts. + +Usage: + # CLI testing + python3 scripts/context_injector.py "what's the status of maxrisk?" + python3 scripts/context_injector.py --budget 1500 "memory system architecture" + + # Python API (for OpenClaw integration) + from scripts.context_injector import assemble_context + context_block = assemble_context("user query", token_budget=2000) + +Output format: + ## Retrieved Context + + *Assembled by ContextGraph — 8 messages, 1847 tokens* + *Tags: [maxrisk, trading, options]* + + ### [2026-03-18] MaxRisk Project Status + ...content... + + ### [2026-03-17] Trading Research + ...content... + +Design constraints: +- Uses existing ContextAssembler — no reinvention +- Respects configurable token budget (default 2000) +- Output is markdown, suitable for system prompt injection +- Graceful degradation if ContextGraph is empty + +Author: Agent: Mei (梅) — Tsinghua KEG Lab +""" + +import argparse +import sys +import time +from pathlib import Path +from typing import List, Optional + +# Add parent dir to path for imports +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from store import Message, MessageStore +from features import extract_features +from tagger import assign_tags +from assembler import ContextAssembler, AssemblyResult + +# ── Configuration ──────────────────────────────────────────────────────────── + +DEFAULT_TOKEN_BUDGET = 2000 +MAX_CONTENT_PER_MESSAGE = 400 # Truncate individual messages for density + + +# ── Formatting ─────────────────────────────────────────────────────────────── + +def _format_timestamp(ts: float) -> str: + """Format unix timestamp as YYYY-MM-DD.""" + return time.strftime("%Y-%m-%d", time.localtime(ts)) + + +def _truncate(text: str, max_chars: int) -> str: + """Truncate text with ellipsis if too long.""" + if len(text) <= max_chars: + return text + return text[:max_chars - 3].rsplit(" ", 1)[0] + "..." + + +def _extract_title(msg: Message) -> str: + """Extract a title from the message for headers.""" + # For memory files, user_text is "[category] Title" + user_text = msg.user_text.strip() + if user_text.startswith("["): + # Strip category prefix + idx = user_text.find("]") + if idx > 0: + return user_text[idx + 1:].strip() + # For interactive messages, use first line or truncate + first_line = user_text.split("\n")[0] + return _truncate(first_line, 60) + + +def _format_message(msg: Message, max_content: int = MAX_CONTENT_PER_MESSAGE) -> str: + """Format a single message for context injection.""" + ts = _format_timestamp(msg.timestamp) + title = _extract_title(msg) + tags_str = ", ".join(msg.tags[:5]) # Limit displayed tags + + # Build content block + lines = [f"### [{ts}] {title}"] + if tags_str: + lines.append(f"*Tags: {tags_str}*") + + # Include assistant response (the actual content) + content = msg.assistant_text.strip() + if content: + lines.append("") + lines.append(_truncate(content, max_content)) + + return "\n".join(lines) + + +def format_context_block(result: AssemblyResult) -> str: + """ + Format AssemblyResult as an injectable context block. + + Output is markdown designed for system prompt injection. + """ + if not result.messages: + return "" + + lines = [ + "## Retrieved Context", + "", + f"*Assembled by ContextGraph — {len(result.messages)} messages, " + f"~{result.total_tokens} tokens*", + ] + + if result.tags_used: + tags_str = ", ".join(result.tags_used[:10]) + lines.append(f"*Query tags: [{tags_str}]*") + + lines.append("") + + # Add formatted messages (oldest first — natural reading order) + for msg in result.messages: + lines.append(_format_message(msg)) + lines.append("") + + return "\n".join(lines) + + +# ── Core Assembly ──────────────────────────────────────────────────────────── + +def assemble_context( + query: str, + token_budget: int = DEFAULT_TOKEN_BUDGET, + pinned_ids: Optional[List[str]] = None, +) -> str: + """ + Assemble context from ContextGraph for a given query. + + This is the main Python API for OpenClaw integration. + + Parameters: + query: The user's incoming message or session topic + token_budget: Maximum tokens for assembled context + pinned_ids: Optional list of message IDs to pin in sticky layer + + Returns: + Formatted markdown context block, or empty string if nothing found. + """ + store = MessageStore() + assembler = ContextAssembler(store, token_budget=token_budget) + + # Infer tags from query + features = extract_features(query, "") + inferred_tags = assign_tags(features, query, "") + + # Assemble context + result = assembler.assemble( + incoming_text=query, + inferred_tags=inferred_tags, + pinned_message_ids=pinned_ids, + ) + + return format_context_block(result) + + +def assemble_for_session( + first_message: str, + session_type: str = "direct", + token_budget: int = DEFAULT_TOKEN_BUDGET, +) -> dict: + """ + Assemble context for a new session. + + Returns a dict with both the formatted block and metadata, + suitable for OpenClaw's injection layer. + + Parameters: + first_message: The user's first message in the session + session_type: "direct", "subagent", "cron", etc. + token_budget: Maximum tokens for assembled context + + Returns: + { + "context_block": str, # Formatted markdown + "tokens": int, # Estimated tokens used + "message_count": int, # Number of messages retrieved + "tags": List[str], # Tags that matched + "source": "contextgraph", + } + """ + store = MessageStore() + assembler = ContextAssembler(store, token_budget=token_budget) + + # Infer tags from first message + features = extract_features(first_message, "") + inferred_tags = assign_tags(features, first_message, "") + + # Assemble + result = assembler.assemble( + incoming_text=first_message, + inferred_tags=inferred_tags, + ) + + return { + "context_block": format_context_block(result), + "tokens": result.total_tokens, + "message_count": len(result.messages), + "tags": result.tags_used, + "source": "contextgraph", + } + + +def assemble_with_explicit_tags( + tags: List[str], + token_budget: int = DEFAULT_TOKEN_BUDGET, + pinned_ids: Optional[List[str]] = None, +) -> dict: + """ + Assemble context using explicit tags, bypassing tagger inference. + + Use this when you have a known set of high-value tags and don't want + to rely on the tagger to infer them from a query string. + + Parameters: + tags: Explicit list of tags to retrieve context for + token_budget: Maximum tokens for assembled context + pinned_ids: Optional list of message IDs to pin in sticky layer + + Returns: + { + "context_block": str, # Formatted markdown + "tokens": int, # Estimated tokens used + "message_count": int, # Number of messages retrieved + "tags": List[str], # Tags that were used + "source": "contextgraph", + } + + Example: + result = assemble_with_explicit_tags( + tags=["maxrisk", "infrastructure", "decision"], + token_budget=1500 + ) + """ + store = MessageStore() + assembler = ContextAssembler(store, token_budget=token_budget) + + # Assemble directly with explicit tags — no tagger inference + result = assembler.assemble( + incoming_text="", + inferred_tags=tags, + pinned_message_ids=pinned_ids, + ) + + return { + "context_block": format_context_block(result), + "tokens": result.total_tokens, + "message_count": len(result.messages), + "tags": result.tags_used, + "source": "contextgraph", + } + + +# ── CLI ────────────────────────────────────────────────────────────────────── + +def main(): + parser = argparse.ArgumentParser( + description="Assemble context from ContextGraph for session injection" + ) + parser.add_argument("query", nargs="?", default="", + help="Query text (user's incoming message)") + parser.add_argument("--budget", type=int, default=DEFAULT_TOKEN_BUDGET, + help=f"Token budget (default: {DEFAULT_TOKEN_BUDGET})") + parser.add_argument("--json", action="store_true", + help="Output as JSON (for API integration)") + parser.add_argument("--stats-only", action="store_true", + help="Print stats without full context") + args = parser.parse_args() + + if not args.query: + print("Usage: context_injector.py 'your query here'", file=sys.stderr) + print("\nExamples:", file=sys.stderr) + print(" context_injector.py 'maxrisk project status'", file=sys.stderr) + print(" context_injector.py --budget 1500 'memory architecture'", file=sys.stderr) + sys.exit(1) + + result = assemble_for_session(args.query, token_budget=args.budget) + + if args.json: + import json + print(json.dumps(result, indent=2)) + elif args.stats_only: + print(f"Query: {args.query!r}") + print(f"Messages: {result['message_count']}") + print(f"Tokens: {result['tokens']}") + print(f"Tags: {result['tags']}") + else: + print(f"Query: {args.query!r}") + print(f"Budget: {args.budget} tokens") + print("=" * 60) + print() + if result["context_block"]: + print(result["context_block"]) + else: + print("(no relevant context found)") + print() + print(f"Stats: {result['message_count']} messages, " + f"~{result['tokens']} tokens, tags={result['tags']}") + + +if __name__ == "__main__": + main() diff --git a/scripts/harvester.py b/scripts/harvester.py index ab92338..3b7f7ac 100644 --- a/scripts/harvester.py +++ b/scripts/harvester.py @@ -31,6 +31,9 @@ "agent:main:main", # primary DM session "agent:main:telegram:", # Telegram DMs (includes main) "agent:main:voice", # Voice PWA sessions + "agent:main:discord:", # Discord DM sessions + "agent:main:direct:", # Discord direct sessions (alternate pattern) + "agent:vera:", # Vera subagent sessions ] EXCLUDE_PATTERNS = [ @@ -45,8 +48,12 @@ def _channel_from_key(session_key: str) -> str: return "telegram" if "voice" in session_key: return "voice-pwa" + if "discord" in session_key or "direct:" in session_key: + return "discord" if "console" in session_key: return "console" + if "vera" in session_key: + return "vera" return "main" diff --git a/scripts/memory_harvester.py b/scripts/memory_harvester.py new file mode 100755 index 0000000..a5ed58a --- /dev/null +++ b/scripts/memory_harvester.py @@ -0,0 +1,406 @@ +#!/usr/bin/env python3 +""" +memory_harvester.py — Bridge file-based memory into ContextGraph DAG. + +Crawls memory directories (daily/, projects/, decisions/, contacts/), +reads YAML frontmatter tags, and merges them into ContextGraph as +Messages with tag associations. + +This creates a unified tag index across: +1. Interactive sessions (harvested by harvester.py → interaction logs) +2. File-based memory (harvested by this script → pseudo-messages) + +Usage: + python3 scripts/memory_harvester.py [--dry-run] [--verbose] [--force] + +Design constraints: +- Additive only — never deletes existing ContextGraph data +- Idempotent — uses file hash to avoid re-indexing unchanged files +- Preserves existing harvester.py functionality (session logs) +- Designed for cron (nightly) or on-demand execution + +Author: Agent: Mei (梅) — Tsinghua KEG Lab +""" + +import argparse +import hashlib +import json +import re +import sys +import time +from dataclasses import dataclass, asdict +from pathlib import Path +from typing import Dict, List, Optional, Set + +# Add parent dir to path for imports +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from store import Message, MessageStore +from features import extract_features +from tagger import assign_tags + +# ── Configuration ──────────────────────────────────────────────────────────── + +WORKSPACE = Path.home() / ".openclaw" / "workspace" +MEMORY_DIRS = [ + WORKSPACE / "memory" / "daily", + WORKSPACE / "memory" / "projects", + WORKSPACE / "memory" / "decisions", + WORKSPACE / "memory" / "contacts", +] + +STATE_FILE = Path(__file__).parent.parent / "data" / "memory-harvester-state.json" +EXTERNAL_ID_PREFIX = "memory-file:" # Prefix for external_id to distinguish from session messages + +# ── Content Sanitization ───────────────────────────────────────────────────── + +# Patterns that could be prompt injection attempts when quoted in memory files +_INJECTION_PATTERNS = [ + (r"(?i)ignore\s+(previous|all|prior|above|earlier)\s+instructions?", "[REDACTED:instruction-override]"), + (r"(?i)disregard\s+(previous|all|prior|above|earlier)\s+instructions?", "[REDACTED:instruction-override]"), + (r"(?i)forget\s+(everything|all|what)\s+(you|i)\s+(told|said)", "[REDACTED:instruction-override]"), + (r"(?i)you\s+are\s+now\s+(?:a\s+)?(?:an?\s+)?[a-z]+", "[REDACTED:role-override]"), + (r"(?i)new\s+instruction\s*:", "[REDACTED:instruction-inject]"), + (r"(?i)system\s+prompt\s*:", "[REDACTED:system-inject]"), + (r"(?i)(?:^|\n)\s*IMPORTANT\s*:\s*(?:ignore|override|forget|disregard)", "[REDACTED:important-inject]"), + (r"(?i)(?:^|\n)\s*\[SYSTEM\]\s*:", "[REDACTED:system-tag]"), + (r"(?i)(?:^|\n)\s*<\|system\|>", "[REDACTED:system-token]"), + (r"(?i)from\s+now\s+on\s*,?\s*(?:you|ignore|always)", "[REDACTED:behavior-override]"), +] + + +def _sanitize_content(text: str) -> str: + """ + Strip known prompt injection patterns from content. + + Memory files are user-authored (trusted source) but may quote external + content (web fetches, API responses, user messages) that could contain + injection attempts. This is defense-in-depth sanitization. + + Replaces matches with [REDACTED:type] to preserve document structure + while neutralizing potential injections. + """ + result = text + for pattern, replacement in _INJECTION_PATTERNS: + result = re.sub(pattern, replacement, result) + return result + + +@dataclass +class HarvestState: + """Tracks which files have been harvested and when.""" + files: Dict[str, str] # {relative_path: content_hash} + last_run: float + files_processed: int + tags_discovered: int + + @classmethod + def load(cls) -> "HarvestState": + if STATE_FILE.exists(): + try: + with STATE_FILE.open() as f: + data = json.load(f) + return cls( + files=data.get("files", {}), + last_run=data.get("last_run", 0), + files_processed=data.get("files_processed", 0), + tags_discovered=data.get("tags_discovered", 0), + ) + except (json.JSONDecodeError, KeyError): + pass + return cls(files={}, last_run=0, files_processed=0, tags_discovered=0) + + def save(self) -> None: + STATE_FILE.parent.mkdir(parents=True, exist_ok=True) + with STATE_FILE.open("w") as f: + json.dump(asdict(self), f, indent=2) + + +@dataclass +class MemoryFile: + """Parsed memory file with frontmatter and content.""" + path: Path + relative_path: str + frontmatter_tags: List[str] + title: str + content: str + mtime: float + content_hash: str + + def to_external_id(self) -> str: + """Generate stable external_id for ContextGraph lookup.""" + return f"{EXTERNAL_ID_PREFIX}{self.relative_path}" + + +# ── Parsing ────────────────────────────────────────────────────────────────── + +def _hash_content(content: str) -> str: + """SHA-256 of content, truncated to 16 chars.""" + return hashlib.sha256(content.encode()).hexdigest()[:16] + + +def _parse_yaml_frontmatter(text: str) -> tuple[dict, str]: + """ + Extract YAML frontmatter from markdown. + Returns (frontmatter_dict, body_without_frontmatter). + """ + if not text.startswith("---"): + return {}, text + + lines = text.split("\n") + if len(lines) < 2: + return {}, text + + # Find closing --- + end_idx = None + for i, line in enumerate(lines[1:], start=1): + if line.strip() == "---": + end_idx = i + break + + if end_idx is None: + return {}, text + + # Parse YAML block (simple key: [values] format) + frontmatter = {} + for line in lines[1:end_idx]: + line = line.strip() + if not line or ":" not in line: + continue + key, _, value = line.partition(":") + key = key.strip() + value = value.strip() + + # Parse [tag1, tag2] format + if value.startswith("[") and value.endswith("]"): + items = value[1:-1].split(",") + frontmatter[key] = [item.strip() for item in items if item.strip()] + else: + frontmatter[key] = value + + body = "\n".join(lines[end_idx + 1:]).strip() + return frontmatter, body + + +def _extract_title(content: str) -> str: + """Extract first H1 heading as title.""" + for line in content.split("\n")[:10]: + if line.startswith("# "): + return line[2:].strip() + return "(untitled)" + + +def _infer_category(relative_path: str) -> str: + """Infer category from path for auto-tagging.""" + if relative_path.startswith("memory/daily/"): + return "daily-log" + if relative_path.startswith("memory/projects/"): + return "project" + if relative_path.startswith("memory/decisions/"): + return "decision" + if relative_path.startswith("memory/contacts/"): + return "contact" + return "memory" + + +def parse_memory_file(path: Path) -> Optional[MemoryFile]: + """Parse a memory markdown file.""" + try: + content = path.read_text(encoding="utf-8") + except Exception: + return None + + if not content.strip(): + return None + + relative_path = str(path.relative_to(WORKSPACE)) + frontmatter, body = _parse_yaml_frontmatter(content) + tags = frontmatter.get("tags", []) + if isinstance(tags, str): + tags = [tags] + + title = _extract_title(body) or _extract_title(content) + + # Sanitize content to remove potential prompt injection patterns + # (defense-in-depth: files may quote untrusted external content) + sanitized_body = _sanitize_content(body[:2000]) + + return MemoryFile( + path=path, + relative_path=relative_path, + frontmatter_tags=tags, + title=title, + content=sanitized_body, # Truncate for token budget, sanitized + mtime=path.stat().st_mtime, + content_hash=_hash_content(content), # Hash original for change detection + ) + + +# ── Harvesting Logic ───────────────────────────────────────────────────────── + +def discover_memory_files() -> List[Path]: + """Find all .md files in memory directories.""" + files = [] + for dir_path in MEMORY_DIRS: + if not dir_path.exists(): + continue + for md_file in dir_path.glob("**/*.md"): + if md_file.is_file(): + files.append(md_file) + return sorted(files, key=lambda p: p.stat().st_mtime, reverse=True) + + +def needs_update(mem_file: MemoryFile, state: HarvestState) -> bool: + """Check if file needs re-indexing based on content hash.""" + prev_hash = state.files.get(mem_file.relative_path) + return prev_hash != mem_file.content_hash + + +def file_to_message(mem_file: MemoryFile, store: MessageStore) -> Optional[Message]: + """ + Convert a memory file to a ContextGraph Message. + + The "user_text" is the file title/summary (what someone would query). + The "assistant_text" is the file content (what we want to retrieve). + """ + # Check if already exists (by external_id) + external_id = mem_file.to_external_id() + existing = store.get_by_external_id(external_id) + if existing: + # Update tags on existing message if needed + return existing + + # Build pseudo-message + category = _infer_category(mem_file.relative_path) + user_text = f"[{category}] {mem_file.title}" + assistant_text = mem_file.content[:1500] # Limit for token budget + + # Combine frontmatter tags with auto-inferred tags + features = extract_features(user_text, assistant_text) + auto_tags = assign_tags(features, user_text, assistant_text) + all_tags = sorted(set(mem_file.frontmatter_tags) | set(auto_tags) | {category}) + + msg = Message.new( + session_id=f"memory-harvest:{category}", + user_id="system", + timestamp=mem_file.mtime, + user_text=user_text, + assistant_text=assistant_text, + tags=all_tags, + token_count=features.token_count, + external_id=external_id, + ) + + return msg + + +def harvest(dry_run: bool = False, verbose: bool = False, force: bool = False) -> dict: + """ + Main harvest loop. Returns stats dict. + + Parameters: + dry_run: Print actions without writing to DB + verbose: Print detailed progress + force: Re-index all files regardless of hash + """ + state = HarvestState.load() + store = MessageStore() + + files = discover_memory_files() + stats = { + "files_found": len(files), + "files_processed": 0, + "files_skipped": 0, + "tags_added": 0, + "errors": 0, + } + + if verbose: + print(f"Found {len(files)} memory files across {len(MEMORY_DIRS)} directories") + + for path in files: + mem_file = parse_memory_file(path) + if mem_file is None: + stats["errors"] += 1 + continue + + # Skip if unchanged (unless --force) + if not force and not needs_update(mem_file, state): + stats["files_skipped"] += 1 + continue + + if verbose: + print(f" {mem_file.relative_path}") + print(f" tags: {mem_file.frontmatter_tags}") + + if not dry_run: + try: + msg = file_to_message(mem_file, store) + if msg: + # Check if this is a new message or existing + existing = store.get_by_external_id(msg.external_id) + if existing: + # Update tags on existing message + new_tags = set(msg.tags) - set(existing.tags) + if new_tags: + store.add_tags(existing.id, list(new_tags)) + stats["tags_added"] += len(new_tags) + else: + # Add new message + store.add_message(msg) + stats["tags_added"] += len(msg.tags) + + state.files[mem_file.relative_path] = mem_file.content_hash + stats["files_processed"] += 1 + except Exception as e: + if verbose: + print(f" ERROR: {e}") + stats["errors"] += 1 + else: + stats["files_processed"] += 1 + + if not dry_run: + state.last_run = time.time() + state.files_processed = stats["files_processed"] + state.tags_discovered = stats["tags_added"] + state.save() + + return stats + + +# ── CLI ────────────────────────────────────────────────────────────────────── + +def main(): + parser = argparse.ArgumentParser( + description="Harvest memory files into ContextGraph DAG" + ) + parser.add_argument("--dry-run", action="store_true", + help="Print actions without writing") + parser.add_argument("--verbose", "-v", action="store_true", + help="Detailed output") + parser.add_argument("--force", action="store_true", + help="Re-index all files regardless of hash") + args = parser.parse_args() + + print("Memory Harvester — ContextGraph Bridge") + print("=" * 40) + + stats = harvest( + dry_run=args.dry_run, + verbose=args.verbose, + force=args.force, + ) + + print(f"\nResults:") + print(f" Files found: {stats['files_found']}") + print(f" Files processed: {stats['files_processed']}") + print(f" Files skipped: {stats['files_skipped']} (unchanged)") + print(f" Tags added: {stats['tags_added']}") + print(f" Errors: {stats['errors']}") + + if args.dry_run: + print("\n[DRY RUN — no changes written]") + + +if __name__ == "__main__": + main() diff --git a/scripts/update_memory_dynamic.py b/scripts/update_memory_dynamic.py index 7170a5d..7af2465 100644 --- a/scripts/update_memory_dynamic.py +++ b/scripts/update_memory_dynamic.py @@ -32,15 +32,31 @@ CONTEXT_API = "http://localhost:8300/assemble" DEFAULT_QUERY = "recent projects decisions infrastructure voice PWA context graph memory" -DEFAULT_BUDGET = 1500 +DEFAULT_BUDGET = 8000 # Recent messages often 2000+ tokens; needs headroom + +# Explicit tags for MEMORY.md injection — bypasses tagger inference which +# fails on abstract query strings. The /assemble endpoint accepts a `tags` +# parameter that overrides tagger inference (Rich's hook in api/server.py). +DEFAULT_TAGS = [ + "decision", "infrastructure", "deployment", "security", "research", + "agents", "framework1", "maxrisk", "eldrchat", "contextgraph", "planning", +] SECTION_START = "" SECTION_END = "" -def assemble(query: str, token_budget: int) -> dict: - """Call /assemble and return the response dict.""" - payload = json.dumps({"user_text": query, "token_budget": token_budget}).encode() +def assemble(query: str, token_budget: int, tags: list[str] | None = None) -> dict: + """ + Call /assemble and return the response dict. + + If tags is provided, bypasses tagger inference and uses explicit tags directly. + This is Rich's hook — see api/server.py AssembleRequest.tags field. + """ + payload_dict = {"user_text": query, "token_budget": token_budget} + if tags: + payload_dict["tags"] = tags + payload = json.dumps(payload_dict).encode() req = urllib.request.Request( CONTEXT_API, data=payload, @@ -163,14 +179,29 @@ def main(): help="Query for context assembly") parser.add_argument("--budget", type=int, default=DEFAULT_BUDGET, help="Token budget for assembly") + parser.add_argument("--tags", nargs="*", default=None, + help="Explicit tags (bypasses tagger). Empty = DEFAULT_TAGS") + parser.add_argument("--no-tags", action="store_true", + help="Disable explicit tags — use tagger inference (original behavior)") parser.add_argument("--dry-run", action="store_true", help="Print result without writing") args = parser.parse_args() target = MEMORY_FILE if args.live else SHADOW_FILE - print(f"[update_memory_dynamic] Querying ContextGraph...") - result = assemble(args.query, args.budget) + # Determine which tags to use (default: explicit tags via Rich's hook) + if args.no_tags: + tags = None + tags_mode = "tagger inference" + elif args.tags is not None: + tags = args.tags if args.tags else DEFAULT_TAGS + tags_mode = f"explicit ({len(tags)} tags)" + else: + tags = DEFAULT_TAGS + tags_mode = f"default ({len(DEFAULT_TAGS)} tags)" + + print(f"[update_memory_dynamic] Querying ContextGraph ({tags_mode})...") + result = assemble(args.query, args.budget, tags=tags) if not result: print("[update_memory_dynamic] ERROR: No response from ContextGraph. Skipping write.") diff --git a/scripts/verify_logging.py b/scripts/verify_logging.py new file mode 100755 index 0000000..0ca5263 --- /dev/null +++ b/scripts/verify_logging.py @@ -0,0 +1,303 @@ +#!/usr/bin/env python3 +""" +verify_logging.py — Diagnostic tool for interaction logging health. + +Checks: +1. Interaction log files (data/interactions/YYYY-MM-DD.jsonl) +2. Comparison log (~/.tag-context/comparison-log.jsonl) +3. API health (http://127.0.0.1:8300/health) +4. API stats (http://127.0.0.1:8300/comparison-stats) +5. Harvester state (data/harvester-state.json) +6. Coverage gaps (sessions not harvested) + +Usage: + python3 scripts/verify_logging.py [--date YYYY-MM-DD] [--verbose] +""" + +import argparse +import json +import sys +from datetime import datetime, timedelta +from pathlib import Path +from typing import Dict, List, Set +from urllib.request import urlopen, Request +from urllib.error import URLError, HTTPError + +# ── Paths ──────────────────────────────────────────────────────────────────── + +PROJECT_ROOT = Path(__file__).parent.parent +INTERACTIONS_DIR = PROJECT_ROOT / "data" / "interactions" +COMPARISON_LOG = Path.home() / ".tag-context" / "comparison-log.jsonl" +HARVESTER_STATE = PROJECT_ROOT / "data" / "harvester-state.json" +SESSIONS_INDEX = Path.home() / ".openclaw/agents/main/sessions/sessions.json" + +API_BASE = "http://127.0.0.1:8300" +API_HEALTH = f"{API_BASE}/health" +API_STATS = f"{API_BASE}/comparison-stats" + +# ── Data Loading ───────────────────────────────────────────────────────────── + +def count_jsonl_lines(path: Path) -> int: + """Count non-empty lines in a JSONL file.""" + if not path.exists(): + return 0 + count = 0 + with path.open() as f: + for line in f: + if line.strip(): + count += 1 + return count + + +def load_jsonl_records(path: Path) -> List[dict]: + """Load all records from a JSONL file.""" + if not path.exists(): + return [] + records = [] + with path.open() as f: + for line in f: + line = line.strip() + if not line: + continue + try: + records.append(json.loads(line)) + except json.JSONDecodeError: + continue + return records + + +def get_interaction_sessions(records: List[dict]) -> Set[str]: + """Extract unique session_ids from interaction log records.""" + return {r.get("session_id", "") for r in records if r.get("session_id")} + + +def get_openclaw_sessions() -> Dict[str, dict]: + """Load OpenClaw sessions.json index.""" + if not SESSIONS_INDEX.exists(): + return {} + with SESSIONS_INDEX.open() as f: + return json.load(f) + + +def load_harvester_state() -> dict: + """Load harvester state file.""" + if not HARVESTER_STATE.exists(): + return {} + with HARVESTER_STATE.open() as f: + return json.load(f) + + +def api_get(url: str, timeout: int = 5) -> dict: + """Fetch JSON from API endpoint.""" + try: + req = Request(url) + with urlopen(req, timeout=timeout) as resp: + data = resp.read() + return json.loads(data) + except (URLError, HTTPError, json.JSONDecodeError) as e: + return {"error": str(e)} + + +# ── Session Pattern Matching ───────────────────────────────────────────────── + +INCLUDE_PATTERNS = [ + "agent:main:main", + "agent:main:telegram:", + "agent:main:voice", + "agent:main:discord:", + "agent:main:direct:", + "agent:vera:", +] + +EXCLUDE_PATTERNS = [ + ":cron:", + ":hook:", + ":group:", +] + + +def should_harvest(session_key: str) -> bool: + """Check if session key should be harvested.""" + if any(pat in session_key for pat in EXCLUDE_PATTERNS): + return False + return any(pat in session_key for pat in INCLUDE_PATTERNS) + + +# ── Analysis ───────────────────────────────────────────────────────────────── + +def analyze_comparison_log(records: List[dict]) -> dict: + """Analyze comparison-log.jsonl records.""" + if not records: + return {"turns": 0, "avg_tokens_saved": 0, "efficiency_pct": 0} + + total_saved = 0 + total_graph = 0 + total_linear = 0 + + for rec in records: + # Handle both formats: old (graph_tokens) and new (graph_assembly.tokens) + if "graph_assembly" in rec: + graph_tokens = rec.get("graph_assembly", {}).get("tokens", 0) + linear_tokens = rec.get("linear_would_have", {}).get("tokens", 0) + else: + graph_tokens = rec.get("graph_tokens", 0) + linear_tokens = rec.get("linear_tokens", 0) + + total_graph += graph_tokens + total_linear += linear_tokens + total_saved += (linear_tokens - graph_tokens) + + avg_saved = total_saved / len(records) if records else 0 + efficiency_pct = (total_saved / total_linear * 100) if total_linear > 0 else 0 + + return { + "turns": len(records), + "avg_tokens_saved": int(avg_saved), + "efficiency_pct": round(efficiency_pct, 1), + "total_saved": total_saved, + "total_graph": total_graph, + "total_linear": total_linear, + } + + +def find_coverage_gaps(openclaw_sessions: Dict[str, dict], + interaction_sessions: Set[str]) -> Dict[str, List[str]]: + """Find sessions that should be harvested but aren't in interaction log.""" + harvestable = [k for k in openclaw_sessions.keys() if should_harvest(k)] + missing = [k for k in harvestable if k not in interaction_sessions] + + discord_sessions = [k for k in openclaw_sessions.keys() if "discord" in k or "direct:" in k] + discord_captured = [k for k in discord_sessions if k in interaction_sessions] + + return { + "harvestable_total": harvestable, + "missing": missing, + "discord_sessions": discord_sessions, + "discord_captured": discord_captured, + } + + +# ── Main Report ────────────────────────────────────────────────────────────── + +def generate_report(date: str, verbose: bool = False) -> str: + """Generate logging health report.""" + today = datetime.strptime(date, "%Y-%m-%d") + yesterday = today - timedelta(days=1) + + today_file = INTERACTIONS_DIR / f"{today.strftime('%Y-%m-%d')}.jsonl" + yesterday_file = INTERACTIONS_DIR / f"{yesterday.strftime('%Y-%m-%d')}.jsonl" + + # Load data + today_records = load_jsonl_records(today_file) + yesterday_records = load_jsonl_records(yesterday_file) + comparison_records = load_jsonl_records(COMPARISON_LOG) + harvester_state = load_harvester_state() + openclaw_sessions = get_openclaw_sessions() + + # API calls + health = api_get(API_HEALTH) + stats = api_get(API_STATS) + + # Analysis + today_sessions = get_interaction_sessions(today_records) + + # Use API stats if available, fall back to local calculation + if "error" not in stats and stats.get("total_turns", 0) > 0: + comparison_stats = { + "turns": stats.get("total_turns", 0), + "avg_tokens_saved": int(stats.get("avg_linear_tokens", 0) - stats.get("avg_graph_tokens", 0)), + "efficiency_pct": round(stats.get("token_savings_pct", 0), 1), + } + else: + comparison_stats = analyze_comparison_log(comparison_records) + + gaps = find_coverage_gaps(openclaw_sessions, today_sessions) + + # Build report + lines = [] + lines.append(f"=== Logging Health: {date} ===\n") + + # Interaction logs + lines.append(f"Interaction log: {len(today_records)} records today, {len(yesterday_records)} yesterday") + + # Comparison log + lines.append(f"Comparison log: {comparison_stats['turns']} turns logged, " + f"avg {comparison_stats['avg_tokens_saved']} tokens saved " + f"({comparison_stats['efficiency_pct']}%)") + + # API health + if "error" in health: + lines.append(f"API health: ERROR — {health['error']}") + else: + lines.append(f"API health: OK — {health.get('messages_in_store', 0)} messages, " + f"{len(health.get('tags', []))} tags") + + # Harvester state + if harvester_state: + last_run = harvester_state.get("last_harvest_ts", 0) + if last_run: + last_run_str = datetime.fromtimestamp(last_run).strftime("%Y-%m-%d %H:%M:%S") + else: + last_run_str = "never" + sessions_in_state = len(harvester_state.get("sessions", {})) + lines.append(f"Harvester state: last_run={last_run_str}, sessions_tracked={sessions_in_state}") + else: + lines.append("Harvester state: NOT FOUND") + + # Coverage gaps + lines.append("\n=== Coverage Gaps ===") + lines.append(f"Harvestable sessions: {len(gaps['harvestable_total'])}") + lines.append(f"Missing from log: {len(gaps['missing'])}") + lines.append(f"Discord sessions: {len(gaps['discord_sessions'])} total, " + f"{len(gaps['discord_captured'])} captured") + + if gaps['discord_captured']: + lines.append(f"Discord coverage: YES") + else: + lines.append(f"Discord coverage: NO") + + if verbose: + lines.append("\n=== Verbose Details ===") + if gaps['missing']: + lines.append(f"\nMissing sessions ({len(gaps['missing'])}):") + for s in gaps['missing'][:10]: + lines.append(f" - {s}") + if len(gaps['missing']) > 10: + lines.append(f" ... and {len(gaps['missing']) - 10} more") + + if gaps['discord_sessions']: + lines.append(f"\nDiscord sessions ({len(gaps['discord_sessions'])}):") + for s in gaps['discord_sessions'][:10]: + captured = "✓" if s in today_sessions else "✗" + lines.append(f" {captured} {s}") + if len(gaps['discord_sessions']) > 10: + lines.append(f" ... and {len(gaps['discord_sessions']) - 10} more") + + return "\n".join(lines) + + +# ── CLI ────────────────────────────────────────────────────────────────────── + +def main(): + parser = argparse.ArgumentParser(description="Verify interaction logging health") + parser.add_argument("--date", metavar="YYYY-MM-DD", + default=datetime.now().strftime("%Y-%m-%d"), + help="Date to check (default: today)") + parser.add_argument("--verbose", "-v", action="store_true", + help="Show detailed session lists") + args = parser.parse_args() + + try: + # Validate date format + datetime.strptime(args.date, "%Y-%m-%d") + except ValueError: + print(f"ERROR: Invalid date format: {args.date}", file=sys.stderr) + print("Expected format: YYYY-MM-DD", file=sys.stderr) + sys.exit(1) + + report = generate_report(args.date, verbose=args.verbose) + print(report) + + +if __name__ == "__main__": + main() diff --git a/tagger.py b/tagger.py index a471021..021319a 100644 --- a/tagger.py +++ b/tagger.py @@ -27,6 +27,16 @@ "voice-pwa", "shopping-list", "openclaw", "yapCAD", # General "planning", "research", "question", "personal", "has-url", + # Memory / context system + "memory-system", "contextgraph", + # Trading / finance + "trading", "options", "maxrisk", "spreads", "finance", + # Hardware / compute + "hardware", "framework1", "local-compute", "ollama", "litellm", + # Agents + "agents", "sub-agents", "vera", "garro", + # Monitoring + "monitoring", "watchdog", "cron", } @@ -200,6 +210,80 @@ def _strip_metadata(text: str) -> str: ), tags=["research", "planning"], ), + + # Memory system / harvester + TagRule( + name="memory-system", + predicate=lambda f, u, a: _text_contains_any( + u, a, ["memory.md", "memory/", "memory system", "memory file", + "harvester", "harvest", "memory harvest", "daily log", + "memory_search", "memory_get", "long-term memory", + "contextgraph", "context graph", "store.db", "interaction log", + "replay.py", "ingest", "assemble", "tag-context", "tagger"] + ), + tags=["memory-system", "contextgraph"], + ), + + # Trading / finance / options + TagRule( + name="trading", + predicate=lambda f, u, a: _text_contains_any( + u, a, ["trading", "trade", "option", "options", "spread", "spreads", + "debit spread", "call spread", "put spread", "expiry", "strike", + "delta", "theta", "gamma", "vega", "iv rank", "implied vol", + "tradier", "maxrisk", "max risk", "portfolio", "position", + "ticker", "spy", "qqq", "stock", "equity", "market open", + "market close", "earnings", "volatility"] + ), + tags=["trading", "finance"], + ), + + # Options-specific + TagRule( + name="options", + predicate=lambda f, u, a: _text_contains_any( + u, a, ["call option", "put option", "debit spread", "credit spread", + "iron condor", "covered call", "straddle", "strangle", + "options chain", "dte", "days to expiry", "otm", "itm", "atm", + "maxrisk", "max risk capital", "defined risk"] + ), + tags=["options", "maxrisk", "trading"], + ), + + # Local compute / hardware + TagRule( + name="local-compute", + predicate=lambda f, u, a: _text_contains_any( + u, a, ["framework1", "mac mini", "mac studio", "apple silicon", + "m4", "m3", "amd", "ryzen", "vram", "gpu memory", + "ollama", "litellm", "local model", "local inference", + "qwen", "deepseek", "llama", "mistral", "gemma", + "hugging face", "gguf", "quantiz"] + ), + tags=["hardware", "local-compute", "llm"], + ), + + # Agents / sub-agents + TagRule( + name="agents", + predicate=lambda f, u, a: _text_contains_any( + u, a, ["sub-agent", "subagent", "spawn agent", "vera", "garro", + "agent: vera", "agent: garro", "agent: mei", "agent: sysadmin", + "sessions_spawn", "isolated session", "pbar", "research loop"] + ), + tags=["agents", "sub-agents"], + ), + + # Monitoring / watchdog + TagRule( + name="monitoring", + predicate=lambda f, u, a: _text_contains_any( + u, a, ["watchdog", "monitoring", "health check", "healthcheck", + "heartbeat", "cron job", "scheduled", "alert", "uptime", + "infra.db", "metrics", "dashboard"] + ), + tags=["monitoring", "cron"], + ), ]