# ── Security: Input Sanitization ───────────────────────────────────────────────

# Phrases stripped from ingested text, matched case-insensitively.
# Compiled once at import time rather than on every call.
_PROMPT_INJECTION_PATTERNS = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r"ignore\s+previous\s+instructions",
        r"ignore\s+all\s+previous\s+instructions",
        r"disregard\s+your\s+instructions",
        r"disregard\s+previous\s+instructions",
        r"system\s+prompt:",
        r"you\s+are\s+now\s+",
        r"forget\s+your\s+instructions",
        r"new\s+instructions:",
        r"override\s+instructions",
    )
)


def _sanitize_content(text: str) -> str:
    """
    Remove prompt injection patterns from user-provided text before storage.

    Every pattern in ``_PROMPT_INJECTION_PATTERNS`` is deleted from the text.
    Stripping is repeated until a full pass changes nothing: deleting a match
    can splice its neighbours into a new match (for example
    "ignore ignore previous instructions previous instructions"), and a single
    pass would leave that reassembled phrase in place.

    Parameters:
        text: Raw text to clean. Falsy values (``""`` or ``None``) are
              returned unchanged.

    Returns:
        The text with all pattern matches removed and leading/trailing
        whitespace stripped. Whitespace around a removed phrase inside the
        text is left as it was.
    """
    if not text:
        return text

    sanitized = text
    while True:
        before = sanitized
        for pattern in _PROMPT_INJECTION_PATTERNS:
            sanitized = pattern.sub("", sanitized)
        # Each changing pass strictly shortens the string, so this terminates.
        if sanitized == before:
            break

    return sanitized.strip()
