diff --git a/.trajectories/active/traj_1773148075777_a710fb1b.json b/.trajectories/active/traj_1773148075777_a710fb1b.json deleted file mode 100644 index e1d957e53..000000000 --- a/.trajectories/active/traj_1773148075777_a710fb1b.json +++ /dev/null @@ -1,48 +0,0 @@ -{ - "id": "traj_1773148075777_a710fb1b", - "version": 1, - "task": { - "title": "test-force-release run #e2a54a64", - "source": { - "system": "workflow-runner", - "id": "e2a54a647d51b01453059568" - } - }, - "status": "active", - "startedAt": "2026-03-10T13:07:55.777Z", - "agents": [ - { - "name": "orchestrator", - "role": "workflow-runner", - "joinedAt": "2026-03-10T13:07:55.777Z" - }, - { - "name": "idle-worker", - "role": "specialist", - "joinedAt": "2026-03-10T13:07:59.239Z" - } - ], - "chapters": [ - { - "id": "ch_310b1c03", - "title": "Planning", - "agentName": "orchestrator", - "startedAt": "2026-03-10T13:07:55.777Z", - "events": [ - { - "ts": 1773148075777, - "type": "note", - "content": "Approach: 1-step sequential workflow — Parsed 1 steps, DAG validated, no cycles" - }, - { - "ts": 1773148079242, - "type": "note", - "content": "\"idle-step\": Just sit here", - "raw": { - "agent": "idle-worker" - } - } - ] - } - ] -} \ No newline at end of file diff --git a/.trajectories/active/traj_1773150874252_08ec23ea.json b/.trajectories/active/traj_1773150874252_08ec23ea.json deleted file mode 100644 index 7995c42df..000000000 --- a/.trajectories/active/traj_1773150874252_08ec23ea.json +++ /dev/null @@ -1,53 +0,0 @@ -{ - "id": "traj_1773150874252_08ec23ea", - "version": 1, - "task": { - "title": "test run #ebaecc98", - "source": { - "system": "workflow-runner", - "id": "ebaecc98b1200ac4d1eaf96c" - } - }, - "status": "active", - "startedAt": "2026-03-10T13:54:34.252Z", - "agents": [ - { - "name": "orchestrator", - "role": "workflow-runner", - "joinedAt": "2026-03-10T13:54:34.252Z" - }, - { - "name": "coder", - "role": "specialist", - "joinedAt": "2026-03-10T13:54:38.105Z" - } - ], - "chapters": [ - { - "id": "ch_6a7dbabc", - "title": "Planning", - "agentName": "orchestrator", - "startedAt": "2026-03-10T13:54:34.252Z", - "events": [ - { - "ts": 1773150874252, - "type": "note", - "content": "Purpose: Simple test to verify PTY output streaming works end-to-end" - }, - { - "ts": 1773150874252, - "type": "note", - "content": "Approach: 1-step sequential workflow — Parsed 1 steps, DAG validated, no cycles" - }, - { - "ts": 1773150878108, - "type": "note", - "content": "\"generate-code\": Create a file at /tmp/pty-stream-test.ts with a simple TypeScript", - "raw": { - "agent": "coder" - } - } - ] - } - ] -} \ No newline at end of file diff --git a/.trajectories/active/traj_1773678087586_feff434a.json b/.trajectories/active/traj_1773678087586_feff434a.json deleted file mode 100644 index 77d501be6..000000000 --- a/.trajectories/active/traj_1773678087586_feff434a.json +++ /dev/null @@ -1,67 +0,0 @@ -{ - "id": "traj_1773678087586_feff434a", - "version": 1, - "task": { - "title": "add-start-from-workflow", - "source": { - "system": "workflow-runner", - "id": "bb9820a0598047f101ef3fc6" - } - }, - "status": "active", - "startedAt": "2026-03-16T16:21:27.586Z", - "agents": [ - { - "name": "orchestrator", - "role": "workflow-runner", - "joinedAt": "2026-03-16T16:21:27.586Z" - }, - { - "name": "droid-impl", - "role": "specialist", - "joinedAt": "2026-03-16T16:21:31.370Z" - }, - { - "name": "claude-review", - "role": "reviewer", - "joinedAt": "2026-03-16T16:21:31.372Z" - } - ], - "chapters": [ - { - "id": "ch_2384168d", - "title": "Planning", - "agentName": "orchestrator", - "startedAt": "2026-03-16T16:21:27.586Z", - "events": [ - { - "ts": 1773678087586, - "type": "note", - "content": "Purpose: Add startFrom feature to workflow runner" - }, - { - "ts": 1773678087586, - "type": "note", - "content": "Approach: 2-step dag workflow — Parsed 2 steps, 1 dependent steps, DAG validated, no cycles" - } - ], - "endedAt": "2026-03-16T16:21:31.373Z" - }, - { - "id": "ch_d1ab31e8", - "title": "Execution: implement", - "agentName": "droid-impl", - "startedAt": "2026-03-16T16:21:31.373Z", - "events": [ - { - "ts": 1773678091373, - "type": "note", - "content": "\"implement\": Add a startFrom feature to the workflow runner in this repo", - "raw": { - "agent": "droid-impl" - } - } - ] - } - ] -} \ No newline at end of file diff --git a/.trajectories/active/traj_u6bndffr0b8g.json b/.trajectories/active/traj_u6bndffr0b8g.json deleted file mode 100644 index 609f1ddd0..000000000 --- a/.trajectories/active/traj_u6bndffr0b8g.json +++ /dev/null @@ -1,137 +0,0 @@ -{ - "id": "traj_u6bndffr0b8g", - "version": 1, - "task": { - "title": "Code review for assigned workflow changes" - }, - "status": "active", - "startedAt": "2026-03-12T08:38:19.960Z", - "agents": [ - { - "name": "default", - "role": "lead", - "joinedAt": "2026-03-19T21:44:38.430Z" - } - ], - "chapters": [ - { - "id": "chap_7f65y050m95v", - "title": "Work", - "agentName": "default", - "startedAt": "2026-03-19T21:44:38.430Z", - "events": [ - { - "ts": 1773956678431, - "type": "decision", - "content": "Routed relay ACK to Lead because broker alias is not registered in the current Relaycast workspace: Routed relay ACK to Lead because broker alias is not registered in the current Relaycast workspace", - "raw": { - "question": "Routed relay ACK to Lead because broker alias is not registered in the current Relaycast workspace", - "chosen": "Routed relay ACK to Lead because broker alias is not registered in the current Relaycast workspace", - "alternatives": [], - "reasoning": "Direct DM to broker failed with agent-not-found; Lead is the only live coordinating agent and the skill protocol says to report status to your lead" - }, - "significance": "high" - }, - { - "ts": 1773957195267, - "type": "reflection", - "content": "Starting Codex subagent communication investigation and aligning it with existing Claude/Gemini plugin patterns before implementation", - "raw": { - "focalPoints": [ - "codex-subagents", - "hooks", - "skills", - "relaycast" - ], - "confidence": 0.73 - }, - "significance": "high", - "tags": [ - "focal:codex-subagents", - "focal:hooks", - "focal:skills", - "focal:relaycast", - "confidence:0.73" - ] - }, - { - "ts": 1773957457056, - "type": "decision", - "content": "Favor Codex-native skills plus custom agent TOML with Relaycast MCP over hooks-first integration for subagent-to-subagent communication: Favor Codex-native skills plus custom agent TOML with Relaycast MCP over hooks-first integration for subagent-to-subagent communication", - "raw": { - "question": "Favor Codex-native skills plus custom agent TOML with Relaycast MCP over hooks-first integration for subagent-to-subagent communication", - "chosen": "Favor Codex-native skills plus custom agent TOML with Relaycast MCP over hooks-first integration for subagent-to-subagent communication", - "alternatives": [], - "reasoning": "Official Codex docs support project skills in .agents/skills and custom agents in .codex/agents with mcp_servers and skills.config today. Hooks landed in PR #13276 but remain experimental, limited to SessionStart/Stop, and are not the best primary integration point for peer messaging." - }, - "significance": "high" - }, - { - "ts": 1774037318747, - "type": "decision", - "content": "Add canonical metadata to legacy OpenClaw redirect routes: Add canonical metadata to legacy OpenClaw redirect routes", - "raw": { - "question": "Add canonical metadata to legacy OpenClaw redirect routes", - "chosen": "Add canonical metadata to legacy OpenClaw redirect routes", - "alternatives": [], - "reasoning": "All user-facing web routes should emit a canonical URL at agentrelay.dev; the only missing route modules are the legacy OpenClaw redirect pages." - }, - "significance": "high" - }, - { - "ts": 1774037562266, - "type": "decision", - "content": "Make /openclaw/skill the primary hosted skill route: Make /openclaw/skill the primary hosted skill route", - "raw": { - "question": "Make /openclaw/skill the primary hosted skill route", - "chosen": "Make /openclaw/skill the primary hosted skill route", - "alternatives": [], - "reasoning": "The site copy, sitemap, and robots rules already use /openclaw/skill as the public URL. The current redirects invert that intent, so the content and invite pages should live under /openclaw/skill and /skill should redirect there as a legacy alias." - }, - "significance": "high" - }, - { - "ts": 1774103061345, - "type": "decision", - "content": "Keep animation behavior intact while extracting helper functions to satisfy ESLint complexity limits: Keep animation behavior intact while extracting helper functions to satisfy ESLint complexity limits", - "raw": { - "question": "Keep animation behavior intact while extracting helper functions to satisfy ESLint complexity limits", - "chosen": "Keep animation behavior intact while extracting helper functions to satisfy ESLint complexity limits", - "alternatives": [], - "reasoning": "The warnings are structural rather than behavioral, so helper extraction is the lowest-risk fix." - }, - "significance": "high" - }, - { - "ts": 1774103219705, - "type": "reflection", - "content": "Cleared the current web lint warnings by removing unused symbols, renaming ESM path helpers, and extracting animation helper functions without changing behavior.", - "raw": { - "focalPoints": [ - "lint", - "animation-components", - "esm-path-helpers" - ], - "adjustments": "Used helper extraction instead of logic changes to satisfy complexity and max-depth rules.", - "confidence": 0.87 - }, - "significance": "high", - "tags": [ - "focal:lint", - "focal:animation-components", - "focal:esm-path-helpers", - "confidence:0.87" - ] - } - ] - } - ], - "commits": [], - "filesChanged": [], - "projectId": "/Users/khaliqgant/Projects/relay", - "tags": [], - "_trace": { - "startRef": "4ce7ccf5e297e46ededaf74c2a997cfe0d2f88af", - "endRef": "4ce7ccf5e297e46ededaf74c2a997cfe0d2f88af" - } -} \ No newline at end of file diff --git a/.trajectories/completed/2026-03/traj_1qnnaojcl0w6.json b/.trajectories/completed/2026-03/traj_1qnnaojcl0w6.json new file mode 100644 index 000000000..f5bd1a782 --- /dev/null +++ b/.trajectories/completed/2026-03/traj_1qnnaojcl0w6.json @@ -0,0 +1,117 @@ +{ + "id": "traj_1qnnaojcl0w6", + "version": 1, + "task": { + "title": "PTY output streaming: rate-limited buffering, stream filter, and Devin review fixes", + "source": { + "system": "plain", + "id": "390" + } + }, + "status": "completed", + "startedAt": "2026-03-25T09:28:08.884Z", + "agents": [ + { + "name": "default", + "role": "lead", + "joinedAt": "2026-03-25T09:28:14.608Z" + } + ], + "chapters": [ + { + "id": "chap_syakvczpwygq", + "title": "Work", + "agentName": "default", + "startedAt": "2026-03-25T09:28:14.608Z", + "events": [ + { + "ts": 1774430894610, + "type": "decision", + "content": "Added rate-limited buffering to worker_stream emissions in Rust PTY worker: Added rate-limited buffering to worker_stream emissions in Rust PTY worker", + "raw": { + "question": "Added rate-limited buffering to worker_stream emissions in Rust PTY worker", + "chosen": "Added rate-limited buffering to worker_stream emissions in Rust PTY worker", + "alternatives": [], + "reasoning": "Raw per-chunk worker_stream frames caused excessive frame noise. Buffer up to 4KB or 100ms before flushing to reduce overhead while maintaining low latency." + }, + "significance": "high" + }, + { + "ts": 1774430900072, + "type": "decision", + "content": "Added stream filter option to SDK onOutput: Added stream filter option to SDK onOutput", + "raw": { + "question": "Added stream filter option to SDK onOutput", + "chosen": "Added stream filter option to SDK onOutput", + "alternatives": [], + "reasoning": "Callers needed to filter by stdout/stderr without manually checking every event. Added optional { stream: 'stdout' } parameter to onOutput() so listeners only fire for matching streams." + }, + "significance": "high" + }, + { + "ts": 1774430907061, + "type": "decision", + "content": "Extracted flush_stream_buffer\\! macro to eliminate 5x code duplication: Extracted flush_stream_buffer\\! macro to eliminate 5x code duplication", + "raw": { + "question": "Extracted flush_stream_buffer\\! macro to eliminate 5x code duplication", + "chosen": "Extracted flush_stream_buffer\\! macro to eliminate 5x code duplication", + "alternatives": [], + "reasoning": "Devin review identified that the identical buffer flush pattern was copy-pasted at 5 locations, causing bugs where fixes applied to one site didn't propagate. Prior commits had patched PTY-close and watchdog paths but missed the /exit path — proving the DRY violation was the root cause." + }, + "significance": "high" + }, + { + "ts": 1774430914035, + "type": "decision", + "content": "Added explicit mode option to onOutput to bypass toString() heuristic: Added explicit mode option to onOutput to bypass toString() heuristic", + "raw": { + "question": "Added explicit mode option to onOutput to bypass toString() heuristic", + "chosen": "Added explicit mode option to onOutput to bypass toString() heuristic", + "alternatives": [], + "reasoning": "inferOutputMode used callback.toString() to detect structured vs chunk mode, which silently breaks with minifiers — defaulting to chunk mode. Combined with the new stream filter, a developer could think they're filtering structured events but receive unfiltered raw strings. Added explicit { mode: 'structured' } option as escape hatch." + }, + "significance": "high" + }, + { + "ts": 1774430922596, + "type": "decision", + "content": "Inlined post-loop flush instead of using macro to avoid CI unused-assignment error: Inlined post-loop flush instead of using macro to avoid CI unused-assignment error", + "raw": { + "question": "Inlined post-loop flush instead of using macro to avoid CI unused-assignment error", + "chosen": "Inlined post-loop flush instead of using macro to avoid CI unused-assignment error", + "alternatives": [], + "reasoning": "The flush_stream_buffer! macro always updates stream_buffer_last_flush, but after the main loop this timestamp is never read. CI treats warnings as errors (-D warnings), so the post-loop site inlines the flush without the timestamp update." + }, + "significance": "high" + }, + { + "ts": 1774430928064, + "type": "reflection", + "content": "All Devin review issues addressed. Three exit paths (PTY-close, watchdog, /exit) now consistently flush stream_buffer before agent_exit. DRY macro prevents future regressions. SDK stream filter and explicit mode option complete the feature.", + "raw": { + "confidence": 0.9 + }, + "significance": "high", + "tags": [ + "confidence:0.9" + ] + } + ], + "endedAt": "2026-03-25T09:28:53.882Z" + } + ], + "commits": [], + "filesChanged": [], + "projectId": "/Users/khaliqgant/Projects/AgentWorkforce/relay", + "tags": [], + "_trace": { + "startRef": "80320ff17e665a87bb8d1b68955a3cc5821be6b1", + "endRef": "80320ff17e665a87bb8d1b68955a3cc5821be6b1" + }, + "completedAt": "2026-03-25T09:28:53.882Z", + "retrospective": { + "summary": "Implemented PTY output streaming improvements: rate-limited buffering in Rust (4KB/100ms), SDK onOutput stream filter and explicit mode option, flush_stream_buffer macro to eliminate duplication, and consistent buffer flushing across all 3 exit paths (/exit, PTY-close, watchdog). Addressed all Devin review findings.", + "approach": "Standard approach", + "confidence": 0.9 + } +} \ No newline at end of file diff --git a/.trajectories/completed/2026-03/traj_1qnnaojcl0w6.md b/.trajectories/completed/2026-03/traj_1qnnaojcl0w6.md new file mode 100644 index 000000000..365cc1bc4 --- /dev/null +++ b/.trajectories/completed/2026-03/traj_1qnnaojcl0w6.md @@ -0,0 +1,53 @@ +# Trajectory: PTY output streaming: rate-limited buffering, stream filter, and Devin review fixes + +> **Status:** ✅ Completed +> **Task:** 390 +> **Confidence:** 90% +> **Started:** March 25, 2026 at 10:28 AM +> **Completed:** March 25, 2026 at 10:28 AM + +--- + +## Summary + +Implemented PTY output streaming improvements: rate-limited buffering in Rust (4KB/100ms), SDK onOutput stream filter and explicit mode option, flush_stream_buffer macro to eliminate duplication, and consistent buffer flushing across all 3 exit paths (/exit, PTY-close, watchdog). Addressed all Devin review findings. + +**Approach:** Standard approach + +--- + +## Key Decisions + +### Added rate-limited buffering to worker_stream emissions in Rust PTY worker +- **Chose:** Added rate-limited buffering to worker_stream emissions in Rust PTY worker +- **Reasoning:** Raw per-chunk worker_stream frames caused excessive frame noise. Buffer up to 4KB or 100ms before flushing to reduce overhead while maintaining low latency. + +### Added stream filter option to SDK onOutput +- **Chose:** Added stream filter option to SDK onOutput +- **Reasoning:** Callers needed to filter by stdout/stderr without manually checking every event. Added optional { stream: 'stdout' } parameter to onOutput() so listeners only fire for matching streams. + +### Extracted flush_stream_buffer\! macro to eliminate 5x code duplication +- **Chose:** Extracted flush_stream_buffer\! macro to eliminate 5x code duplication +- **Reasoning:** Devin review identified that the identical buffer flush pattern was copy-pasted at 5 locations, causing bugs where fixes applied to one site didn't propagate. Prior commits had patched PTY-close and watchdog paths but missed the /exit path — proving the DRY violation was the root cause. + +### Added explicit mode option to onOutput to bypass toString() heuristic +- **Chose:** Added explicit mode option to onOutput to bypass toString() heuristic +- **Reasoning:** inferOutputMode used callback.toString() to detect structured vs chunk mode, which silently breaks with minifiers — defaulting to chunk mode. Combined with the new stream filter, a developer could think they're filtering structured events but receive unfiltered raw strings. Added explicit { mode: 'structured' } option as escape hatch. + +### Inlined post-loop flush instead of using macro to avoid CI unused-assignment error +- **Chose:** Inlined post-loop flush instead of using macro to avoid CI unused-assignment error +- **Reasoning:** The flush_stream_buffer! macro always updates stream_buffer_last_flush, but after the main loop this timestamp is never read. CI treats warnings as errors (-D warnings), so the post-loop site inlines the flush without the timestamp update. + +--- + +## Chapters + +### 1. Work +*Agent: default* + +- Added rate-limited buffering to worker_stream emissions in Rust PTY worker: Added rate-limited buffering to worker_stream emissions in Rust PTY worker +- Added stream filter option to SDK onOutput: Added stream filter option to SDK onOutput +- Extracted flush_stream_buffer\! macro to eliminate 5x code duplication: Extracted flush_stream_buffer\! macro to eliminate 5x code duplication +- Added explicit mode option to onOutput to bypass toString() heuristic: Added explicit mode option to onOutput to bypass toString() heuristic +- Inlined post-loop flush instead of using macro to avoid CI unused-assignment error: Inlined post-loop flush instead of using macro to avoid CI unused-assignment error +- All Devin review issues addressed. Three exit paths (PTY-close, watchdog, /exit) now consistently flush stream_buffer before agent_exit. DRY macro prevents future regressions. SDK stream filter and explicit mode option complete the feature. diff --git a/.trajectories/active/traj_ni1xbsaa03bv.json b/.trajectories/completed/2026-03/traj_ni1xbsaa03bv.json similarity index 84% rename from .trajectories/active/traj_ni1xbsaa03bv.json rename to .trajectories/completed/2026-03/traj_ni1xbsaa03bv.json index 2270aeaf3..bec7f0253 100644 --- a/.trajectories/active/traj_ni1xbsaa03bv.json +++ b/.trajectories/completed/2026-03/traj_ni1xbsaa03bv.json @@ -4,7 +4,7 @@ "task": { "title": "Add mute/unmute channel methods to sdk-typescript AgentClient" }, - "status": "active", + "status": "abandoned", "startedAt": "2026-03-23T20:21:54.726Z", "agents": [ { @@ -31,8 +31,15 @@ "reasoning": "Worker can read ../relaycast but apply_patch is limited to the writable relay workspace, so I produced a ready-to-apply patch artifact instead." }, "significance": "high" + }, + { + "ts": 1774430878870, + "type": "note", + "content": "Abandoned: Switching to PTY output streaming work", + "significance": "high" } - ] + ], + "endedAt": "2026-03-25T09:27:58.870Z" } ], "commits": [], @@ -42,5 +49,6 @@ "_trace": { "startRef": "eb6ed1f75a433eab11500574d1fa9897695e5d90", "endRef": "eb6ed1f75a433eab11500574d1fa9897695e5d90" - } + }, + "completedAt": "2026-03-25T09:27:58.870Z" } \ No newline at end of file diff --git a/.trajectories/completed/2026-03/traj_ni1xbsaa03bv.md b/.trajectories/completed/2026-03/traj_ni1xbsaa03bv.md new file mode 100644 index 000000000..ceb65ea12 --- /dev/null +++ b/.trajectories/completed/2026-03/traj_ni1xbsaa03bv.md @@ -0,0 +1,23 @@ +# Trajectory: Add mute/unmute channel methods to sdk-typescript AgentClient + +> **Status:** ❌ Abandoned +> **Started:** March 23, 2026 at 09:21 PM +> **Completed:** March 25, 2026 at 10:27 AM + +--- + +## Key Decisions + +### Prepared sdk-typescript AgentClient mute/unmute patch but direct write to sibling relaycast checkout is blocked by sandbox +- **Chose:** Prepared sdk-typescript AgentClient mute/unmute patch but direct write to sibling relaycast checkout is blocked by sandbox +- **Reasoning:** Worker can read ../relaycast but apply_patch is limited to the writable relay workspace, so I produced a ready-to-apply patch artifact instead. + +--- + +## Chapters + +### 1. Work +*Agent: default* + +- Prepared sdk-typescript AgentClient mute/unmute patch but direct write to sibling relaycast checkout is blocked by sandbox: Prepared sdk-typescript AgentClient mute/unmute patch but direct write to sibling relaycast checkout is blocked by sandbox +- Abandoned: Switching to PTY output streaming work diff --git a/.trajectories/completed/2026-03/traj_u6bndffr0b8g.json b/.trajectories/completed/2026-03/traj_u6bndffr0b8g.json index f39364270..cddf956bc 100644 --- a/.trajectories/completed/2026-03/traj_u6bndffr0b8g.json +++ b/.trajectories/completed/2026-03/traj_u6bndffr0b8g.json @@ -4,7 +4,7 @@ "task": { "title": "Code review for assigned workflow changes" }, - "status": "completed", + "status": "abandoned", "startedAt": "2026-03-12T08:38:19.960Z", "agents": [ { @@ -122,9 +122,15 @@ "focal:esm-path-helpers", "confidence:0.87" ] + }, + { + "ts": 1774430888625, + "type": "note", + "content": "Abandoned: Stale trajectory", + "significance": "high" } ], - "endedAt": "2026-03-23T15:08:33.166Z" + "endedAt": "2026-03-25T09:28:08.625Z" } ], "commits": [], @@ -135,10 +141,5 @@ "startRef": "4ce7ccf5e297e46ededaf74c2a997cfe0d2f88af", "endRef": "4ce7ccf5e297e46ededaf74c2a997cfe0d2f88af" }, - "completedAt": "2026-03-23T15:08:33.166Z", - "retrospective": { - "summary": "Added channel management protocol variants and SDK client methods", - "approach": "Standard approach", - "confidence": 0.93 - } + "completedAt": "2026-03-25T09:28:08.625Z" } \ No newline at end of file diff --git a/.trajectories/completed/2026-03/traj_u6bndffr0b8g.md b/.trajectories/completed/2026-03/traj_u6bndffr0b8g.md index cd812d720..3fcac81b9 100644 --- a/.trajectories/completed/2026-03/traj_u6bndffr0b8g.md +++ b/.trajectories/completed/2026-03/traj_u6bndffr0b8g.md @@ -1,17 +1,8 @@ # Trajectory: Code review for assigned workflow changes -> **Status:** ✅ Completed -> **Confidence:** 93% +> **Status:** ❌ Abandoned > **Started:** March 12, 2026 at 09:38 AM -> **Completed:** March 23, 2026 at 04:08 PM - ---- - -## Summary - -Added channel management protocol variants and SDK client methods - -**Approach:** Standard approach +> **Completed:** March 25, 2026 at 10:28 AM --- @@ -51,3 +42,4 @@ Added channel management protocol variants and SDK client methods - Make /openclaw/skill the primary hosted skill route: Make /openclaw/skill the primary hosted skill route - Keep animation behavior intact while extracting helper functions to satisfy ESLint complexity limits: Keep animation behavior intact while extracting helper functions to satisfy ESLint complexity limits - Cleared the current web lint warnings by removing unused symbols, renaming ESM path helpers, and extracting animation helper functions without changing behavior. +- Abandoned: Stale trajectory diff --git a/.trajectories/completed/traj_1774425142647_96c5a9e2.json b/.trajectories/completed/traj_1774425142647_96c5a9e2.json new file mode 100644 index 000000000..3f8e0d254 --- /dev/null +++ b/.trajectories/completed/traj_1774425142647_96c5a9e2.json @@ -0,0 +1,292 @@ +{ + "id": "traj_1774425142647_96c5a9e2", + "version": 1, + "task": { + "title": "default", + "source": { + "system": "workflow-runner", + "id": "c900c9e57bd8b4d32110daaf" + } + }, + "status": "completed", + "startedAt": "2026-03-25T07:52:22.647Z", + "agents": [ + { + "name": "orchestrator", + "role": "workflow-runner", + "joinedAt": "2026-03-25T07:52:22.647Z" + }, + { + "name": "rust-dev", + "role": "specialist", + "joinedAt": "2026-03-25T07:52:29.100Z" + }, + { + "name": "sdk-dev", + "role": "specialist", + "joinedAt": "2026-03-25T07:52:29.100Z" + }, + { + "name": "fixer", + "role": "specialist", + "joinedAt": "2026-03-25T07:54:21.246Z" + } + ], + "chapters": [ + { + "id": "ch_c5384845", + "title": "Planning", + "agentName": "orchestrator", + "startedAt": "2026-03-25T07:52:22.647Z", + "events": [ + { + "ts": 1774425142648, + "type": "note", + "content": "Purpose: PTY output streaming improvements for GitHub issue #390.\nThe core worker_stream pipeline is ALREADY IMPLEMENTED end-to-end:\n Rust pty_worker → send_frame(\"worker_stream\") → Broker → SDK event handler\n → onWorkerOutput + agent.onOutput()\n\nREMAINING GAP: PTY emits worker_stream on every read chunk with no batching. This workflow adds 100ms rate-limited buffering in Rust and optional stream filtering in the SDK's onOutput API.\n2 phases: parallel implementation → verification + fix." + }, + { + "ts": 1774425142648, + "type": "note", + "content": "Approach: 9-step dag workflow — Parsed 9 steps, 3 parallel tracks, 6 dependent steps, DAG validated, no cycles" + } + ], + "endedAt": "2026-03-25T07:52:26.120Z" + }, + { + "id": "ch_73f88d81", + "title": "Execution: read-pty-worker, read-sdk-relay, read-existing-tests", + "agentName": "orchestrator", + "startedAt": "2026-03-25T07:52:26.120Z", + "events": [], + "endedAt": "2026-03-25T07:52:29.094Z" + }, + { + "id": "ch_f4f09c51", + "title": "Convergence: read-pty-worker + read-sdk-relay + read-existing-tests", + "agentName": "orchestrator", + "startedAt": "2026-03-25T07:52:29.094Z", + "events": [ + { + "ts": 1774425149098, + "type": "reflection", + "content": "read-pty-worker + read-sdk-relay + read-existing-tests resolved. 3/3 steps completed. All steps completed on first attempt. Unblocking: rust-output-buffering, sdk-stream-filtering.", + "significance": "high", + "raw": { + "confidence": 0.75, + "focalPoints": [ + "read-pty-worker: completed", + "read-sdk-relay: completed", + "read-existing-tests: completed" + ] + } + } + ], + "endedAt": "2026-03-25T07:52:29.099Z" + }, + { + "id": "ch_f051fec1", + "title": "Execution: rust-output-buffering, sdk-stream-filtering", + "agentName": "orchestrator", + "startedAt": "2026-03-25T07:52:29.099Z", + "events": [], + "endedAt": "2026-03-25T07:52:29.100Z" + }, + { + "id": "ch_3f8ca847", + "title": "Execution: rust-output-buffering", + "agentName": "rust-dev", + "startedAt": "2026-03-25T07:52:29.100Z", + "events": [ + { + "ts": 1774425149100, + "type": "note", + "content": "\"rust-output-buffering\": Add 100ms rate-limited buffering to worker_stream emission in src/pty_worker.rs", + "raw": { + "agent": "rust-dev" + } + } + ], + "endedAt": "2026-03-25T07:52:29.100Z" + }, + { + "id": "ch_fbc1c17c", + "title": "Execution: sdk-stream-filtering", + "agentName": "sdk-dev", + "startedAt": "2026-03-25T07:52:29.100Z", + "events": [ + { + "ts": 1774425149100, + "type": "note", + "content": "\"sdk-stream-filtering\": Add optional stream-type filtering to agent.onOutput() in packages/sdk/src/relay.ts", + "raw": { + "agent": "sdk-dev" + } + }, + { + "ts": 1774425235603, + "type": "completion-evidence", + "content": "\"sdk-stream-filtering\" verification-based completion — Verification passed (4 signal(s), 1 relevant channel post(s), 6 file change(s), exit=0; signals=0, No errors. All changes are complete., Verification passed, **[sdk-stream-filtering] Output:**; channel=**[sdk-stream-filtering] Output:**\n```\nNo errors. All changes are complete.\n**Summary of changes:**\n**`packages/sdk/src/relay.ts`:**\n1. `Agent` interface — upda; files=modified:packages/sdk/src/__tests__/orchestration-upgrades.test.ts, modified:packages/sdk/src/relay.ts, modified:src/pty_worker.rs, modified:target/debug/.fingerprint/agent-relay-broker-aba1cde5556acf05/bin-agent-relay-broker, modified:target/debug/.fingerprint/agent-relay-broker-aba1cde5556acf05/invoked.timestamp, modified:target/debug/.fingerprint/agent-relay-broker-c1a62271656ca6f7/dep-lib-relay_broker; exit=0)", + "significance": "medium", + "raw": { + "stepName": "sdk-stream-filtering", + "completionMode": "verification", + "reason": "Verification passed", + "evidence": { + "summary": "4 signal(s), 1 relevant channel post(s), 6 file change(s), exit=0", + "signals": [ + "0", + "No errors. All changes are complete.", + "Verification passed", + "**[sdk-stream-filtering] Output:**" + ], + "channelPosts": [ + "**[sdk-stream-filtering] Output:**\n```\nNo errors. All changes are complete.\n**Summary of changes:**\n**`packages/sdk/src/relay.ts`:**\n1. `Agent` interface — upda" + ], + "files": [ + "modified:packages/sdk/src/__tests__/orchestration-upgrades.test.ts", + "modified:packages/sdk/src/relay.ts", + "modified:src/pty_worker.rs", + "modified:target/debug/.fingerprint/agent-relay-broker-aba1cde5556acf05/bin-agent-relay-broker", + "modified:target/debug/.fingerprint/agent-relay-broker-aba1cde5556acf05/invoked.timestamp", + "modified:target/debug/.fingerprint/agent-relay-broker-c1a62271656ca6f7/dep-lib-relay_broker" + ], + "exitCode": 0 + } + } + }, + { + "ts": 1774425235603, + "type": "finding", + "content": "\"sdk-stream-filtering\" completed → - `{ stream: 'stderr' }` filter ignores stdout events", + "significance": "medium" + }, + { + "ts": 1774425250796, + "type": "completion-evidence", + "content": "\"rust-output-buffering\" verification-based completion — Verification passed (4 signal(s), 1 relevant channel post(s), 6 file change(s), exit=0; signals=0, Build succeeds. Here's a summary of what was done:, Verification passed, **[rust-output-buffering] Output:**; channel=**[rust-output-buffering] Output:**\n```\nBuild succeeds. Here's a summary of what was done:\n**Changes to `src/pty_worker.rs`:**\n1. **Added buffer declarations** ; files=modified:packages/sdk/src/__tests__/orchestration-upgrades.test.ts, modified:packages/sdk/src/relay.ts, modified:src/pty_worker.rs, modified:target/debug/.fingerprint/agent-relay-broker-aba1cde5556acf05/bin-agent-relay-broker, modified:target/debug/.fingerprint/agent-relay-broker-aba1cde5556acf05/bin-agent-relay-broker.json, modified:target/debug/.fingerprint/agent-relay-broker-aba1cde5556acf05/dep-bin-agent-relay-broker; exit=0)", + "significance": "medium", + "raw": { + "stepName": "rust-output-buffering", + "completionMode": "verification", + "reason": "Verification passed", + "evidence": { + "summary": "4 signal(s), 1 relevant channel post(s), 6 file change(s), exit=0", + "signals": [ + "0", + "Build succeeds. Here's a summary of what was done:", + "Verification passed", + "**[rust-output-buffering] Output:**" + ], + "channelPosts": [ + "**[rust-output-buffering] Output:**\n```\nBuild succeeds. Here's a summary of what was done:\n**Changes to `src/pty_worker.rs`:**\n1. **Added buffer declarations** " + ], + "files": [ + "modified:packages/sdk/src/__tests__/orchestration-upgrades.test.ts", + "modified:packages/sdk/src/relay.ts", + "modified:src/pty_worker.rs", + "modified:target/debug/.fingerprint/agent-relay-broker-aba1cde5556acf05/bin-agent-relay-broker", + "modified:target/debug/.fingerprint/agent-relay-broker-aba1cde5556acf05/bin-agent-relay-broker.json", + "modified:target/debug/.fingerprint/agent-relay-broker-aba1cde5556acf05/dep-bin-agent-relay-broker" + ], + "exitCode": 0 + } + } + }, + { + "ts": 1774425250796, + "type": "finding", + "content": "\"rust-output-buffering\" completed → Build succeeds. Here's a summary of what was done:\n\n**Changes to `src/pty_worker.rs`:**\n\n1. **Added buffer declarations*", + "significance": "medium" + } + ], + "endedAt": "2026-03-25T07:54:10.799Z" + }, + { + "id": "ch_5d1c6959", + "title": "Convergence: rust-output-buffering + sdk-stream-filtering", + "agentName": "orchestrator", + "startedAt": "2026-03-25T07:54:10.799Z", + "events": [ + { + "ts": 1774425250799, + "type": "reflection", + "content": "rust-output-buffering + sdk-stream-filtering resolved. 2/2 steps completed. All steps completed on first attempt. Unblocking: verify-files.", + "significance": "high", + "raw": { + "confidence": 1, + "focalPoints": [ + "rust-output-buffering: completed", + "sdk-stream-filtering: completed" + ] + } + } + ], + "endedAt": "2026-03-25T07:54:21.246Z" + }, + { + "id": "ch_6985dbcf", + "title": "Execution: fix-failures", + "agentName": "fixer", + "startedAt": "2026-03-25T07:54:21.246Z", + "events": [ + { + "ts": 1774425261246, + "type": "note", + "content": "\"fix-failures\": Build results:", + "raw": { + "agent": "fixer" + } + }, + { + "ts": 1774425268713, + "type": "completion-evidence", + "content": "\"fix-failures\" verification-based completion — Verification passed (2 signal(s), exit=0; signals=0, Verification passed; exit=0)", + "significance": "medium", + "raw": { + "stepName": "fix-failures", + "completionMode": "verification", + "reason": "Verification passed", + "evidence": { + "summary": "2 signal(s), exit=0", + "signals": [ + "0", + "Verification passed" + ], + "exitCode": 0 + } + } + }, + { + "ts": 1774425268713, + "type": "finding", + "content": "\"fix-failures\" completed → All clear. Build is OK — Cargo, Clippy, and TypeScript all passed with no errors.", + "significance": "medium" + } + ], + "endedAt": "2026-03-25T07:54:44.166Z" + }, + { + "id": "ch_6d0e5bbb", + "title": "Retrospective", + "agentName": "orchestrator", + "startedAt": "2026-03-25T07:54:44.166Z", + "events": [ + { + "ts": 1774425284166, + "type": "reflection", + "content": "All 9 steps completed in 2min. (completed in 2 minutes)", + "significance": "high" + } + ], + "endedAt": "2026-03-25T07:54:44.166Z" + } + ], + "completedAt": "2026-03-25T07:54:44.166Z", + "retrospective": { + "summary": "All 9 steps completed in 2min.", + "approach": "dag workflow (3 agents)", + "confidence": 0.8333333333333334, + "learnings": [], + "challenges": [] + } +} \ No newline at end of file diff --git a/.trajectories/index.json b/.trajectories/index.json index ba058b2b8..519d85fbf 100644 --- a/.trajectories/index.json +++ b/.trajectories/index.json @@ -1,6 +1,6 @@ { "version": 1, - "lastUpdated": "2026-03-21T14:26:59.706Z", + "lastUpdated": "2026-03-25T09:28:53.988Z", "trajectories": { "traj_1b1dj40sl6jl": { "title": "Revert aggressive retry logic in relay-pty-orchestrator", @@ -838,9 +838,24 @@ }, "traj_u6bndffr0b8g": { "title": "Code review for assigned workflow changes", - "status": "active", + "status": "abandoned", "startedAt": "2026-03-12T08:38:19.960Z", - "path": "/Users/will/Projects/relay/.trajectories/active/traj_u6bndffr0b8g.json" + "completedAt": "2026-03-25T09:28:08.625Z", + "path": "/Users/khaliqgant/Projects/AgentWorkforce/relay/.trajectories/completed/2026-03/traj_u6bndffr0b8g.json" + }, + "traj_ni1xbsaa03bv": { + "title": "Add mute/unmute channel methods to sdk-typescript AgentClient", + "status": "abandoned", + "startedAt": "2026-03-23T20:21:54.726Z", + "completedAt": "2026-03-25T09:27:58.870Z", + "path": "/Users/khaliqgant/Projects/AgentWorkforce/relay/.trajectories/completed/2026-03/traj_ni1xbsaa03bv.json" + }, + "traj_1qnnaojcl0w6": { + "title": "PTY output streaming: rate-limited buffering, stream filter, and Devin review fixes", + "status": "completed", + "startedAt": "2026-03-25T09:28:08.884Z", + "completedAt": "2026-03-25T09:28:53.882Z", + "path": "/Users/khaliqgant/Projects/AgentWorkforce/relay/.trajectories/completed/2026-03/traj_1qnnaojcl0w6.json" } } } \ No newline at end of file diff --git a/packages/sdk/src/__tests__/orchestration-upgrades.test.ts b/packages/sdk/src/__tests__/orchestration-upgrades.test.ts index 5f1bb1a7b..4d6493a18 100644 --- a/packages/sdk/src/__tests__/orchestration-upgrades.test.ts +++ b/packages/sdk/src/__tests__/orchestration-upgrades.test.ts @@ -1107,4 +1107,101 @@ describe('Agent.onOutput', () => { await relay.shutdown(); } }); + + it('onOutput with { stream: "stdout" } only receives stdout events', async () => { + const { client, emit } = createMockFacadeClient(); + vi.spyOn(AgentRelayClient, 'start').mockResolvedValue(client); + + const relay = new AgentRelay(); + try { + const agent = await relay.spawnPty({ + name: 'stream-filter-agent', + cli: 'claude', + channels: ['general'], + }); + + const chunks: string[] = []; + agent.onOutput((chunk: string) => chunks.push(chunk), { stream: 'stdout' }); + + emit({ kind: 'worker_stream', name: 'stream-filter-agent', stream: 'stdout', chunk: 'out1' }); + emit({ kind: 'worker_stream', name: 'stream-filter-agent', stream: 'stderr', chunk: 'err1' }); + emit({ kind: 'worker_stream', name: 'stream-filter-agent', stream: 'stdout', chunk: 'out2' }); + + expect(chunks).toEqual(['out1', 'out2']); + } finally { + await relay.shutdown(); + } + }); + + it('onOutput without filter receives all streams', async () => { + const { client, emit } = createMockFacadeClient(); + vi.spyOn(AgentRelayClient, 'start').mockResolvedValue(client); + + const relay = new AgentRelay(); + try { + const agent = await relay.spawnPty({ + name: 'all-streams-agent', + cli: 'claude', + channels: ['general'], + }); + + const chunks: string[] = []; + agent.onOutput((chunk: string) => chunks.push(chunk)); + + emit({ kind: 'worker_stream', name: 'all-streams-agent', stream: 'stdout', chunk: 'out' }); + emit({ kind: 'worker_stream', name: 'all-streams-agent', stream: 'stderr', chunk: 'err' }); + + expect(chunks).toEqual(['out', 'err']); + } finally { + await relay.shutdown(); + } + }); + + it('onOutput with { stream: "stderr" } ignores stdout events', async () => { + const { client, emit } = createMockFacadeClient(); + vi.spyOn(AgentRelayClient, 'start').mockResolvedValue(client); + + const relay = new AgentRelay(); + try { + const agent = await relay.spawnPty({ + name: 'stderr-filter-agent', + cli: 'claude', + channels: ['general'], + }); + + const chunks: string[] = []; + agent.onOutput((chunk: string) => chunks.push(chunk), { stream: 'stderr' }); + + emit({ kind: 'worker_stream', name: 'stderr-filter-agent', stream: 'stdout', chunk: 'ignored' }); + emit({ kind: 'worker_stream', name: 'stderr-filter-agent', stream: 'stderr', chunk: 'captured' }); + + expect(chunks).toEqual(['captured']); + } finally { + await relay.shutdown(); + } + }); + + it('onOutput with explicit mode: "structured" receives { stream, chunk } objects', async () => { + const { client, emit } = createMockFacadeClient(); + vi.spyOn(AgentRelayClient, 'start').mockResolvedValue(client); + + const relay = new AgentRelay(); + try { + const agent = await relay.spawnPty({ + name: 'explicit-mode-agent', + cli: 'claude', + channels: ['general'], + }); + + const payloads: Array<{ stream: string; chunk: string }> = []; + // Use a plain (chunk) => ... signature but force structured mode via options + agent.onOutput(((data: { stream: string; chunk: string }) => payloads.push(data)) as any, { mode: 'structured' }); + + emit({ kind: 'worker_stream', name: 'explicit-mode-agent', stream: 'stdout', chunk: 'hello' }); + + expect(payloads).toEqual([{ stream: 'stdout', chunk: 'hello' }]); + } finally { + await relay.shutdown(); + } + }); }); diff --git a/packages/sdk/src/relay.ts b/packages/sdk/src/relay.ts index 7bf6f947e..bb90041db 100644 --- a/packages/sdk/src/relay.ts +++ b/packages/sdk/src/relay.ts @@ -181,8 +181,11 @@ export interface Agent { }): Promise; subscribe(channels: string[]): Promise; unsubscribe(channels: string[]): Promise; - /** Register a callback for PTY output from this agent. Returns an unsubscribe function. */ - onOutput(callback: AgentOutputCallback): () => void; + /** Register a callback for PTY output from this agent. Returns an unsubscribe function. + * @param options.stream — if provided, only invoke callback when the event stream matches (e.g. 'stdout', 'stderr') + * @param options.mode — 'chunk' for raw string callbacks, 'structured' for { stream, chunk } callbacks. Auto-detected if omitted. + */ + onOutput(callback: AgentOutputCallback, options?: { stream?: string; mode?: 'chunk' | 'structured' }): () => void; } export interface HumanHandle { @@ -239,6 +242,7 @@ export interface AgentRelayOptions { type OutputListener = { callback: AgentOutputCallback; mode: 'chunk' | 'structured'; + stream?: string; }; type InternalAgent = Agent & { @@ -897,6 +901,7 @@ export class AgentRelay { const listeners = this.outputListeners.get(name); if (!listeners) return; for (const listener of listeners) { + if (listener.stream !== undefined && listener.stream !== stream) continue; if (listener.mode === 'structured') { (listener.callback as (data: AgentOutputPayload) => void)({ stream, chunk }); } else { @@ -1305,7 +1310,7 @@ export class AgentRelay { async unsubscribe(channelsToRemove: string[]) { await relay.unsubscribe({ agent: name, channels: channelsToRemove }); }, - onOutput(callback: AgentOutputCallback): () => void { + onOutput(callback: AgentOutputCallback, options?: { stream?: string; mode?: 'chunk' | 'structured' }): () => void { let listeners = relay.outputListeners.get(name); if (!listeners) { listeners = new Set(); @@ -1313,7 +1318,8 @@ export class AgentRelay { } const listener: OutputListener = { callback, - mode: relay.inferOutputMode(callback), + mode: options?.mode ?? relay.inferOutputMode(callback), + stream: options?.stream, }; listeners.add(listener); return () => { diff --git a/relay.pty-output-streaming.yaml b/relay.pty-output-streaming.yaml new file mode 100644 index 000000000..91f1e6a5b --- /dev/null +++ b/relay.pty-output-streaming.yaml @@ -0,0 +1,239 @@ +version: '1.0' +name: pty-output-streaming +description: > + PTY output streaming improvements for GitHub issue #390. + + The core worker_stream pipeline is ALREADY IMPLEMENTED end-to-end: + Rust pty_worker → send_frame("worker_stream") → Broker → SDK event handler + → onWorkerOutput + agent.onOutput() + + REMAINING GAP: PTY emits worker_stream on every read chunk with no batching. + This workflow adds 100ms rate-limited buffering in Rust and optional stream + filtering in the SDK's onOutput API. + + 2 phases: parallel implementation → verification + fix. + +swarm: + pattern: dag + maxConcurrency: 3 + timeoutMs: 3600000 + channel: wf-pty-streaming + idleNudge: + nudgeAfterMs: 300000 + escalateAfterMs: 300000 + maxNudges: 2 + +agents: + - name: rust-dev + cli: claude + preset: worker + role: 'Rust backend — adds rate-limited buffering to worker_stream emission in pty_worker.rs' + constraints: + model: sonnet + + - name: sdk-dev + cli: claude + preset: worker + role: 'TypeScript SDK — adds stream filtering to agent.onOutput() and writes tests' + constraints: + model: sonnet + + - name: fixer + cli: claude + preset: worker + role: 'Fixes type errors and test failures' + constraints: + model: sonnet + +workflows: + - name: default + description: 'Add rate-limited buffering on Rust side and stream filtering on SDK side.' + onError: retry + + preflight: + - command: test -f src/pty_worker.rs + description: 'Rust PTY worker source exists' + - command: test -f packages/sdk/src/relay.ts + description: 'SDK relay source exists' + - command: grep -q worker_stream src/pty_worker.rs + description: 'worker_stream emission already exists' + - command: grep -q worker_stream packages/sdk/src/protocol.ts + description: 'worker_stream protocol type already defined' + + steps: + # ════════════════════════════════════════════════════════════════════════ + # PHASE 1: Context reads + parallel implementation + # ════════════════════════════════════════════════════════════════════════ + + - name: read-pty-worker + type: deterministic + command: | + echo "=== worker_stream emission site ===" + grep -n -B5 -A10 'worker_stream' src/pty_worker.rs + echo "" + echo "=== buffer declarations area (lines 220-260) ===" + sed -n '220,260p' src/pty_worker.rs + echo "" + echo "=== verification_tick select arm ===" + grep -n -B2 -A10 'verification_tick' src/pty_worker.rs + echo "" + echo "=== imports ===" + head -30 src/pty_worker.rs + captureOutput: true + + - name: read-sdk-relay + type: deterministic + command: | + echo "=== onOutput method ===" + grep -n -B5 -A20 'onOutput' packages/sdk/src/relay.ts + echo "" + echo "=== AgentOutputCallback type ===" + grep -n -B2 -A5 'AgentOutputCallback' packages/sdk/src/relay.ts + echo "" + echo "=== Agent interface output section ===" + grep -n -B2 -A5 'onOutput' packages/sdk/src/relay.ts | head -30 + echo "" + echo "=== dispatchOutput ===" + grep -n -B2 -A15 'dispatchOutput' packages/sdk/src/relay.ts + captureOutput: true + + - name: read-existing-tests + type: deterministic + command: | + echo "=== onOutput test patterns ===" + grep -n -B2 -A15 'onOutput\|worker_stream' packages/sdk/src/__tests__/orchestration-upgrades.test.ts | head -60 + captureOutput: true + + # ── Wave 1: Rust buffering + SDK filtering (parallel) ────────────────── + + - name: rust-output-buffering + agent: rust-dev + dependsOn: [read-pty-worker] + task: | + Add 100ms rate-limited buffering to worker_stream emission in src/pty_worker.rs. + + CONTEXT: + {{steps.read-pty-worker.output}} + + Changes: + 1. Add an output buffer (String) and last-flush Instant near existing buffer declarations + 2. Replace direct send_frame("worker_stream") with buffer accumulation + 3. Flush when: 100ms elapsed since last flush OR buffer exceeds 4096 bytes + 4. Add flush check on the verification_tick interval for quiet periods + 5. Keep the watchdog late-output emission as-is + + Constraints: anyhow::Result (no unwrap), tracing macros, no new crate deps, + std::time::Instant (already imported), match existing code style. + + After implementing, run: cargo build 2>&1 | tail -10 + + Write the file to disk. + verification: + type: exit_code + retries: 2 + + - name: sdk-stream-filtering + agent: sdk-dev + dependsOn: [read-sdk-relay, read-existing-tests] + task: | + Add optional stream-type filtering to agent.onOutput() in packages/sdk/src/relay.ts. + + CONTEXT: + {{steps.read-sdk-relay.output}} + + EXISTING TEST PATTERNS: + {{steps.read-existing-tests.output}} + + Changes: + 1. Update onOutput signature: onOutput(callback, options?: { stream?: string }): () => void + 2. When options.stream is provided, only invoke callback when event stream field matches + 3. Update Agent interface to include the options param + 4. Add tests in orchestration-upgrades.test.ts: + - onOutput with { stream: 'stdout' } only receives stdout + - onOutput without filter receives all streams + - onOutput with { stream: 'stderr' } ignores stdout + + Keep backward compatibility — no options = receive all. + + After implementing, run: npx tsc --noEmit 2>&1 | tail -10 + + Write all files to disk. + verification: + type: exit_code + retries: 2 + + # ════════════════════════════════════════════════════════════════════════ + # PHASE 2: Verification + fix + # ════════════════════════════════════════════════════════════════════════ + + - name: verify-files + type: deterministic + dependsOn: [rust-output-buffering, sdk-stream-filtering] + command: | + errors=0 + grep -q "output_buffer\|flush" src/pty_worker.rs || { echo "FAIL: pty_worker.rs missing buffering"; errors=$((errors+1)); } + grep -q "stream.*filter\|options.*stream" packages/sdk/src/relay.ts || { echo "FAIL: relay.ts missing stream filtering"; errors=$((errors+1)); } + [ $errors -gt 0 ] && exit 1 + echo "All implementation files verified" + captureOutput: true + failOnError: true + + - name: build-check + type: deterministic + dependsOn: [verify-files] + command: | + echo "=== Cargo build ===" + cargo build 2>&1 | tail -10 + cargo_exit=$? + echo "" + echo "=== Clippy ===" + cargo clippy -- -D warnings 2>&1 | tail -10 + clippy_exit=$? + echo "" + echo "=== TypeScript ===" + cd packages/sdk && npx tsc --noEmit 2>&1 | tail -10 + tsc_exit=$? + echo "" + [ $cargo_exit -eq 0 ] && [ $clippy_exit -eq 0 ] && [ $tsc_exit -eq 0 ] && echo "BUILD_OK" || echo "BUILD_FAILED" + [ $cargo_exit -eq 0 ] && [ $clippy_exit -eq 0 ] && [ $tsc_exit -eq 0 ] + captureOutput: true + failOnError: false + + - name: fix-failures + agent: fixer + dependsOn: [build-check] + task: | + Build results: + {{steps.build-check.output}} + + If BUILD_OK, there's nothing to fix. Just confirm all clear. + + Otherwise, fix type errors, lint errors, or compilation failures in: + - src/pty_worker.rs (Rust buffering) + - packages/sdk/src/relay.ts (SDK filtering) + - packages/sdk/src/__tests__/orchestration-upgrades.test.ts (tests) + + After fixing, re-run the failing commands to verify. + Write files to disk. + verification: + type: exit_code + retries: 2 + + - name: final-tests + type: deterministic + dependsOn: [fix-failures] + command: | + set -o pipefail + echo "=== Cargo test ===" + cargo test 2>&1 | tail -20 + echo "" + echo "=== SDK tests ===" + cd packages/sdk && npx vitest run src/__tests__/orchestration-upgrades.test.ts 2>&1 | tail -20 + captureOutput: true + failOnError: true + +errorHandling: + strategy: retry + maxRetries: 2 + retryDelayMs: 5000 + notifyChannel: wf-pty-streaming diff --git a/src/pty_worker.rs b/src/pty_worker.rs index fd9eed991..fd119ea01 100644 --- a/src/pty_worker.rs +++ b/src/pty_worker.rs @@ -254,6 +254,26 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { // Bounded to avoid unbounded memory growth; continuity blocks are small. let mut continuity_buffer = String::new(); const CONTINUITY_BUFFER_MAX: usize = 4096; + // Rate-limited buffering for worker_stream emissions. + // Chunks are accumulated and flushed at most every 100ms or when buffer exceeds threshold. + let mut stream_buffer = String::new(); + let mut stream_buffer_last_flush = Instant::now(); + const STREAM_BUFFER_MAX: usize = 4096; + const STREAM_FLUSH_INTERVAL: Duration = Duration::from_millis(100); + + /// Flush `stream_buffer` via a `worker_stream` frame if non-empty. + macro_rules! flush_stream_buffer { + () => { + if !stream_buffer.is_empty() { + let chunk = std::mem::take(&mut stream_buffer); + let _ = send_frame(&out_tx, "worker_stream", None, json!({ + "stream": "stdout", + "chunk": chunk, + })).await; + stream_buffer_last_flush = Instant::now(); + } + }; + } let mut verification_tick = tokio::time::interval(Duration::from_millis(200)); verification_tick.set_missed_tick_behavior(MissedTickBehavior::Skip); @@ -477,6 +497,13 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { // Detect /exit command in agent output and trigger graceful shutdown. // Skip detection while echo verifications are pending to avoid // false-positives from injected relay messages containing "/exit". + stream_buffer.push_str(&text); + if stream_buffer.len() >= STREAM_BUFFER_MAX + || stream_buffer_last_flush.elapsed() >= STREAM_FLUSH_INTERVAL + { + flush_stream_buffer!(); + } + if pending_verifications.is_empty() && clean_text.lines().any(|line| line.trim() == "/exit") { @@ -484,17 +511,13 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { target = "agent_relay::worker::pty", "agent issued /exit — shutting down" ); + flush_stream_buffer!(); let _ = send_frame(&out_tx, "agent_exit", None, json!({ "reason": "agent_requested", })).await; running = false; } - let _ = send_frame(&out_tx, "worker_stream", None, json!({ - "stream": "stdout", - "chunk": text, - })).await; - pty_auto.update_auto_suggestion(&text); pty_auto.last_output_time = Instant::now(); pty_auto.reset_idle_on_output(); @@ -667,8 +690,11 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { } } None => { - // PTY reader closed — child likely exited. Emit - // agent_exit with any echo_buffer tail so the + // PTY reader closed — child likely exited. Flush + // any buffered stream output before sending + // agent_exit to preserve output ordering. + flush_stream_buffer!(); + // Emit agent_exit with any echo_buffer tail so the // dashboard can surface the CLI's last output. let clean = strip_ansi(&echo_buffer); let trimmed = if clean.len() > 2000 { @@ -856,6 +882,13 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { } } + // Flush any buffered stream output during quiet periods. + if !stream_buffer.is_empty() + && stream_buffer_last_flush.elapsed() >= STREAM_FLUSH_INTERVAL + { + flush_stream_buffer!(); + } + } // --- Auto-enter for stuck agents --- @@ -890,6 +923,9 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { // messages from CLIs that exit immediately (e.g. codex MCP // failure). Without this, the output is lost in the race // between the reader thread and the watchdog. + // Flush any buffered stream output before sending late output + // to preserve ordering. + flush_stream_buffer!(); let mut late_output = String::new(); while let Ok(chunk) = pty_rx.try_recv() { let text = String::from_utf8_lossy(&chunk).to_string(); @@ -952,6 +988,21 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { } } + // Flush any remaining buffered stream output before signaling exit. + if !stream_buffer.is_empty() { + let chunk = std::mem::take(&mut stream_buffer); + let _ = send_frame( + &out_tx, + "worker_stream", + None, + json!({ + "stream": "stdout", + "chunk": chunk, + }), + ) + .await; + } + if child_exit_detected { let _ = send_frame( &out_tx,