Use reverse proxy for remote access. + uvicorn.run(app, host="127.0.0.1", port=8350) diff --git a/contextgraph-logger/.gitignore b/contextgraph-logger/.gitignore new file mode 100644 index 0000000..5b17abd --- /dev/null +++ b/contextgraph-logger/.gitignore @@ -0,0 +1,5 @@ +venv/ +__pycache__/ +*.pyc +data/ingest-state.json +data/memory-state.json diff --git a/contextgraph-logger/README.md b/contextgraph-logger/README.md new file mode 100644 index 0000000..f6319fa --- /dev/null +++ b/contextgraph-logger/README.md @@ -0,0 +1,126 @@ +--- +*Prepared by **Agent: Mei (梅)** — PhD candidate, Tsinghua KEG Lab. Specialist in Chinese AI ecosystem, inference optimization, and MoE architectures.* +*Running: anthropic/claude-sonnet-4-6* + +*Human in the Loop: Garrett Kinsman* + +--- + +# contextgraph-logger + +Bridge between OpenClaw conversations and Rich DeVaul's ContextGraph engine. + +**Rule: this package only calls ContextGraph's HTTP API. It never modifies Rich's code.** + +## Files + +| File | Purpose | +|------|---------| +| `config.py` | Server URL, paths, token budgets | +| `harvester.py` | Batch ingest: session DB + memory files → `/ingest` | +| `live_ingest.py` | Per-turn shim: POST one exchange → `/ingest` | +| `context_pull.py` | Query ContextGraph → formatted markdown context block | +| `data/ingest-state.json` | Tracks which session DB rows have been ingested | +| `data/memory-state.json` | Tracks content hashes of memory files | + +## Setup + +```bash +cd projects/contextgraph-logger +pip install -r requirements.txt +``` + +ContextGraph server must be running at `http://127.0.0.1:8300`. 
+ +## Usage + +### Batch harvest (run nightly or on demand) + +```bash +# Dry run — shows what would be ingested +python3 harvester.py --dry-run --verbose + +# Full run +python3 harvester.py --verbose + +# Memory files only +python3 harvester.py --memory-only + +# Session DB only +python3 harvester.py --sessions-only + +# Re-ingest all memory files (ignores hash state) +python3 harvester.py --memory-only --force +``` + +### Live turn logging (call after each OpenClaw turn) + +```bash +# Via JSON on stdin +echo '{"session_id":"abc123","user_text":"hi","assistant_text":"hello","timestamp":1234567890}' \ + | python3 live_ingest.py + +# Via CLI args +python3 live_ingest.py \ + --session-id abc123 \ + --user-text "what's the maxrisk status?" \ + --assistant-text "MaxRisk is paused pending risk review..." + +# Python import +from live_ingest import ingest_turn +result = ingest_turn( + session_id="abc123", + user_text="what's the status?", + assistant_text="Here's the status...", +) +``` + +### Context pull (query → markdown block for system prompt injection) + +```bash +python3 context_pull.py "memory harvester not working" +python3 context_pull.py --budget 1500 "maxrisk project status" +python3 context_pull.py --tags "maxrisk,trading" "portfolio review" +python3 context_pull.py --json "memory architecture" # raw JSON +``` + +Python import: +```python +from context_pull import pull_context + +result = pull_context("memory harvester not working") +if result["ok"] and result["context_block"]: + # inject result["context_block"] into system prompt + pass +``` + +## State files + +Both harvesters are **idempotent** — re-running is safe: + +- `data/ingest-state.json` — maps `external_id → timestamp` for session DB rows +- `data/memory-state.json` — maps `relpath → content_hash` for memory files + +Delete these files to force a full re-ingest. 
+ +## API reference + +Rich's server at `http://127.0.0.1:8300`: + +- `POST /ingest` — `{session_id, user_text, assistant_text, timestamp, external_id?}` +- `POST /assemble` — `{user_text, tags?, token_budget?, session_id?}` +- `POST /tag` — `{user_text, assistant_text}` +- `POST /compare` — `{user_text, assistant_text}` +- `GET /health` — server health check + +## Architecture + +``` +OpenClaw session DB ──► harvester.py ──► POST /ingest ──► ContextGraph +memory/ files ────────► └──► +OpenClaw turn ────────► live_ingest.py ► POST /ingest ──► + + context_pull.py ► POST /assemble ◄── ContextGraph + │ + └──► Markdown context block → system prompt +``` diff --git a/contextgraph-logger/config.py b/contextgraph-logger/config.py new file mode 100644 index 0000000..f5f6d8b --- /dev/null +++ b/contextgraph-logger/config.py @@ -0,0 +1,39 @@ +""" +config.py — Central configuration for contextgraph-logger. + +Single source of truth for server URL, paths, and token budgets. +""" + +from pathlib import Path + +# ── Server ──────────────────────────────────────────────────────────────────── +SERVER_URL = "http://127.0.0.1:8300" + +# ── Paths ───────────────────────────────────────────────────────────────────── +HOME = Path.home() +OPENCLAW_DATA = HOME / ".openclaw" / "data" +WORKSPACE = HOME / ".openclaw" / "workspace" + +# OpenClaw session SQLite DB (may be empty / schema not yet created) +MESSAGES_DB = OPENCLAW_DATA / "messages.db" + +# Memory directories to harvest +MEMORY_ROOT = WORKSPACE / "memory" +MEMORY_DIRS = [ + MEMORY_ROOT / "daily", + MEMORY_ROOT / "projects", + MEMORY_ROOT / "decisions", + MEMORY_ROOT / "contacts", +] + +# State files (within this package's data/ dir) +PKG_DATA = Path(__file__).parent / "data" +INGEST_STATE_FILE = PKG_DATA / "ingest-state.json" # session harvester state +MEMORY_STATE_FILE = PKG_DATA / "memory-state.json" # memory file hash state + +# ── Token budgets ───────────────────────────────────────────────────────────── +DEFAULT_TOKEN_BUDGET 
= 2000 # context_pull default +MAX_CONTENT_PER_FILE = 1500 # chars per memory file + +# ── Request settings ────────────────────────────────────────────────────────── +REQUEST_TIMEOUT = 10 # seconds per HTTP request diff --git a/contextgraph-logger/context_pull.py b/contextgraph-logger/context_pull.py new file mode 100644 index 0000000..61ccbc7 --- /dev/null +++ b/contextgraph-logger/context_pull.py @@ -0,0 +1,210 @@ +#!/usr/bin/env python3 +""" +context_pull.py — Query ContextGraph and return a system-prompt-ready context block. + +Calls /assemble with a query string, formats the response as markdown +suitable for injection into an OpenClaw system prompt or heartbeat. + +Usage: + python3 context_pull.py "memory harvester not working" + python3 context_pull.py --budget 1500 --json "maxrisk status" + python3 context_pull.py --tags "maxrisk,trading" "portfolio review" + +Python API: + from context_pull import pull_context + block = pull_context("memory harvester not working") + +Author: Agent: Mei (梅) — Tsinghua KEG Lab +""" + +import argparse +import json +import sys +import time +from typing import List, Optional + +import requests + +from config import SERVER_URL, DEFAULT_TOKEN_BUDGET, REQUEST_TIMEOUT + + +# ── Core ────────────────────────────────────────────────────────────────────── + +def pull_context( + query: str, + token_budget: int = DEFAULT_TOKEN_BUDGET, + tags: Optional[List[str]] = None, + session_id: Optional[str] = None, +) -> dict: + """ + Assemble context from ContextGraph for a query. 
def _format_timestamp(ts: float) -> str:
    """Format a Unix timestamp as a ``YYYY-MM-DD`` date in the local timezone."""
    return time.strftime("%Y-%m-%d", time.localtime(ts))


def _truncate(text: str, max_chars: int) -> str:
    """
    Shorten ``text`` so the result never exceeds ``max_chars`` characters.

    Text that already fits is returned unchanged. Otherwise the text is cut
    to leave room for a trailing "...", backed up to the previous space so a
    word is not split, and the ellipsis is appended.

    Parameters:
        text: The string to shorten.
        max_chars: Maximum length of the returned string.

    Returns:
        The original or shortened string.
    """
    if len(text) <= max_chars:
        return text
    if max_chars < 3:
        # No room for an ellipsis. A negative slice bound here would keep
        # almost the whole string, so hard-cut instead.
        return text[:max(max_chars, 0)]

    cut = text[:max_chars - 3]
    head = cut.rsplit(" ", 1)[0].rstrip()
    if not head:
        # The only space was the leading character; keep the cut text rather
        # than returning a bare ellipsis.
        head = cut.rstrip()
    return head + "..."


def _format_context_block(data: dict) -> str:
    """
    Format an /assemble response as injectable markdown.

    Parameters:
        data: Decoded JSON response. The keys read are ``messages`` (list of
              dicts with ``timestamp``, ``user_text``, ``assistant_text`` and
              ``tags``), ``tags_used`` and ``total_tokens``. Missing or
              ``None`` values are treated as empty.

    Returns:
        A markdown block with one ``###`` section per message, or ``""`` when
        there are no messages.
    """
    messages = data.get("messages") or []
    if not messages:
        return ""

    tags_used = data.get("tags_used") or []
    total_tokens = data.get("total_tokens", 0)

    lines = [
        "## Retrieved Context",
        "",
        f"*Assembled by ContextGraph — {len(messages)} messages, ~{total_tokens} tokens*",
    ]
    if tags_used:
        lines.append(f"*Query tags: [{', '.join(str(t) for t in tags_used[:10])}]*")
    lines.append("")

    for msg in messages:
        ts = msg.get("timestamp", 0)
        date_str = _format_timestamp(ts) if ts else "unknown"

        user_text = (msg.get("user_text") or "").strip()
        assistant_text = (msg.get("assistant_text") or "").strip()
        tags = msg.get("tags") or []

        # Drop a leading "[label]" prefix when something follows it.
        title_source = user_text
        if user_text.startswith("["):
            closing = user_text.find("]")
            if closing > 0:
                remainder = user_text[closing + 1:].strip()
                if remainder:
                    title_source = remainder
        # A heading must be one line: use the first line only and cap its
        # length, for prefixed and unprefixed titles alike.
        title = _truncate(title_source.split("\n")[0].strip(), 60)

        lines.append(f"### [{date_str}] {title}")
        if tags:
            lines.append(f"*Tags: {', '.join(str(t) for t in tags[:5])}*")
        if assistant_text:
            lines.append("")
            lines.append(_truncate(assistant_text, 400))
        lines.append("")

    return "\n".join(lines)
print("Usage: context_pull.py 'your query here'", file=sys.stderr) + sys.exit(1) + + tags = [t.strip() for t in args.tags.split(",") if t.strip()] if args.tags else None + + result = pull_context( + query=args.query, + token_budget=args.budget, + tags=tags, + session_id=args.session_id, + ) + + if not result["ok"]: + print(f"ERROR: {result['error']}", file=sys.stderr) + sys.exit(1) + + if args.as_json: + print(json.dumps(result, indent=2)) + elif args.stats_only: + print(f"Query: {args.query!r}") + print(f"Messages: {result['message_count']}") + print(f"Tokens: {result['total_tokens']}") + print(f"Tags: {result['tags_used']}") + else: + print(f"Query: {args.query!r}") + print(f"Budget: {args.budget} tokens") + print("=" * 60) + print() + if result["context_block"]: + print(result["context_block"]) + else: + print("(no relevant context found)") + print() + print(f"Stats: {result['message_count']} messages, " + f"~{result['total_tokens']} tokens, tags={result['tags_used']}") + + +if __name__ == "__main__": + main() diff --git a/contextgraph-logger/harvester.py b/contextgraph-logger/harvester.py new file mode 100644 index 0000000..5652cf4 --- /dev/null +++ b/contextgraph-logger/harvester.py @@ -0,0 +1,373 @@ +#!/usr/bin/env python3 +""" +harvester.py — Bridge OpenClaw sessions and memory files into ContextGraph. + +Two harvesters in one: + 1. Session harvester: crawls ~/.openclaw/data/messages.db → POST to /ingest + 2. 
# ── Injection sanitization ────────────────────────────────────────────────────

# (regex, replacement marker) pairs. Matches are replaced with a visible
# marker rather than deleted.
_INJECTION_PATTERNS = [
    (r"(?i)ignore\s+(previous|all|prior|above|earlier)\s+instructions?", "[REDACTED:instruction-override]"),
    (r"(?i)disregard\s+(previous|all|prior|above|earlier)\s+instructions?", "[REDACTED:instruction-override]"),
    (r"(?i)you\s+are\s+now\s+(?:a\s+)?(?:an?\s+)?[a-z]+", "[REDACTED:role-override]"),
    (r"(?i)new\s+instruction\s*:", "[REDACTED:instruction-inject]"),
    (r"(?i)system\s+prompt\s*:", "[REDACTED:system-inject]"),
    (r"(?i)(?:^|\n)\s*\[SYSTEM\]\s*:", "[REDACTED:system-tag]"),
    (r"(?i)(?:^|\n)\s*<\|system\|>", "[REDACTED:system-token]"),
]


def _sanitize(text: str) -> str:
    """
    Replace prompt injection patterns in ``text`` with ``[REDACTED:...]`` markers.

    Falsy input (``""`` or ``None``) is returned unchanged.
    """
    if not text:
        return text
    for pattern, replacement in _INJECTION_PATTERNS:
        text = re.sub(pattern, replacement, text)
    return text


# ── State helpers ─────────────────────────────────────────────────────────────

def _load_json(path: Path) -> dict:
    """
    Load a JSON state file, returning ``{}`` when it cannot be used.

    A missing, unreadable, undecodable or malformed file yields ``{}``, and so
    does a file whose top-level value is not an object, because callers use
    the result as a dict.
    """
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return {}
    return data if isinstance(data, dict) else {}


def _save_json(path: Path, data: dict) -> None:
    """
    Write ``data`` to ``path`` as indented JSON, creating parent directories.

    The JSON is written to a sibling ``.tmp`` file and then renamed over the
    target, so an interrupted write does not leave a truncated state file
    (which ``_load_json`` would read as an empty state).
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = path.with_name(path.name + ".tmp")
    tmp_path.write_text(json.dumps(data, indent=2), encoding="utf-8")
    tmp_path.replace(path)


# ── HTTP helpers ──────────────────────────────────────────────────────────────

def _post_ingest(payload: dict, dry_run: bool, verbose: bool) -> bool:
    """
    POST ``payload`` to the server's /ingest endpoint.

    Parameters:
        payload: JSON body for /ingest.
        dry_run: When true, send nothing and report success.
        verbose: When true, print the dry-run action or any request error.

    Returns:
        True on success (or dry run), False when the request failed or the
        server answered with an error status.
    """
    if dry_run:
        if verbose:
            print(f" [DRY-RUN] POST /ingest session_id={payload.get('session_id')!r} "
                  f"external_id={payload.get('external_id')!r}")
        return True
    try:
        r = requests.post(
            f"{SERVER_URL}/ingest",
            json=payload,
            timeout=REQUEST_TIMEOUT,
        )
        r.raise_for_status()
        return True
    except requests.RequestException as e:
        if verbose:
            print(f" ERROR posting to /ingest: {e}")
        return False


# ── Session harvester ─────────────────────────────────────────────────────────

def _pair_messages(rows: List[Tuple]) -> List[dict]:
    """
    Pair user/assistant rows into (user_text, assistant_text) records.

    Rows are grouped by session and sorted by timestamp. Each user row is
    paired with the row immediately after it when that row is an assistant
    message; otherwise it is emitted with an empty ``assistant_text``.
    Assistant rows with no preceding user row are skipped. Text is capped at
    2000 characters and sanitized; NULL content is treated as empty.

    rows: [(id, session_id, role, content, timestamp), ...]
    Returns list of dicts ready for /ingest. ``external_id`` is derived from
    the user row's id.
    """
    sessions: Dict[str, List[Tuple]] = {}
    for row in rows:
        sessions.setdefault(row[1], []).append(row)

    records = []
    for msgs in sessions.values():
        msgs.sort(key=lambda r: r[4])  # sort by timestamp
        i = 0
        while i < len(msgs):
            row_id, session_id, role, content, timestamp = msgs[i]
            if role != "user":
                i += 1  # skip orphan assistant messages
                continue

            assistant_text = ""
            consumed = 1
            if i + 1 < len(msgs) and msgs[i + 1][2] == "assistant":
                assistant_text = _sanitize((msgs[i + 1][3] or "")[:2000])
                consumed = 2

            records.append({
                "session_id": session_id,
                "user_text": _sanitize((content or "")[:2000]),
                "assistant_text": assistant_text,
                "timestamp": timestamp,
                "external_id": f"msg:{row_id}",  # stable id from DB row id
            })
            i += consumed

    return records


def harvest_sessions(dry_run: bool = False, verbose: bool = False) -> dict:
    """
    Crawl messages.db and POST message pairs not yet ingested to /ingest.

    Progress is tracked in ``INGEST_STATE_FILE`` under ``ingested_ids``
    (``external_id -> timestamp``). Records already listed there are skipped.
    The state file is rewritten only after at least one successful post, and
    never in dry-run mode.

    Parameters:
        dry_run: Report what would be posted without sending or saving state.
        verbose: Print progress and error details.

    Returns:
        Counters: ``records_found``, ``records_posted``, ``records_skipped``,
        ``errors``.
    """
    stats = {"records_found": 0, "records_posted": 0, "records_skipped": 0, "errors": 0}

    if not MESSAGES_DB.exists() or MESSAGES_DB.stat().st_size == 0:
        if verbose:
            print(" messages.db is empty or missing — skipping session harvest")
        return stats

    state = _load_json(INGEST_STATE_FILE)

    try:
        conn = sqlite3.connect(str(MESSAGES_DB))
        try:
            cur = conn.cursor()
            # Check table exists
            cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='messages'")
            if not cur.fetchone():
                if verbose:
                    print(" messages table not found in DB — skipping session harvest")
                return stats

            cur.execute("SELECT id, session_id, role, content, timestamp FROM messages ORDER BY timestamp ASC")
            rows = [tuple(r) for r in cur.fetchall()]
        finally:
            # Close on every path, including when a query raises.
            conn.close()
    except sqlite3.Error as e:
        if verbose:
            print(f" DB error: {e}")
        stats["errors"] += 1
        return stats

    records = _pair_messages(rows)
    stats["records_found"] = len(records)

    if verbose:
        print(f" Session harvest: {len(records)} message pairs found")

    for rec in records:
        ext_id = rec["external_id"]
        session_id = rec["session_id"]
        ts = rec["timestamp"]

        # Skip if already ingested (track by external_id in state)
        if ext_id in state.get("ingested_ids", {}):
            stats["records_skipped"] += 1
            continue

        if verbose:
            print(f" → {session_id} ts={ts} ext={ext_id}")

        if _post_ingest(rec, dry_run, verbose):
            if not dry_run:
                state.setdefault("ingested_ids", {})[ext_id] = ts
            stats["records_posted"] += 1
        else:
            stats["errors"] += 1

    if not dry_run and stats["records_posted"] > 0:
        _save_json(INGEST_STATE_FILE, state)

    return stats
# ── Memory file harvester ─────────────────────────────────────────────────────

# (directory name, category label) pairs, checked in this order.
_CATEGORY_BY_DIR = (
    ("daily", "daily-log"),
    ("projects", "project"),
    ("decisions", "decision"),
    ("contacts", "contact"),
)


def _hash_file(content: str) -> str:
    """Return the first 16 hex characters of the SHA-256 of ``content``."""
    return hashlib.sha256(content.encode()).hexdigest()[:16]


def _clean_tag(raw: str) -> str:
    """Strip whitespace and one layer of surrounding quotes from a tag."""
    return raw.strip().strip("'\"").strip()


def _parse_frontmatter_tags(text: str) -> List[str]:
    """
    Extract the ``tags`` list from leading ``---`` delimited frontmatter.

    Two spellings are recognised: an inline list (``tags: [a, "b"]``) and a
    block list (``tags:`` followed by ``- item`` lines). Surrounding quotes
    are removed from each tag. Text after the closing ``]`` of an inline
    list is ignored.

    Returns:
        The tags in file order, or ``[]`` when there is no frontmatter, no
        closing ``---``, or no list-valued ``tags`` key.
    """
    if not text.startswith("---"):
        return []
    lines = text.split("\n")
    end = next(
        (i for i, line in enumerate(lines[1:], 1) if line.strip() == "---"),
        None,
    )
    if end is None:
        return []

    front = lines[1:end]
    for pos, line in enumerate(front):
        key, _, value = line.partition(":")
        if key.strip() != "tags":
            continue
        value = value.strip()
        if value.startswith("["):
            inner = value[1:]
            closing = inner.find("]")
            if closing != -1:
                inner = inner[:closing]
            return [tag for tag in (_clean_tag(p) for p in inner.split(",")) if tag]
        if not value:
            # Block-style list: consecutive "- item" lines after "tags:".
            items = []
            for follow in front[pos + 1:]:
                stripped = follow.strip()
                if not stripped.startswith("-"):
                    break
                item = _clean_tag(stripped[1:])
                if item:
                    items.append(item)
            if items:
                return items
    return []


def _extract_title(content: str) -> str:
    """Return the first ``# `` heading in the first 10 lines, else "(untitled)"."""
    for line in content.split("\n")[:10]:
        if line.startswith("# "):
            return line[2:].strip()
    return "(untitled)"


def _infer_category(relpath: str) -> str:
    """
    Map a memory file's relative path to a category label.

    The path must contain the directory as an inner component (for example
    ``memory/daily/x.md``). Backslash separators are accepted as well as
    forward slashes. Unrecognised paths map to "memory".
    """
    normalized = relpath.replace("\\", "/")
    for dirname, category in _CATEGORY_BY_DIR:
        if f"/{dirname}/" in normalized:
            return category
    return "memory"


def harvest_memory(dry_run: bool = False, verbose: bool = False, force: bool = False) -> dict:
    """
    Crawl memory/ markdown files and POST changed files to /ingest.

    Each ``.md`` file under ``MEMORY_DIRS`` is hashed. Files whose hash
    matches the one stored in ``MEMORY_STATE_FILE`` are skipped unless
    ``force`` is set, and empty files are always skipped. The state file is
    rewritten only after at least one successful post, and never in dry-run
    mode.

    Parameters:
        dry_run: Report what would be posted without sending or saving state.
        verbose: Print progress and error details.
        force: Re-post every non-empty file regardless of stored hashes.

    Returns:
        Counters: ``files_found``, ``files_posted``, ``files_skipped``,
        ``errors``.
    """
    stats = {"files_found": 0, "files_posted": 0, "files_skipped": 0, "errors": 0}
    state = _load_json(MEMORY_STATE_FILE)

    files = []
    for d in MEMORY_DIRS:
        if d.exists():
            files.extend(sorted(d.glob("**/*.md")))

    stats["files_found"] = len(files)

    if verbose:
        print(f" Memory harvest: {len(files)} .md files found across {len(MEMORY_DIRS)} dirs")

    for path in files:
        try:
            content = path.read_text(encoding="utf-8")
        except OSError as e:
            if verbose:
                print(f" ERROR reading {path}: {e}")
            stats["errors"] += 1
            continue

        if not content.strip():
            stats["files_skipped"] += 1
            continue

        # Forward slashes keep state keys and external ids the same on every
        # platform.
        relpath = path.relative_to(WORKSPACE).as_posix()
        content_hash = _hash_file(content)

        # Skip if unchanged
        if not force and state.get(relpath) == content_hash:
            stats["files_skipped"] += 1
            continue

        category = _infer_category(relpath)
        title = _extract_title(content)
        # Tags are only shown in verbose output; the payload has no tags field.
        tags = _parse_frontmatter_tags(content)
        external_id = f"memory-file:{relpath}"

        # user_text = query-friendly label; assistant_text = content blob.
        # The title comes from the file, so it is sanitized like the body.
        user_text = _sanitize(f"[{category}] {title}")
        assistant_text = _sanitize(content[:MAX_CONTENT_PER_FILE])

        payload = {
            "session_id": f"memory-harvest:{category}",
            "user_text": user_text,
            "assistant_text": assistant_text,
            "timestamp": path.stat().st_mtime,
            "external_id": external_id,
        }

        if verbose:
            print(f" → {relpath} ({len(content)} chars, tags={tags})")

        if _post_ingest(payload, dry_run, verbose):
            if not dry_run:
                state[relpath] = content_hash
            stats["files_posted"] += 1
        else:
            stats["errors"] += 1

    if not dry_run and stats["files_posted"] > 0:
        _save_json(MEMORY_STATE_FILE, state)

    return stats
def _http_status(exc: Exception) -> Optional[int]:
    """
    Return the HTTP status code carried by a request exception, if any.

    The response is compared against ``None`` rather than tested for
    truthiness: a ``requests.Response`` is falsy for 4xx/5xx statuses, which
    are the responses ``raise_for_status()`` raises for.
    """
    response = getattr(exc, "response", None)
    return response.status_code if response is not None else None


def ingest_turn(
    session_id: str,
    user_text: str,
    assistant_text: str,
    timestamp: Optional[float] = None,
    external_id: Optional[str] = None,
    user_id: Optional[str] = None,
) -> dict:
    """
    POST a single turn to ContextGraph /ingest.

    Parameters:
        session_id:     OpenClaw session key
        user_text:      The user's message text (``None`` is treated as "")
        assistant_text: The assistant's response text (``None`` is treated as "")
        timestamp:      Unix timestamp (defaults to now)
        external_id:    Optional stable ID for deduplication
        user_id:        Optional user identifier

    Returns:
        dict with 'ok' (bool), 'status_code' (int or None), 'error' (str or None).
        'status_code' is None when no HTTP response was received.
    """
    if timestamp is None:
        timestamp = time.time()

    payload: dict = {
        "session_id": session_id,
        "user_text": (user_text or "")[:4000],  # cap to avoid API limits
        "assistant_text": (assistant_text or "")[:4000],
        "timestamp": timestamp,
    }
    if external_id:
        payload["external_id"] = external_id
    if user_id:
        payload["user_id"] = user_id

    try:
        r = requests.post(
            f"{SERVER_URL}/ingest",
            json=payload,
            timeout=REQUEST_TIMEOUT,
        )
        r.raise_for_status()
        return {"ok": True, "status_code": r.status_code, "error": None}
    except requests.HTTPError as e:
        return {"ok": False, "status_code": _http_status(e), "error": str(e)}
    except requests.RequestException as e:
        return {"ok": False, "status_code": None, "error": str(e)}


def _from_stdin() -> Optional[dict]:
    """
    Read a JSON object from stdin when stdin is not a terminal.

    When stdin is not a TTY it is read to EOF and parsed as JSON.

    Returns:
        The parsed object, or None when stdin is a TTY, cannot be read or
        parsed, or holds a JSON value that is not an object (callers use
        ``.get`` on the result).
    """
    if sys.stdin.isatty():
        return None
    try:
        data = json.load(sys.stdin)
    except (ValueError, OSError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return None
    return data if isinstance(data, dict) else None
_from_stdin() + if stdin_data: + session_id = stdin_data.get("session_id", args.session_id) + user_text = stdin_data.get("user_text", args.user_text) + assistant_text = stdin_data.get("assistant_text", args.assistant_text) + timestamp = stdin_data.get("timestamp", args.timestamp) + external_id = stdin_data.get("external_id", args.external_id) + user_id = stdin_data.get("user_id", args.user_id) + else: + session_id = args.session_id + user_text = args.user_text + assistant_text = args.assistant_text + timestamp = args.timestamp + external_id = args.external_id + user_id = args.user_id + + if not user_text and not assistant_text: + print("ERROR: no content to ingest (provide --user-text/--assistant-text or JSON on stdin)", + file=sys.stderr) + sys.exit(1) + + result = ingest_turn( + session_id=session_id, + user_text=user_text, + assistant_text=assistant_text, + timestamp=timestamp, + external_id=external_id, + user_id=user_id, + ) + + if result["ok"]: + if not args.quiet: + print(f"ok — ingested turn for session={session_id!r} (HTTP {result['status_code']})") + sys.exit(0) + else: + print(f"ERROR: {result['error']} (HTTP {result['status_code']})", file=sys.stderr) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/contextgraph-logger/requirements.txt b/contextgraph-logger/requirements.txt new file mode 100644 index 0000000..ae1f79e --- /dev/null +++ b/contextgraph-logger/requirements.txt @@ -0,0 +1,2 @@ +requests +pyyaml