From 9cafbf69b2177223d768a2eeec6ff28b3ab7ecb5 Mon Sep 17 00:00:00 2001 From: Khaliq Date: Tue, 10 Mar 2026 11:13:34 +0100 Subject: [PATCH 01/14] feat: add workflow for PTY output streaming (#390) Multi-agent workflow to implement real-time PTY output streaming across five layers: Rust emitter, TS wrapper parser, protocol types, daemon subscription, and SDK client method. Co-Authored-By: Claude Opus 4.6 --- relay.pty-output-streaming.yaml | 513 ++++++++++++++++++++++++++++++++ 1 file changed, 513 insertions(+) create mode 100644 relay.pty-output-streaming.yaml diff --git a/relay.pty-output-streaming.yaml b/relay.pty-output-streaming.yaml new file mode 100644 index 000000000..d5cd8b056 --- /dev/null +++ b/relay.pty-output-streaming.yaml @@ -0,0 +1,513 @@ +version: '1.0' +name: pty-output-streaming +description: > + Implement PTY output streaming (GitHub issue #390). Adds real-time visibility + into spawned agent output by emitting buffered output chunks as JSON events + from relay-pty, parsing them in the TS wrapper, adding protocol types for + subscription, wiring daemon forwarding, and exposing an SDK client method. + + Five layers: Rust PTY emitter -> TS wrapper parser -> protocol types -> + daemon subscription -> SDK client method. Layers 1-2 are required for + minimum viability; layers 3-5 enable subscription from external clients. + +swarm: + pattern: dag + maxConcurrency: 3 + timeoutMs: 7200000 + channel: pty-streaming + idleNudge: + nudgeAfterMs: 300000 + escalateAfterMs: 300000 + maxNudges: 2 + +agents: + - name: rust-dev + cli: claude + channels: [pty-streaming, rust-track] + role: > + Rust backend developer. Modifies relay-pty/src/main.rs to emit buffered + output chunks as JSON events on stderr. Familiar with the PTY event loop, + tokio async, and serde_json serialization. + constraints: + model: opus + + - name: ts-wrapper-dev + cli: claude + channels: [pty-streaming, ts-track] + role: > + TypeScript developer working on the PTY orchestrator wrapper and protocol + types. Parses stderr JSON events, adds protocol message types, wires + daemon subscription, and adds SDK client method. + constraints: + model: sonnet + + - name: integration-tester + cli: claude + channels: [pty-streaming] + role: > + Tests the full pipeline end-to-end. Verifies Rust emits output events, + TS wrapper parses them, daemon forwards them, and SDK can subscribe. + constraints: + model: sonnet + +workflows: + - name: implement-pty-output-streaming + description: > + Layered implementation: Rust emitter -> TS wrapper -> protocol types -> + daemon forwarding -> SDK subscription -> integration test -> PR. + onError: retry + preflight: + - command: ls relay-pty/src/main.rs + description: 'Rust PTY source exists' + - command: ls packages/wrapper/src/relay-pty-orchestrator.ts + description: 'TS PTY orchestrator exists' + - command: cargo build --manifest-path relay-pty/Cargo.toml 2>&1 | tail -3 + description: 'Rust builds cleanly before changes' + - command: git status --porcelain + failIf: non-empty + description: 'Working directory is clean' + + steps: + # ── Wave 1: Parallel foundation work (Rust + Protocol types) ──────── + + - name: rust-output-emitter + type: agent + agent: rust-dev + task: | + Implement buffered PTY output streaming in relay-pty/src/main.rs. + + CONTEXT (from issue #390): + relay-pty already captures all PTY output in its event loop. After + writing to stdout (line ~716) and the log file (line ~720), the data + is parsed for relay commands but the raw output text is never emitted + as a JSON event back to the daemon. + + TASK: + 1. Add an output buffer (String) and a last-flush timestamp (Instant) + to the PTY event loop state. + + 2. In the PTY read handler, after the stdout write and log file write + but BEFORE the relay command parser: + - Append the chunk (as lossy UTF-8) to the output buffer + - If 100ms have elapsed since last flush OR buffer exceeds 4096 bytes: + - If json_output is enabled and buffer is non-empty after trim: + - Emit a JSON event to stderr: + {"type":"output","ts":,"data":""} + - Use serde_json::json! macro and eprintln! + - Clear the buffer and reset the flush timestamp + + 3. Add a flush on the existing tick/timer arm of the select! loop so + buffered output is flushed even during quiet periods. + + 4. Ensure the output event uses the same stderr JSON emission path as + existing relay_command events (eprintln with serde_json::to_string). + + CONSTRAINTS: + - Use anyhow::Result, not unwrap() in production paths + - Use tracing macros for any debug logging + - Do not add new crate dependencies (serde_json is already available) + - The buffer+flush approach limits events to ~10/sec max + - Skip chunks that are pure whitespace after trim + + After implementing, run: + cargo build --manifest-path relay-pty/Cargo.toml 2>&1 | tail -10 + cargo clippy --manifest-path relay-pty/Cargo.toml 2>&1 | tail -10 + + Do NOT use relay_spawn or add_agent. + Print exactly: RUST_EMITTER_DONE + verification: + type: output_contains + value: RUST_EMITTER_DONE + retries: 2 + + - name: protocol-types + type: agent + agent: ts-wrapper-dev + task: | + Add protocol message types for PTY output streaming subscription. + + CONTEXT (from issue #390): + We need two new message types so SDK clients can subscribe to a + worker's real-time output. The daemon already has onLogOutput + infrastructure; these types formalize the subscription protocol. + + TASK: + 1. Find the protocol types file. Check these locations: + - packages/protocol/src/types.ts + - packages/sdk/src/protocol.ts + - packages/shared/src/protocol.ts + Look for where existing message types like LOG, SPAWN, etc. are defined. + + 2. Add two new message types to the enum/union: + - SUBSCRIBE_WORKER_OUTPUT: sent by client to daemon to subscribe + Payload: { agent: string } + - WORKER_OUTPUT: sent by daemon to subscribed clients + Payload: { agent: string, data: string, ts: number } + + 3. Add corresponding TypeScript interfaces for the payloads: + - SubscribeWorkerOutputPayload { agent: string } + - WorkerOutputPayload { agent: string, data: string, ts: number } + + 4. If there is a Rust protocol counterpart (check relay-pty/src/ for + protocol types), add the matching Rust types with serde derives. + + CONSTRAINTS: + - Follow existing naming conventions in the types file + - Use type-only imports where appropriate + - Export new types from the module's index + + After implementing, run type checking: + npx tsc --noEmit -p packages/sdk/tsconfig.json 2>&1 | tail -10 + (or whichever tsconfig covers the protocol types) + + Do NOT use relay_spawn or add_agent. + Print exactly: PROTOCOL_TYPES_DONE + verification: + type: output_contains + value: PROTOCOL_TYPES_DONE + retries: 1 + + # ── Wave 2: TS wrapper parsing (depends on protocol types) ────────── + + - name: ts-wrapper-parser + type: agent + agent: ts-wrapper-dev + dependsOn: [protocol-types] + task: | + Add stderr JSON event parsing to the PTY orchestrator wrapper. + + CONTEXT (from issue #390): + The Rust layer now emits {"type":"output","ts":...,"data":...} events + on stderr. The TS wrapper needs to parse these and emit them as + EventEmitter events so the daemon can forward them. + + TASK: + 1. Open packages/wrapper/src/relay-pty-orchestrator.ts + + 2. In the spawn setup (where the child process is created), add stderr + line-buffered parsing: + - Accumulate stderr chunks into a buffer string + - Split on newlines, keeping incomplete lines in the buffer + - For each complete line, try JSON.parse + - Route parsed events by their "type" field + + 3. Add a handleEvent method (or extend existing one) that switches on + event.type: + - "output": emit an 'output' event with { agent, data, ts } + - "relay_command": existing handling (check if already present) + - "continuity": existing handling (check if already present) + - Unknown types: log at debug level, do not throw + + 4. Make sure the class extends EventEmitter (or uses whatever event + pattern the codebase already uses). Check existing patterns first. + + 5. Ensure --json-output flag is included in the relay-pty spawn args + so the Rust side actually emits these events. + + CONSTRAINTS: + - Use existing EventEmitter patterns from the codebase + - Handle malformed JSON gracefully (try/catch, no crash) + - Use the WorkerOutputPayload type from the protocol types step + - Follow existing code style (ES modules, node: prefix imports) + + After implementing, run: + npx tsc --noEmit -p packages/wrapper/tsconfig.json 2>&1 | tail -10 + + Do NOT use relay_spawn or add_agent. + Print exactly: TS_WRAPPER_DONE + verification: + type: output_contains + value: TS_WRAPPER_DONE + retries: 2 + + # ── Wave 3: Daemon subscription + SDK client (parallel, depend on wrapper) ── + + - name: daemon-subscription + type: agent + agent: ts-wrapper-dev + dependsOn: [ts-wrapper-parser] + task: | + Wire daemon-side output subscription and forwarding. + + CONTEXT (from issue #390): + The wrapper now emits 'output' events. The daemon needs to: + (a) accept SUBSCRIBE_WORKER_OUTPUT from SDK clients + (b) forward WORKER_OUTPUT events to subscribers + + The daemon already has infrastructure for this: + - packages/daemon/src/spawn-manager.ts has pty.on('output') wiring + - packages/daemon/src/server.ts has onLogOutput callback + - The __broadcastLogOutput mechanism already exists + + TASK: + 1. In packages/daemon/src/server.ts: + - Add a Map> for output subscriptions + - Handle SUBSCRIBE_WORKER_OUTPUT messages: add the connection to + the subscription set for the requested agent name + - Clean up subscriptions when connections close + - When forwarding output, create a WORKER_OUTPUT envelope and send + to all subscribers for that agent + + 2. In packages/daemon/src/spawn-manager.ts: + - When the PTY orchestrator emits 'output', forward it to the + server's subscription mechanism + - Use the existing onLogOutput pattern as a reference + + CONSTRAINTS: + - Use the SUBSCRIBE_WORKER_OUTPUT and WORKER_OUTPUT types from the + protocol types step + - Follow existing message handling patterns in server.ts + - Clean up subscriptions on disconnect to prevent memory leaks + - Use tracing/logging at debug level for subscription events + + After implementing, run: + npx tsc --noEmit -p packages/daemon/tsconfig.json 2>&1 | tail -10 + + Do NOT use relay_spawn or add_agent. + Print exactly: DAEMON_SUBSCRIPTION_DONE + verification: + type: output_contains + value: DAEMON_SUBSCRIPTION_DONE + retries: 1 + + - name: sdk-client-method + type: agent + agent: ts-wrapper-dev + dependsOn: [ts-wrapper-parser] + task: | + Add subscribeWorkerOutput method to the SDK client. + + CONTEXT (from issue #390): + SDK clients need a clean API to subscribe to a worker's output stream. + The daemon now handles SUBSCRIBE_WORKER_OUTPUT and forwards + WORKER_OUTPUT events. + + TASK: + 1. Open packages/sdk/src/client.ts (or wherever the main SDK client + class is defined). + + 2. Add a subscribeWorkerOutput method: + ``` + subscribeWorkerOutput( + agentName: string, + callback: (data: string, ts: number) => void + ): () => void + ``` + + 3. Implementation: + - Send a SUBSCRIBE_WORKER_OUTPUT envelope with { agent: agentName } + - Register a message handler that filters for WORKER_OUTPUT + envelopes where payload.agent matches agentName + - Call the callback with (payload.data, payload.ts) + - Return an unsubscribe function that removes the handler + + 4. Export the method and add JSDoc documentation with @param and + @returns tags. + + CONSTRAINTS: + - Use existing envelope creation patterns (createEnvelope or equivalent) + - Use existing message listener patterns (on/off or equivalent) + - Return a cleanup function for unsubscription + - Use the protocol types from the protocol-types step + + After implementing, run: + npx tsc --noEmit -p packages/sdk/tsconfig.json 2>&1 | tail -10 + + Do NOT use relay_spawn or add_agent. + Print exactly: SDK_CLIENT_DONE + verification: + type: output_contains + value: SDK_CLIENT_DONE + retries: 1 + + # ── Wave 4: Build gate ───────────────────────────────────────────── + + - name: build-rust + type: deterministic + dependsOn: [rust-output-emitter] + command: > + cargo build --manifest-path relay-pty/Cargo.toml 2>&1 | tail -10 && + cargo clippy --manifest-path relay-pty/Cargo.toml -- -D warnings 2>&1 | tail -10 && + echo "RUST_BUILD_OK" || echo "RUST_BUILD_FAILED" + captureOutput: true + failOnError: false + + - name: build-ts + type: deterministic + dependsOn: [daemon-subscription, sdk-client-method] + command: > + npx tsc --noEmit 2>&1 | tail -20 && + npm run lint 2>&1 | tail -10 && + echo "TS_BUILD_OK" || echo "TS_BUILD_FAILED" + captureOutput: true + failOnError: false + + # ── Wave 5: Fix any build failures ───────────────────────────────── + + - name: fix-build-failures + type: agent + agent: ts-wrapper-dev + dependsOn: [build-rust, build-ts] + task: | + Rust build: {{steps.build-rust.output}} + TS build: {{steps.build-ts.output}} + + If both show *_BUILD_OK, there is nothing to fix. Output BUILD_ALL_GREEN. + + Otherwise, fix any type errors, lint errors, or compilation failures. + For Rust issues, describe what needs to change and make the fix in + relay-pty/src/main.rs directly. + + After fixing, re-run the failing build commands to verify. + + Do NOT use relay_spawn or add_agent. + Print exactly: BUILD_ALL_GREEN + verification: + type: output_contains + value: BUILD_ALL_GREEN + retries: 2 + + # ── Wave 6: Integration test ─────────────────────────────────────── + + - name: integration-test + type: agent + agent: integration-tester + dependsOn: [fix-build-failures] + task: | + Write and run an integration test for the PTY output streaming pipeline. + + WHAT TO TEST: + 1. Rust emitter: Verify that relay-pty with --json-output emits + {"type":"output",...} events on stderr when given input. + - Build relay-pty if not already built + - Spawn it with a simple command (e.g., echo "hello world") + - Capture stderr and verify JSON output events appear + - Verify the event has type, ts, and data fields + + 2. TS wrapper: Verify the orchestrator emits 'output' events. + - Write a small test in packages/wrapper/test/ or tests/ + - Mock or spawn a real relay-pty process + - Assert the 'output' event fires with correct shape + + 3. Protocol types: Verify SUBSCRIBE_WORKER_OUTPUT and WORKER_OUTPUT + exist and have correct payload types (compile-time check via tsc). + + WHERE TO PUT TESTS: + - Rust test: relay-pty/tests/ or inline #[cfg(test)] module + - TS test: tests/integration/ or packages/wrapper/test/ + - Use existing test patterns in the repo + + After writing tests, run them: + cargo test --manifest-path relay-pty/Cargo.toml 2>&1 | tail -20 + npm test 2>&1 | tail -20 + + Do NOT use relay_spawn or add_agent. + Print exactly: INTEGRATION_TESTS_DONE + verification: + type: output_contains + value: INTEGRATION_TESTS_DONE + retries: 1 + + # ── Wave 7: Final regression + commit ────────────────────────────── + + - name: final-regression + type: deterministic + dependsOn: [integration-test] + command: > + cargo build --manifest-path relay-pty/Cargo.toml 2>&1 | tail -5 && + cargo clippy --manifest-path relay-pty/Cargo.toml -- -D warnings 2>&1 | tail -5 && + cargo test --manifest-path relay-pty/Cargo.toml 2>&1 | tail -10 && + npx tsc --noEmit 2>&1 | tail -5 && + echo "REGRESSION_PASSED" || echo "REGRESSION_FAILED" + captureOutput: true + failOnError: false + + - name: commit-and-pr + type: deterministic + dependsOn: [final-regression] + command: > + git add relay-pty/src/ packages/wrapper/src/ packages/daemon/src/ packages/sdk/src/ tests/ && + git commit -m "feat: PTY output streaming with 100ms buffered emission (#390)" && + git push -u origin HEAD 2>&1 && + gh pr create + --title "feat: PTY output streaming (#390)" + --body "$(cat <&1 || echo "PR_DONE" + captureOutput: true + failOnError: false + + # ── Wave 8: Summary ──────────────────────────────────────────────── + + - name: summary + type: agent + agent: integration-tester + dependsOn: [commit-and-pr] + task: | + The PTY output streaming workflow has completed. Write a summary. + + Rust emitter: {{steps.rust-output-emitter.output}} + Protocol types: {{steps.protocol-types.output}} + TS wrapper: {{steps.ts-wrapper-parser.output}} + Daemon: {{steps.daemon-subscription.output}} + SDK: {{steps.sdk-client-method.output}} + Build: {{steps.build-rust.output}} | {{steps.build-ts.output}} + Tests: {{steps.integration-test.output}} + Regression: {{steps.final-regression.output}} + PR: {{steps.commit-and-pr.output}} + + Summarize: + 1. What was implemented at each layer + 2. What tests were added + 3. Any issues encountered and how they were resolved + 4. PR link if created + + Do NOT use relay_spawn or add_agent. + Print exactly: WORKFLOW_SUMMARY_DONE + verification: + type: output_contains + value: WORKFLOW_SUMMARY_DONE + +coordination: + barriers: + - name: foundation-complete + waitFor: [rust-output-emitter, protocol-types] + timeoutMs: 1800000 + - name: implementation-complete + waitFor: [daemon-subscription, sdk-client-method] + timeoutMs: 1800000 + - name: builds-green + waitFor: [build-rust, build-ts] + timeoutMs: 600000 + +state: + backend: memory + ttlMs: 86400000 + namespace: pty-streaming + +errorHandling: + strategy: retry + maxRetries: 2 + retryDelayMs: 5000 + notifyChannel: pty-streaming + +trajectories: + enabled: true + reflectOnBarriers: true + autoDecisions: true From 8010d15eac98c6a946bb93fd5837466a8e58d55b Mon Sep 17 00:00:00 2001 From: Khaliq Date: Tue, 10 Mar 2026 11:57:41 +0100 Subject: [PATCH 02/14] fix: rewrite PTY output streaming workflow based on actual codebase (#390) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous workflow was based on wrong assumptions — it referenced non-existent paths (relay-pty/src/main.rs, packages/wrapper/, packages/daemon/) and tried to implement features that already exist. The full worker_stream pipeline is already implemented: - Rust: src/pty_worker.rs emits worker_stream frames - Protocol: packages/sdk/src/protocol.ts defines worker_stream types - SDK: packages/sdk/src/relay.ts has agent.onOutput() with filtering Rewritten workflow focuses on the actual remaining gaps: 1. Rate-limited 100ms buffering on the Rust side (reduces frame flood) 2. Optional stream-type filtering on the SDK onOutput() API Co-Authored-By: Claude Opus 4.6 --- relay.pty-output-streaming.yaml | 533 ++++++++++---------------------- 1 file changed, 161 insertions(+), 372 deletions(-) diff --git a/relay.pty-output-streaming.yaml b/relay.pty-output-streaming.yaml index d5cd8b056..c3fc7b49f 100644 --- a/relay.pty-output-streaming.yaml +++ b/relay.pty-output-streaming.yaml @@ -1,19 +1,32 @@ version: '1.0' name: pty-output-streaming description: > - Implement PTY output streaming (GitHub issue #390). Adds real-time visibility - into spawned agent output by emitting buffered output chunks as JSON events - from relay-pty, parsing them in the TS wrapper, adding protocol types for - subscription, wiring daemon forwarding, and exposing an SDK client method. - - Five layers: Rust PTY emitter -> TS wrapper parser -> protocol types -> - daemon subscription -> SDK client method. Layers 1-2 are required for - minimum viability; layers 3-5 enable subscription from external clients. + PTY output streaming improvements for GitHub issue #390. + + STATUS: The core feature is ALREADY IMPLEMENTED. The full pipeline exists: + - Rust: src/pty_worker.rs emits `worker_stream` frames (lines 441-444, 836-839) + - Protocol: packages/sdk/src/protocol.ts defines `worker_stream` BrokerEvent + and `WorkerToBroker` types + - SDK: packages/sdk/src/relay.ts handles `worker_stream` events, provides + agent.onOutput() callback with per-agent filtering and unsubscribe + - Tests: packages/sdk/src/__tests__/orchestration-upgrades.test.ts covers + onOutput, per-agent filtering, unsubscribe, and idle-resume on output + + The existing push model works end-to-end: + Rust pty_worker → send_frame("worker_stream") → Broker → SDK event handler + → onWorkerOutput global hook + dispatchOutput() → agent.onOutput(callback) + + REMAINING GAP: The Rust side emits worker_stream on EVERY PTY read chunk with + no rate-limiting or buffering. High-frequency output (e.g. streaming compilation + logs) floods the broker with small frames. Adding 100ms batched buffering in + src/pty_worker.rs would reduce frame rate to ~10/sec max while preserving + real-time feel. SDK-level output filtering by stream type (stdout vs stderr) + could also be added as a convenience. swarm: pattern: dag - maxConcurrency: 3 - timeoutMs: 7200000 + maxConcurrency: 2 + timeoutMs: 3600000 channel: pty-streaming idleNudge: nudgeAfterMs: 300000 @@ -23,340 +36,184 @@ swarm: agents: - name: rust-dev cli: claude - channels: [pty-streaming, rust-track] + channels: [pty-streaming] role: > - Rust backend developer. Modifies relay-pty/src/main.rs to emit buffered - output chunks as JSON events on stderr. Familiar with the PTY event loop, - tokio async, and serde_json serialization. + Rust backend developer. Modifies src/pty_worker.rs to add rate-limited + buffering for worker_stream frame emission. Familiar with the PTY event + loop, tokio async, and the existing send_frame helper. constraints: model: opus - - name: ts-wrapper-dev - cli: claude - channels: [pty-streaming, ts-track] - role: > - TypeScript developer working on the PTY orchestrator wrapper and protocol - types. Parses stderr JSON events, adds protocol message types, wires - daemon subscription, and adds SDK client method. - constraints: - model: sonnet - - - name: integration-tester + - name: sdk-dev cli: claude channels: [pty-streaming] role: > - Tests the full pipeline end-to-end. Verifies Rust emits output events, - TS wrapper parses them, daemon forwards them, and SDK can subscribe. + TypeScript SDK developer. Adds optional stream filtering to the + agent.onOutput() API in packages/sdk/src/relay.ts. Writes tests in + packages/sdk/src/__tests__/orchestration-upgrades.test.ts. constraints: model: sonnet workflows: - - name: implement-pty-output-streaming + - name: improve-pty-output-streaming description: > - Layered implementation: Rust emitter -> TS wrapper -> protocol types -> - daemon forwarding -> SDK subscription -> integration test -> PR. + Add rate-limited buffering on the Rust side and optional stream filtering + on the SDK side. The core worker_stream pipeline already works end-to-end. onError: retry preflight: - - command: ls relay-pty/src/main.rs - description: 'Rust PTY source exists' - - command: ls packages/wrapper/src/relay-pty-orchestrator.ts - description: 'TS PTY orchestrator exists' - - command: cargo build --manifest-path relay-pty/Cargo.toml 2>&1 | tail -3 - description: 'Rust builds cleanly before changes' + - command: test -f src/pty_worker.rs + description: 'Rust PTY worker source exists' + - command: test -f packages/sdk/src/relay.ts + description: 'SDK relay source exists' + - command: grep -q worker_stream src/pty_worker.rs + description: 'worker_stream emission already exists in pty_worker' + - command: grep -q worker_stream packages/sdk/src/protocol.ts + description: 'worker_stream protocol type already defined' - command: git status --porcelain failIf: non-empty description: 'Working directory is clean' steps: - # ── Wave 1: Parallel foundation work (Rust + Protocol types) ──────── + # ── Wave 1: Parallel — Rust buffering + SDK filtering ───────────── - - name: rust-output-emitter + - name: rust-output-buffering type: agent agent: rust-dev task: | - Implement buffered PTY output streaming in relay-pty/src/main.rs. - - CONTEXT (from issue #390): - relay-pty already captures all PTY output in its event loop. After - writing to stdout (line ~716) and the log file (line ~720), the data - is parsed for relay commands but the raw output text is never emitted - as a JSON event back to the daemon. + Add 100ms rate-limited buffering to worker_stream emission in + src/pty_worker.rs. - TASK: - 1. Add an output buffer (String) and a last-flush timestamp (Instant) - to the PTY event loop state. + CONTEXT: + The PTY event loop in run_pty_worker() already emits worker_stream + frames at line 441-444: + let _ = send_frame(&out_tx, "worker_stream", None, json!({ + "stream": "stdout", + "chunk": text, + })).await; - 2. In the PTY read handler, after the stdout write and log file write - but BEFORE the relay command parser: - - Append the chunk (as lossy UTF-8) to the output buffer - - If 100ms have elapsed since last flush OR buffer exceeds 4096 bytes: - - If json_output is enabled and buffer is non-empty after trim: - - Emit a JSON event to stderr: - {"type":"output","ts":,"data":""} - - Use serde_json::json! macro and eprintln! - - Clear the buffer and reset the flush timestamp + This fires on EVERY PTY read chunk with no batching. High-frequency + output floods the broker with many small frames. - 3. Add a flush on the existing tick/timer arm of the select! loop so - buffered output is flushed even during quiet periods. - - 4. Ensure the output event uses the same stderr JSON emission path as - existing relay_command events (eprintln with serde_json::to_string). + TASK: + 1. Add an output buffer (String) and a last-flush Instant near the + existing `echo_buffer` and `continuity_buffer` declarations + (around line 229). + + 2. Replace the direct send_frame("worker_stream") call at line 441-444 + with buffer accumulation: + - Append the chunk text to the output buffer + - Only flush (send the frame) when: + a) 100ms have elapsed since last flush, OR + b) buffer exceeds 4096 bytes + - On flush: send the accumulated buffer as one worker_stream frame, + clear the buffer, reset the flush timestamp + - Skip sending if buffer is empty after trim + + 3. Add a flush check on the existing verification_tick interval + (line 234, fires every 200ms) so buffered output is flushed even + during quiet periods. Look for the `_ = verification_tick.tick()` + arm in the select! loop and add a buffer flush there. + + 4. Keep the watchdog late-output emission at line 836-839 as-is + (it already handles the drain-on-exit case). CONSTRAINTS: - Use anyhow::Result, not unwrap() in production paths - - Use tracing macros for any debug logging - - Do not add new crate dependencies (serde_json is already available) - - The buffer+flush approach limits events to ~10/sec max - - Skip chunks that are pure whitespace after trim + - Use tracing macros for debug logging + - Do not add new crate dependencies + - Use std::time::Instant for timestamps (already imported) + - Follow existing code style in the file After implementing, run: - cargo build --manifest-path relay-pty/Cargo.toml 2>&1 | tail -10 - cargo clippy --manifest-path relay-pty/Cargo.toml 2>&1 | tail -10 + cargo build 2>&1 | tail -10 + cargo clippy -- -D warnings 2>&1 | tail -10 Do NOT use relay_spawn or add_agent. - Print exactly: RUST_EMITTER_DONE + Print exactly: RUST_BUFFERING_DONE verification: type: output_contains - value: RUST_EMITTER_DONE + value: RUST_BUFFERING_DONE retries: 2 - - name: protocol-types + - name: sdk-stream-filtering type: agent - agent: ts-wrapper-dev + agent: sdk-dev task: | - Add protocol message types for PTY output streaming subscription. - - CONTEXT (from issue #390): - We need two new message types so SDK clients can subscribe to a - worker's real-time output. The daemon already has onLogOutput - infrastructure; these types formalize the subscription protocol. - - TASK: - 1. Find the protocol types file. Check these locations: - - packages/protocol/src/types.ts - - packages/sdk/src/protocol.ts - - packages/shared/src/protocol.ts - Look for where existing message types like LOG, SPAWN, etc. are defined. - - 2. Add two new message types to the enum/union: - - SUBSCRIBE_WORKER_OUTPUT: sent by client to daemon to subscribe - Payload: { agent: string } - - WORKER_OUTPUT: sent by daemon to subscribed clients - Payload: { agent: string, data: string, ts: number } - - 3. Add corresponding TypeScript interfaces for the payloads: - - SubscribeWorkerOutputPayload { agent: string } - - WorkerOutputPayload { agent: string, data: string, ts: number } - - 4. If there is a Rust protocol counterpart (check relay-pty/src/ for - protocol types), add the matching Rust types with serde derives. - - CONSTRAINTS: - - Follow existing naming conventions in the types file - - Use type-only imports where appropriate - - Export new types from the module's index - - After implementing, run type checking: - npx tsc --noEmit -p packages/sdk/tsconfig.json 2>&1 | tail -10 - (or whichever tsconfig covers the protocol types) + Add optional stream-type filtering to agent.onOutput() in the SDK. - Do NOT use relay_spawn or add_agent. - Print exactly: PROTOCOL_TYPES_DONE - verification: - type: output_contains - value: PROTOCOL_TYPES_DONE - retries: 1 + CONTEXT: + packages/sdk/src/relay.ts already has a working onOutput() API + (line 1190-1204) that receives all worker_stream events for an agent. + The worker_stream event includes a `stream` field ("stdout" or + potentially "stderr") but there's no way to filter by stream type. - # ── Wave 2: TS wrapper parsing (depends on protocol types) ────────── - - - name: ts-wrapper-parser - type: agent - agent: ts-wrapper-dev - dependsOn: [protocol-types] - task: | - Add stderr JSON event parsing to the PTY orchestrator wrapper. - - CONTEXT (from issue #390): - The Rust layer now emits {"type":"output","ts":...,"data":...} events - on stderr. The TS wrapper needs to parse these and emit them as - EventEmitter events so the daemon can forward them. + The AgentOutputCallback type (line 148) supports two modes: + type AgentOutputCallback = + | ((chunk: string) => void) + | ((data: AgentOutputPayload) => void); TASK: - 1. Open packages/wrapper/src/relay-pty-orchestrator.ts - - 2. In the spawn setup (where the child process is created), add stderr - line-buffered parsing: - - Accumulate stderr chunks into a buffer string - - Split on newlines, keeping incomplete lines in the buffer - - For each complete line, try JSON.parse - - Route parsed events by their "type" field + 1. In packages/sdk/src/relay.ts, update the onOutput method signature + to accept an optional filter parameter: + onOutput(callback: AgentOutputCallback, options?: { stream?: string }): () => void - 3. Add a handleEvent method (or extend existing one) that switches on - event.type: - - "output": emit an 'output' event with { agent, data, ts } - - "relay_command": existing handling (check if already present) - - "continuity": existing handling (check if already present) - - Unknown types: log at debug level, do not throw + 2. In the implementation (around line 1190-1204), when options.stream + is provided, only invoke the callback when the event's stream field + matches the filter. - 4. Make sure the class extends EventEmitter (or uses whatever event - pattern the codebase already uses). Check existing patterns first. + 3. Add tests in packages/sdk/src/__tests__/orchestration-upgrades.test.ts: + - Test that onOutput with { stream: 'stdout' } only receives stdout + - Test that onOutput without filter receives all streams + - Test that onOutput with { stream: 'stderr' } ignores stdout - 5. Ensure --json-output flag is included in the relay-pty spawn args - so the Rust side actually emits these events. + 4. Update the Agent interface (line 180) to include the options param. CONSTRAINTS: - - Use existing EventEmitter patterns from the codebase - - Handle malformed JSON gracefully (try/catch, no crash) - - Use the WorkerOutputPayload type from the protocol types step - - Follow existing code style (ES modules, node: prefix imports) + - Follow existing code patterns in relay.ts + - Keep backward compatibility — no options = receive all (existing behavior) + - Use type-only imports where appropriate + - Run existing tests to ensure no regressions After implementing, run: - npx tsc --noEmit -p packages/wrapper/tsconfig.json 2>&1 | tail -10 + npx tsc --noEmit 2>&1 | tail -10 + npx vitest run packages/sdk/src/__tests__/orchestration-upgrades.test.ts 2>&1 | tail -20 Do NOT use relay_spawn or add_agent. - Print exactly: TS_WRAPPER_DONE + Print exactly: SDK_FILTERING_DONE verification: type: output_contains - value: TS_WRAPPER_DONE + value: SDK_FILTERING_DONE retries: 2 - # ── Wave 3: Daemon subscription + SDK client (parallel, depend on wrapper) ── + # ── Wave 2: Build gate ─────────────────────────────────────────── - - name: daemon-subscription - type: agent - agent: ts-wrapper-dev - dependsOn: [ts-wrapper-parser] - task: | - Wire daemon-side output subscription and forwarding. - - CONTEXT (from issue #390): - The wrapper now emits 'output' events. The daemon needs to: - (a) accept SUBSCRIBE_WORKER_OUTPUT from SDK clients - (b) forward WORKER_OUTPUT events to subscribers - - The daemon already has infrastructure for this: - - packages/daemon/src/spawn-manager.ts has pty.on('output') wiring - - packages/daemon/src/server.ts has onLogOutput callback - - The __broadcastLogOutput mechanism already exists - - TASK: - 1. In packages/daemon/src/server.ts: - - Add a Map> for output subscriptions - - Handle SUBSCRIBE_WORKER_OUTPUT messages: add the connection to - the subscription set for the requested agent name - - Clean up subscriptions when connections close - - When forwarding output, create a WORKER_OUTPUT envelope and send - to all subscribers for that agent - - 2. In packages/daemon/src/spawn-manager.ts: - - When the PTY orchestrator emits 'output', forward it to the - server's subscription mechanism - - Use the existing onLogOutput pattern as a reference - - CONSTRAINTS: - - Use the SUBSCRIBE_WORKER_OUTPUT and WORKER_OUTPUT types from the - protocol types step - - Follow existing message handling patterns in server.ts - - Clean up subscriptions on disconnect to prevent memory leaks - - Use tracing/logging at debug level for subscription events - - After implementing, run: - npx tsc --noEmit -p packages/daemon/tsconfig.json 2>&1 | tail -10 - - Do NOT use relay_spawn or add_agent. - Print exactly: DAEMON_SUBSCRIPTION_DONE - verification: - type: output_contains - value: DAEMON_SUBSCRIPTION_DONE - retries: 1 - - - name: sdk-client-method - type: agent - agent: ts-wrapper-dev - dependsOn: [ts-wrapper-parser] - task: | - Add subscribeWorkerOutput method to the SDK client. - - CONTEXT (from issue #390): - SDK clients need a clean API to subscribe to a worker's output stream. - The daemon now handles SUBSCRIBE_WORKER_OUTPUT and forwards - WORKER_OUTPUT events. - - TASK: - 1. Open packages/sdk/src/client.ts (or wherever the main SDK client - class is defined). - - 2. Add a subscribeWorkerOutput method: - ``` - subscribeWorkerOutput( - agentName: string, - callback: (data: string, ts: number) => void - ): () => void - ``` - - 3. Implementation: - - Send a SUBSCRIBE_WORKER_OUTPUT envelope with { agent: agentName } - - Register a message handler that filters for WORKER_OUTPUT - envelopes where payload.agent matches agentName - - Call the callback with (payload.data, payload.ts) - - Return an unsubscribe function that removes the handler - - 4. Export the method and add JSDoc documentation with @param and - @returns tags. - - CONSTRAINTS: - - Use existing envelope creation patterns (createEnvelope or equivalent) - - Use existing message listener patterns (on/off or equivalent) - - Return a cleanup function for unsubscription - - Use the protocol types from the protocol-types step - - After implementing, run: - npx tsc --noEmit -p packages/sdk/tsconfig.json 2>&1 | tail -10 - - Do NOT use relay_spawn or add_agent. - Print exactly: SDK_CLIENT_DONE - verification: - type: output_contains - value: SDK_CLIENT_DONE - retries: 1 - - # ── Wave 4: Build gate ───────────────────────────────────────────── - - - name: build-rust + - name: build-check type: deterministic - dependsOn: [rust-output-emitter] + dependsOn: [rust-output-buffering, sdk-stream-filtering] command: > - cargo build --manifest-path relay-pty/Cargo.toml 2>&1 | tail -10 && - cargo clippy --manifest-path relay-pty/Cargo.toml -- -D warnings 2>&1 | tail -10 && - echo "RUST_BUILD_OK" || echo "RUST_BUILD_FAILED" + cargo build 2>&1 | tail -10 && + cargo clippy -- -D warnings 2>&1 | tail -10 && + npx tsc --noEmit 2>&1 | tail -10 && + echo "BUILD_OK" || echo "BUILD_FAILED" captureOutput: true failOnError: false - - name: build-ts - type: deterministic - dependsOn: [daemon-subscription, sdk-client-method] - command: > - npx tsc --noEmit 2>&1 | tail -20 && - npm run lint 2>&1 | tail -10 && - echo "TS_BUILD_OK" || echo "TS_BUILD_FAILED" - captureOutput: true - failOnError: false - - # ── Wave 5: Fix any build failures ───────────────────────────────── + # ── Wave 3: Fix any failures ───────────────────────────────────── - name: fix-build-failures type: agent - agent: ts-wrapper-dev - dependsOn: [build-rust, build-ts] + agent: sdk-dev + dependsOn: [build-check] task: | - Rust build: {{steps.build-rust.output}} - TS build: {{steps.build-ts.output}} + Build result: {{steps.build-check.output}} - If both show *_BUILD_OK, there is nothing to fix. Output BUILD_ALL_GREEN. + If it shows BUILD_OK, there is nothing to fix. Output BUILD_ALL_GREEN. Otherwise, fix any type errors, lint errors, or compilation failures. - For Rust issues, describe what needs to change and make the fix in - relay-pty/src/main.rs directly. + The relevant files are: + - src/pty_worker.rs (Rust buffering changes) + - packages/sdk/src/relay.ts (SDK filtering changes) + - packages/sdk/src/__tests__/orchestration-upgrades.test.ts (new tests) After fixing, re-run the failing build commands to verify. @@ -367,116 +224,51 @@ workflows: value: BUILD_ALL_GREEN retries: 2 - # ── Wave 6: Integration test ─────────────────────────────────────── - - - name: integration-test - type: agent - agent: integration-tester - dependsOn: [fix-build-failures] - task: | - Write and run an integration test for the PTY output streaming pipeline. - - WHAT TO TEST: - 1. Rust emitter: Verify that relay-pty with --json-output emits - {"type":"output",...} events on stderr when given input. - - Build relay-pty if not already built - - Spawn it with a simple command (e.g., echo "hello world") - - Capture stderr and verify JSON output events appear - - Verify the event has type, ts, and data fields - - 2. TS wrapper: Verify the orchestrator emits 'output' events. - - Write a small test in packages/wrapper/test/ or tests/ - - Mock or spawn a real relay-pty process - - Assert the 'output' event fires with correct shape - - 3. Protocol types: Verify SUBSCRIBE_WORKER_OUTPUT and WORKER_OUTPUT - exist and have correct payload types (compile-time check via tsc). - - WHERE TO PUT TESTS: - - Rust test: relay-pty/tests/ or inline #[cfg(test)] module - - TS test: tests/integration/ or packages/wrapper/test/ - - Use existing test patterns in the repo + # ── Wave 4: Run tests ──────────────────────────────────────────── - After writing tests, run them: - cargo test --manifest-path relay-pty/Cargo.toml 2>&1 | tail -20 - npm test 2>&1 | tail -20 - - Do NOT use relay_spawn or add_agent. - Print exactly: INTEGRATION_TESTS_DONE - verification: - type: output_contains - value: INTEGRATION_TESTS_DONE - retries: 1 - - # ── Wave 7: Final regression + commit ────────────────────────────── - - - name: final-regression + - name: run-tests type: deterministic - dependsOn: [integration-test] + dependsOn: [fix-build-failures] command: > - cargo build --manifest-path relay-pty/Cargo.toml 2>&1 | tail -5 && - cargo clippy --manifest-path relay-pty/Cargo.toml -- -D warnings 2>&1 | tail -5 && - cargo test --manifest-path relay-pty/Cargo.toml 2>&1 | tail -10 && - npx tsc --noEmit 2>&1 | tail -5 && - echo "REGRESSION_PASSED" || echo "REGRESSION_FAILED" + cargo test 2>&1 | tail -20 && + npx vitest run packages/sdk/src/__tests__/orchestration-upgrades.test.ts 2>&1 | tail -20 && + echo "TESTS_PASSED" || echo "TESTS_FAILED" captureOutput: true failOnError: false - - name: commit-and-pr + # ── Wave 5: Commit ─────────────────────────────────────────────── + + - name: commit-changes type: deterministic - dependsOn: [final-regression] + dependsOn: [run-tests] command: > - git add relay-pty/src/ packages/wrapper/src/ packages/daemon/src/ packages/sdk/src/ tests/ && - git commit -m "feat: PTY output streaming with 100ms buffered emission (#390)" && + git add src/pty_worker.rs packages/sdk/src/relay.ts packages/sdk/src/__tests__/orchestration-upgrades.test.ts && + git commit -m "feat: add 100ms output buffering and SDK stream filtering (#390)" && git push -u origin HEAD 2>&1 && - gh pr create - --title "feat: PTY output streaming (#390)" - --body "$(cat <&1 || echo "PR_DONE" + echo "COMMIT_DONE" || echo "COMMIT_FAILED" captureOutput: true failOnError: false - # ── Wave 8: Summary ──────────────────────────────────────────────── + # ── Wave 6: Summary ────────────────────────────────────────────── - name: summary type: agent - agent: integration-tester - dependsOn: [commit-and-pr] + agent: sdk-dev + dependsOn: [commit-changes] task: | - The PTY output streaming workflow has completed. Write a summary. - - Rust emitter: {{steps.rust-output-emitter.output}} - Protocol types: {{steps.protocol-types.output}} - TS wrapper: {{steps.ts-wrapper-parser.output}} - Daemon: {{steps.daemon-subscription.output}} - SDK: {{steps.sdk-client-method.output}} - Build: {{steps.build-rust.output}} | {{steps.build-ts.output}} - Tests: {{steps.integration-test.output}} - Regression: {{steps.final-regression.output}} - PR: {{steps.commit-and-pr.output}} + The PTY output streaming improvement workflow has completed. + + Rust buffering: {{steps.rust-output-buffering.output}} + SDK filtering: {{steps.sdk-stream-filtering.output}} + Build: {{steps.build-check.output}} + Tests: {{steps.run-tests.output}} + Commit: {{steps.commit-changes.output}} Summarize: - 1. What was implemented at each layer - 2. What tests were added - 3. Any issues encountered and how they were resolved - 4. PR link if created + 1. What was changed and why + 2. What already existed (the full worker_stream pipeline) + 3. What was added (buffering + filtering) + 4. Test results Do NOT use relay_spawn or add_agent. Print exactly: WORKFLOW_SUMMARY_DONE @@ -486,14 +278,11 @@ workflows: coordination: barriers: - - name: foundation-complete - waitFor: [rust-output-emitter, protocol-types] - timeoutMs: 1800000 - name: implementation-complete - waitFor: [daemon-subscription, sdk-client-method] + waitFor: [rust-output-buffering, sdk-stream-filtering] timeoutMs: 1800000 - name: builds-green - waitFor: [build-rust, build-ts] + waitFor: [build-check] timeoutMs: 600000 state: From e0d49d1cead51784b005906e3555743b7197790e Mon Sep 17 00:00:00 2001 From: Khaliq Date: Tue, 10 Mar 2026 13:41:50 +0100 Subject: [PATCH 03/14] fix(workflow): address branch safety, pipe exit codes, and test gating in PTY streaming workflow - Add preflight check to reject execution on main branch - Add set -o pipefail to build-check and run-tests steps so pipe through tail preserves exit codes - Change run-tests failOnError to true so commit-changes is skipped on test failure - Add create-branch step and main-branch guard before commit-changes Co-Authored-By: Claude Opus 4.6 --- relay.pty-output-streaming.yaml | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/relay.pty-output-streaming.yaml b/relay.pty-output-streaming.yaml index c3fc7b49f..4203001ff 100644 --- a/relay.pty-output-streaming.yaml +++ b/relay.pty-output-streaming.yaml @@ -61,6 +61,8 @@ workflows: on the SDK side. The core worker_stream pipeline already works end-to-end. onError: retry preflight: + - command: '[ "$(git rev-parse --abbrev-ref HEAD)" != "main" ]' + description: 'Not on main branch — prevents accidental pushes to main' - command: test -f src/pty_worker.rs description: 'Rust PTY worker source exists' - command: test -f packages/sdk/src/relay.ts @@ -191,6 +193,7 @@ workflows: type: deterministic dependsOn: [rust-output-buffering, sdk-stream-filtering] command: > + set -o pipefail; cargo build 2>&1 | tail -10 && cargo clippy -- -D warnings 2>&1 | tail -10 && npx tsc --noEmit 2>&1 | tail -10 && @@ -230,18 +233,31 @@ workflows: type: deterministic dependsOn: [fix-build-failures] command: > + set -o pipefail; cargo test 2>&1 | tail -20 && npx vitest run packages/sdk/src/__tests__/orchestration-upgrades.test.ts 2>&1 | tail -20 && echo "TESTS_PASSED" || echo "TESTS_FAILED" captureOutput: true - failOnError: false + failOnError: true # ── Wave 5: Commit ─────────────────────────────────────────────── - - name: commit-changes + - name: create-branch type: deterministic dependsOn: [run-tests] command: > + git checkout -b feature/390-pty-output-streaming 2>/dev/null || true && + echo "BRANCH_READY" + captureOutput: true + failOnError: true + + - name: commit-changes + type: deterministic + dependsOn: [create-branch] + command: > + set -e; + BRANCH="$(git rev-parse --abbrev-ref HEAD)"; + if [ "$BRANCH" = "main" ]; then echo "ERROR: refusing to commit on main" && exit 1; fi; git add src/pty_worker.rs packages/sdk/src/relay.ts packages/sdk/src/__tests__/orchestration-upgrades.test.ts && git commit -m "feat: add 100ms output buffering and SDK stream filtering (#390)" && git push -u origin HEAD 2>&1 && From 59be1cc01c7ed849963e08187c0a2b59d65aa7ce Mon Sep 17 00:00:00 2001 From: Khaliq Date: Tue, 10 Mar 2026 13:54:36 +0100 Subject: [PATCH 04/14] fix(workflow): safely reuse existing branch --- relay.pty-output-streaming.yaml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/relay.pty-output-streaming.yaml b/relay.pty-output-streaming.yaml index 4203001ff..8ffb33157 100644 --- a/relay.pty-output-streaming.yaml +++ b/relay.pty-output-streaming.yaml @@ -246,7 +246,9 @@ workflows: type: deterministic dependsOn: [run-tests] command: > - git checkout -b feature/390-pty-output-streaming 2>/dev/null || true && + git rev-parse --verify feature/390-pty-output-streaming >/dev/null 2>&1 && + git checkout feature/390-pty-output-streaming || + git checkout -b feature/390-pty-output-streaming && echo "BRANCH_READY" captureOutput: true failOnError: true From 7960c025f4819e80a87681226b164d6544a6b59d Mon Sep 17 00:00:00 2001 From: Khaliq Date: Mon, 16 Mar 2026 06:50:04 +0100 Subject: [PATCH 05/14] refactor: fix workflow against best practices MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Agents now use preset: worker (non-interactive, no relay tools) - Removed output_contains verification with tokens in task text (double-occurrence problem) — using exit_code instead - Pre-read context via deterministic steps, inject via step output instead of hardcoded line numbers - Removed channel subscriptions on non-interactive agents - Reduced from 6 phases to 2 (impl + verify/fix) - Removed coordination/barriers/state config (unnecessary for dag) - Shortened task prompts (was 40+ lines, now ~15) - Removed summary step (adds latency, no value) - Removed git branch/commit steps (workflow shouldn't auto-commit) - Added file verification gate between impl and build --- relay.pty-output-streaming.yaml | 405 +++++++++++++------------------- 1 file changed, 162 insertions(+), 243 deletions(-) diff --git a/relay.pty-output-streaming.yaml b/relay.pty-output-streaming.yaml index 8ffb33157..91f1e6a5b 100644 --- a/relay.pty-output-streaming.yaml +++ b/relay.pty-output-streaming.yaml @@ -3,31 +3,21 @@ name: pty-output-streaming description: > PTY output streaming improvements for GitHub issue #390. - STATUS: The core feature is ALREADY IMPLEMENTED. The full pipeline exists: - - Rust: src/pty_worker.rs emits `worker_stream` frames (lines 441-444, 836-839) - - Protocol: packages/sdk/src/protocol.ts defines `worker_stream` BrokerEvent - and `WorkerToBroker` types - - SDK: packages/sdk/src/relay.ts handles `worker_stream` events, provides - agent.onOutput() callback with per-agent filtering and unsubscribe - - Tests: packages/sdk/src/__tests__/orchestration-upgrades.test.ts covers - onOutput, per-agent filtering, unsubscribe, and idle-resume on output - - The existing push model works end-to-end: + The core worker_stream pipeline is ALREADY IMPLEMENTED end-to-end: Rust pty_worker → send_frame("worker_stream") → Broker → SDK event handler - → onWorkerOutput global hook + dispatchOutput() → agent.onOutput(callback) + → onWorkerOutput + agent.onOutput() - REMAINING GAP: The Rust side emits worker_stream on EVERY PTY read chunk with - no rate-limiting or buffering. High-frequency output (e.g. streaming compilation - logs) floods the broker with small frames. Adding 100ms batched buffering in - src/pty_worker.rs would reduce frame rate to ~10/sec max while preserving - real-time feel. SDK-level output filtering by stream type (stdout vs stderr) - could also be added as a convenience. + REMAINING GAP: PTY emits worker_stream on every read chunk with no batching. + This workflow adds 100ms rate-limited buffering in Rust and optional stream + filtering in the SDK's onOutput API. + + 2 phases: parallel implementation → verification + fix. swarm: pattern: dag - maxConcurrency: 2 + maxConcurrency: 3 timeoutMs: 3600000 - channel: pty-streaming + channel: wf-pty-streaming idleNudge: nudgeAfterMs: 300000 escalateAfterMs: 300000 @@ -36,285 +26,214 @@ swarm: agents: - name: rust-dev cli: claude - channels: [pty-streaming] - role: > - Rust backend developer. Modifies src/pty_worker.rs to add rate-limited - buffering for worker_stream frame emission. Familiar with the PTY event - loop, tokio async, and the existing send_frame helper. + preset: worker + role: 'Rust backend — adds rate-limited buffering to worker_stream emission in pty_worker.rs' constraints: - model: opus + model: sonnet - name: sdk-dev cli: claude - channels: [pty-streaming] - role: > - TypeScript SDK developer. Adds optional stream filtering to the - agent.onOutput() API in packages/sdk/src/relay.ts. Writes tests in - packages/sdk/src/__tests__/orchestration-upgrades.test.ts. + preset: worker + role: 'TypeScript SDK — adds stream filtering to agent.onOutput() and writes tests' + constraints: + model: sonnet + + - name: fixer + cli: claude + preset: worker + role: 'Fixes type errors and test failures' constraints: model: sonnet workflows: - - name: improve-pty-output-streaming - description: > - Add rate-limited buffering on the Rust side and optional stream filtering - on the SDK side. The core worker_stream pipeline already works end-to-end. + - name: default + description: 'Add rate-limited buffering on Rust side and stream filtering on SDK side.' onError: retry + preflight: - - command: '[ "$(git rev-parse --abbrev-ref HEAD)" != "main" ]' - description: 'Not on main branch — prevents accidental pushes to main' - command: test -f src/pty_worker.rs description: 'Rust PTY worker source exists' - command: test -f packages/sdk/src/relay.ts description: 'SDK relay source exists' - command: grep -q worker_stream src/pty_worker.rs - description: 'worker_stream emission already exists in pty_worker' + description: 'worker_stream emission already exists' - command: grep -q worker_stream packages/sdk/src/protocol.ts description: 'worker_stream protocol type already defined' - - command: git status --porcelain - failIf: non-empty - description: 'Working directory is clean' steps: - # ── Wave 1: Parallel — Rust buffering + SDK filtering ───────────── + # ════════════════════════════════════════════════════════════════════════ + # PHASE 1: Context reads + parallel implementation + # ════════════════════════════════════════════════════════════════════════ + + - name: read-pty-worker + type: deterministic + command: | + echo "=== worker_stream emission site ===" + grep -n -B5 -A10 'worker_stream' src/pty_worker.rs + echo "" + echo "=== buffer declarations area (lines 220-260) ===" + sed -n '220,260p' src/pty_worker.rs + echo "" + echo "=== verification_tick select arm ===" + grep -n -B2 -A10 'verification_tick' src/pty_worker.rs + echo "" + echo "=== imports ===" + head -30 src/pty_worker.rs + captureOutput: true + + - name: read-sdk-relay + type: deterministic + command: | + echo "=== onOutput method ===" + grep -n -B5 -A20 'onOutput' packages/sdk/src/relay.ts + echo "" + echo "=== AgentOutputCallback type ===" + grep -n -B2 -A5 'AgentOutputCallback' packages/sdk/src/relay.ts + echo "" + echo "=== Agent interface output section ===" + grep -n -B2 -A5 'onOutput' packages/sdk/src/relay.ts | head -30 + echo "" + echo "=== dispatchOutput ===" + grep -n -B2 -A15 'dispatchOutput' packages/sdk/src/relay.ts + captureOutput: true + + - name: read-existing-tests + type: deterministic + command: | + echo "=== onOutput test patterns ===" + grep -n -B2 -A15 'onOutput\|worker_stream' packages/sdk/src/__tests__/orchestration-upgrades.test.ts | head -60 + captureOutput: true + + # ── Wave 1: Rust buffering + SDK filtering (parallel) ────────────────── - name: rust-output-buffering - type: agent agent: rust-dev + dependsOn: [read-pty-worker] task: | - Add 100ms rate-limited buffering to worker_stream emission in - src/pty_worker.rs. + Add 100ms rate-limited buffering to worker_stream emission in src/pty_worker.rs. CONTEXT: - The PTY event loop in run_pty_worker() already emits worker_stream - frames at line 441-444: - let _ = send_frame(&out_tx, "worker_stream", None, json!({ - "stream": "stdout", - "chunk": text, - })).await; - - This fires on EVERY PTY read chunk with no batching. High-frequency - output floods the broker with many small frames. - - TASK: - 1. Add an output buffer (String) and a last-flush Instant near the - existing `echo_buffer` and `continuity_buffer` declarations - (around line 229). - - 2. Replace the direct send_frame("worker_stream") call at line 441-444 - with buffer accumulation: - - Append the chunk text to the output buffer - - Only flush (send the frame) when: - a) 100ms have elapsed since last flush, OR - b) buffer exceeds 4096 bytes - - On flush: send the accumulated buffer as one worker_stream frame, - clear the buffer, reset the flush timestamp - - Skip sending if buffer is empty after trim - - 3. Add a flush check on the existing verification_tick interval - (line 234, fires every 200ms) so buffered output is flushed even - during quiet periods. Look for the `_ = verification_tick.tick()` - arm in the select! loop and add a buffer flush there. - - 4. Keep the watchdog late-output emission at line 836-839 as-is - (it already handles the drain-on-exit case). - - CONSTRAINTS: - - Use anyhow::Result, not unwrap() in production paths - - Use tracing macros for debug logging - - Do not add new crate dependencies - - Use std::time::Instant for timestamps (already imported) - - Follow existing code style in the file - - After implementing, run: - cargo build 2>&1 | tail -10 - cargo clippy -- -D warnings 2>&1 | tail -10 - - Do NOT use relay_spawn or add_agent. - Print exactly: RUST_BUFFERING_DONE + {{steps.read-pty-worker.output}} + + Changes: + 1. Add an output buffer (String) and last-flush Instant near existing buffer declarations + 2. Replace direct send_frame("worker_stream") with buffer accumulation + 3. Flush when: 100ms elapsed since last flush OR buffer exceeds 4096 bytes + 4. Add flush check on the verification_tick interval for quiet periods + 5. Keep the watchdog late-output emission as-is + + Constraints: anyhow::Result (no unwrap), tracing macros, no new crate deps, + std::time::Instant (already imported), match existing code style. + + After implementing, run: cargo build 2>&1 | tail -10 + + Write the file to disk. verification: - type: output_contains - value: RUST_BUFFERING_DONE + type: exit_code retries: 2 - name: sdk-stream-filtering - type: agent agent: sdk-dev + dependsOn: [read-sdk-relay, read-existing-tests] task: | - Add optional stream-type filtering to agent.onOutput() in the SDK. + Add optional stream-type filtering to agent.onOutput() in packages/sdk/src/relay.ts. CONTEXT: - packages/sdk/src/relay.ts already has a working onOutput() API - (line 1190-1204) that receives all worker_stream events for an agent. - The worker_stream event includes a `stream` field ("stdout" or - potentially "stderr") but there's no way to filter by stream type. - - The AgentOutputCallback type (line 148) supports two modes: - type AgentOutputCallback = - | ((chunk: string) => void) - | ((data: AgentOutputPayload) => void); - - TASK: - 1. In packages/sdk/src/relay.ts, update the onOutput method signature - to accept an optional filter parameter: - onOutput(callback: AgentOutputCallback, options?: { stream?: string }): () => void - - 2. In the implementation (around line 1190-1204), when options.stream - is provided, only invoke the callback when the event's stream field - matches the filter. - - 3. Add tests in packages/sdk/src/__tests__/orchestration-upgrades.test.ts: - - Test that onOutput with { stream: 'stdout' } only receives stdout - - Test that onOutput without filter receives all streams - - Test that onOutput with { stream: 'stderr' } ignores stdout - - 4. Update the Agent interface (line 180) to include the options param. - - CONSTRAINTS: - - Follow existing code patterns in relay.ts - - Keep backward compatibility — no options = receive all (existing behavior) - - Use type-only imports where appropriate - - Run existing tests to ensure no regressions - - After implementing, run: - npx tsc --noEmit 2>&1 | tail -10 - npx vitest run packages/sdk/src/__tests__/orchestration-upgrades.test.ts 2>&1 | tail -20 - - Do NOT use relay_spawn or add_agent. - Print exactly: SDK_FILTERING_DONE + {{steps.read-sdk-relay.output}} + + EXISTING TEST PATTERNS: + {{steps.read-existing-tests.output}} + + Changes: + 1. Update onOutput signature: onOutput(callback, options?: { stream?: string }): () => void + 2. When options.stream is provided, only invoke callback when event stream field matches + 3. Update Agent interface to include the options param + 4. Add tests in orchestration-upgrades.test.ts: + - onOutput with { stream: 'stdout' } only receives stdout + - onOutput without filter receives all streams + - onOutput with { stream: 'stderr' } ignores stdout + + Keep backward compatibility — no options = receive all. + + After implementing, run: npx tsc --noEmit 2>&1 | tail -10 + + Write all files to disk. verification: - type: output_contains - value: SDK_FILTERING_DONE + type: exit_code retries: 2 - # ── Wave 2: Build gate ─────────────────────────────────────────── + # ════════════════════════════════════════════════════════════════════════ + # PHASE 2: Verification + fix + # ════════════════════════════════════════════════════════════════════════ - - name: build-check + - name: verify-files type: deterministic dependsOn: [rust-output-buffering, sdk-stream-filtering] - command: > - set -o pipefail; - cargo build 2>&1 | tail -10 && - cargo clippy -- -D warnings 2>&1 | tail -10 && - npx tsc --noEmit 2>&1 | tail -10 && - echo "BUILD_OK" || echo "BUILD_FAILED" + command: | + errors=0 + grep -q "output_buffer\|flush" src/pty_worker.rs || { echo "FAIL: pty_worker.rs missing buffering"; errors=$((errors+1)); } + grep -q "stream.*filter\|options.*stream" packages/sdk/src/relay.ts || { echo "FAIL: relay.ts missing stream filtering"; errors=$((errors+1)); } + [ $errors -gt 0 ] && exit 1 + echo "All implementation files verified" captureOutput: true - failOnError: false + failOnError: true - # ── Wave 3: Fix any failures ───────────────────────────────────── + - name: build-check + type: deterministic + dependsOn: [verify-files] + command: | + echo "=== Cargo build ===" + cargo build 2>&1 | tail -10 + cargo_exit=$? + echo "" + echo "=== Clippy ===" + cargo clippy -- -D warnings 2>&1 | tail -10 + clippy_exit=$? + echo "" + echo "=== TypeScript ===" + cd packages/sdk && npx tsc --noEmit 2>&1 | tail -10 + tsc_exit=$? + echo "" + [ $cargo_exit -eq 0 ] && [ $clippy_exit -eq 0 ] && [ $tsc_exit -eq 0 ] && echo "BUILD_OK" || echo "BUILD_FAILED" + [ $cargo_exit -eq 0 ] && [ $clippy_exit -eq 0 ] && [ $tsc_exit -eq 0 ] + captureOutput: true + failOnError: false - - name: fix-build-failures - type: agent - agent: sdk-dev + - name: fix-failures + agent: fixer dependsOn: [build-check] task: | - Build result: {{steps.build-check.output}} - - If it shows BUILD_OK, there is nothing to fix. Output BUILD_ALL_GREEN. + Build results: + {{steps.build-check.output}} - Otherwise, fix any type errors, lint errors, or compilation failures. - The relevant files are: - - src/pty_worker.rs (Rust buffering changes) - - packages/sdk/src/relay.ts (SDK filtering changes) - - packages/sdk/src/__tests__/orchestration-upgrades.test.ts (new tests) + If BUILD_OK, there's nothing to fix. Just confirm all clear. - After fixing, re-run the failing build commands to verify. + Otherwise, fix type errors, lint errors, or compilation failures in: + - src/pty_worker.rs (Rust buffering) + - packages/sdk/src/relay.ts (SDK filtering) + - packages/sdk/src/__tests__/orchestration-upgrades.test.ts (tests) - Do NOT use relay_spawn or add_agent. - Print exactly: BUILD_ALL_GREEN + After fixing, re-run the failing commands to verify. + Write files to disk. verification: - type: output_contains - value: BUILD_ALL_GREEN + type: exit_code retries: 2 - # ── Wave 4: Run tests ──────────────────────────────────────────── - - - name: run-tests + - name: final-tests type: deterministic - dependsOn: [fix-build-failures] - command: > - set -o pipefail; - cargo test 2>&1 | tail -20 && - npx vitest run packages/sdk/src/__tests__/orchestration-upgrades.test.ts 2>&1 | tail -20 && - echo "TESTS_PASSED" || echo "TESTS_FAILED" + dependsOn: [fix-failures] + command: | + set -o pipefail + echo "=== Cargo test ===" + cargo test 2>&1 | tail -20 + echo "" + echo "=== SDK tests ===" + cd packages/sdk && npx vitest run src/__tests__/orchestration-upgrades.test.ts 2>&1 | tail -20 captureOutput: true failOnError: true - # ── Wave 5: Commit ─────────────────────────────────────────────── - - - name: create-branch - type: deterministic - dependsOn: [run-tests] - command: > - git rev-parse --verify feature/390-pty-output-streaming >/dev/null 2>&1 && - git checkout feature/390-pty-output-streaming || - git checkout -b feature/390-pty-output-streaming && - echo "BRANCH_READY" - captureOutput: true - failOnError: true - - - name: commit-changes - type: deterministic - dependsOn: [create-branch] - command: > - set -e; - BRANCH="$(git rev-parse --abbrev-ref HEAD)"; - if [ "$BRANCH" = "main" ]; then echo "ERROR: refusing to commit on main" && exit 1; fi; - git add src/pty_worker.rs packages/sdk/src/relay.ts packages/sdk/src/__tests__/orchestration-upgrades.test.ts && - git commit -m "feat: add 100ms output buffering and SDK stream filtering (#390)" && - git push -u origin HEAD 2>&1 && - echo "COMMIT_DONE" || echo "COMMIT_FAILED" - captureOutput: true - failOnError: false - - # ── Wave 6: Summary ────────────────────────────────────────────── - - - name: summary - type: agent - agent: sdk-dev - dependsOn: [commit-changes] - task: | - The PTY output streaming improvement workflow has completed. - - Rust buffering: {{steps.rust-output-buffering.output}} - SDK filtering: {{steps.sdk-stream-filtering.output}} - Build: {{steps.build-check.output}} - Tests: {{steps.run-tests.output}} - Commit: {{steps.commit-changes.output}} - - Summarize: - 1. What was changed and why - 2. What already existed (the full worker_stream pipeline) - 3. What was added (buffering + filtering) - 4. Test results - - Do NOT use relay_spawn or add_agent. - Print exactly: WORKFLOW_SUMMARY_DONE - verification: - type: output_contains - value: WORKFLOW_SUMMARY_DONE - -coordination: - barriers: - - name: implementation-complete - waitFor: [rust-output-buffering, sdk-stream-filtering] - timeoutMs: 1800000 - - name: builds-green - waitFor: [build-check] - timeoutMs: 600000 - -state: - backend: memory - ttlMs: 86400000 - namespace: pty-streaming - errorHandling: strategy: retry maxRetries: 2 retryDelayMs: 5000 - notifyChannel: pty-streaming - -trajectories: - enabled: true - reflectOnBarriers: true - autoDecisions: true + notifyChannel: wf-pty-streaming From 8a5f66096d960090040e4a1e46d7c208620c4baf Mon Sep 17 00:00:00 2001 From: Khaliq Date: Wed, 25 Mar 2026 08:57:00 +0100 Subject: [PATCH 06/14] pty streaming --- .../traj_1774425142647_96c5a9e2.json | 292 ++++++++++++++++++ .../__tests__/orchestration-upgrades.test.ts | 73 +++++ packages/sdk/src/relay.ts | 11 +- src/pty_worker.rs | 33 +- 4 files changed, 402 insertions(+), 7 deletions(-) create mode 100644 .trajectories/completed/traj_1774425142647_96c5a9e2.json diff --git a/.trajectories/completed/traj_1774425142647_96c5a9e2.json b/.trajectories/completed/traj_1774425142647_96c5a9e2.json new file mode 100644 index 000000000..3f8e0d254 --- /dev/null +++ b/.trajectories/completed/traj_1774425142647_96c5a9e2.json @@ -0,0 +1,292 @@ +{ + "id": "traj_1774425142647_96c5a9e2", + "version": 1, + "task": { + "title": "default", + "source": { + "system": "workflow-runner", + "id": "c900c9e57bd8b4d32110daaf" + } + }, + "status": "completed", + "startedAt": "2026-03-25T07:52:22.647Z", + "agents": [ + { + "name": "orchestrator", + "role": "workflow-runner", + "joinedAt": "2026-03-25T07:52:22.647Z" + }, + { + "name": "rust-dev", + "role": "specialist", + "joinedAt": "2026-03-25T07:52:29.100Z" + }, + { + "name": "sdk-dev", + "role": "specialist", + "joinedAt": "2026-03-25T07:52:29.100Z" + }, + { + "name": "fixer", + "role": "specialist", + "joinedAt": "2026-03-25T07:54:21.246Z" + } + ], + "chapters": [ + { + "id": "ch_c5384845", + "title": "Planning", + "agentName": "orchestrator", + "startedAt": "2026-03-25T07:52:22.647Z", + "events": [ + { + "ts": 1774425142648, + "type": "note", + "content": "Purpose: PTY output streaming improvements for GitHub issue #390.\nThe core worker_stream pipeline is ALREADY IMPLEMENTED end-to-end:\n Rust pty_worker → send_frame(\"worker_stream\") → Broker → SDK event handler\n → onWorkerOutput + agent.onOutput()\n\nREMAINING GAP: PTY emits worker_stream on every read chunk with no batching. This workflow adds 100ms rate-limited buffering in Rust and optional stream filtering in the SDK's onOutput API.\n2 phases: parallel implementation → verification + fix." + }, + { + "ts": 1774425142648, + "type": "note", + "content": "Approach: 9-step dag workflow — Parsed 9 steps, 3 parallel tracks, 6 dependent steps, DAG validated, no cycles" + } + ], + "endedAt": "2026-03-25T07:52:26.120Z" + }, + { + "id": "ch_73f88d81", + "title": "Execution: read-pty-worker, read-sdk-relay, read-existing-tests", + "agentName": "orchestrator", + "startedAt": "2026-03-25T07:52:26.120Z", + "events": [], + "endedAt": "2026-03-25T07:52:29.094Z" + }, + { + "id": "ch_f4f09c51", + "title": "Convergence: read-pty-worker + read-sdk-relay + read-existing-tests", + "agentName": "orchestrator", + "startedAt": "2026-03-25T07:52:29.094Z", + "events": [ + { + "ts": 1774425149098, + "type": "reflection", + "content": "read-pty-worker + read-sdk-relay + read-existing-tests resolved. 3/3 steps completed. All steps completed on first attempt. Unblocking: rust-output-buffering, sdk-stream-filtering.", + "significance": "high", + "raw": { + "confidence": 0.75, + "focalPoints": [ + "read-pty-worker: completed", + "read-sdk-relay: completed", + "read-existing-tests: completed" + ] + } + } + ], + "endedAt": "2026-03-25T07:52:29.099Z" + }, + { + "id": "ch_f051fec1", + "title": "Execution: rust-output-buffering, sdk-stream-filtering", + "agentName": "orchestrator", + "startedAt": "2026-03-25T07:52:29.099Z", + "events": [], + "endedAt": "2026-03-25T07:52:29.100Z" + }, + { + "id": "ch_3f8ca847", + "title": "Execution: rust-output-buffering", + "agentName": "rust-dev", + "startedAt": "2026-03-25T07:52:29.100Z", + "events": [ + { + "ts": 1774425149100, + "type": "note", + "content": "\"rust-output-buffering\": Add 100ms rate-limited buffering to worker_stream emission in src/pty_worker.rs", + "raw": { + "agent": "rust-dev" + } + } + ], + "endedAt": "2026-03-25T07:52:29.100Z" + }, + { + "id": "ch_fbc1c17c", + "title": "Execution: sdk-stream-filtering", + "agentName": "sdk-dev", + "startedAt": "2026-03-25T07:52:29.100Z", + "events": [ + { + "ts": 1774425149100, + "type": "note", + "content": "\"sdk-stream-filtering\": Add optional stream-type filtering to agent.onOutput() in packages/sdk/src/relay.ts", + "raw": { + "agent": "sdk-dev" + } + }, + { + "ts": 1774425235603, + "type": "completion-evidence", + "content": "\"sdk-stream-filtering\" verification-based completion — Verification passed (4 signal(s), 1 relevant channel post(s), 6 file change(s), exit=0; signals=0, No errors. All changes are complete., Verification passed, **[sdk-stream-filtering] Output:**; channel=**[sdk-stream-filtering] Output:**\n```\nNo errors. All changes are complete.\n**Summary of changes:**\n**`packages/sdk/src/relay.ts`:**\n1. `Agent` interface — upda; files=modified:packages/sdk/src/__tests__/orchestration-upgrades.test.ts, modified:packages/sdk/src/relay.ts, modified:src/pty_worker.rs, modified:target/debug/.fingerprint/agent-relay-broker-aba1cde5556acf05/bin-agent-relay-broker, modified:target/debug/.fingerprint/agent-relay-broker-aba1cde5556acf05/invoked.timestamp, modified:target/debug/.fingerprint/agent-relay-broker-c1a62271656ca6f7/dep-lib-relay_broker; exit=0)", + "significance": "medium", + "raw": { + "stepName": "sdk-stream-filtering", + "completionMode": "verification", + "reason": "Verification passed", + "evidence": { + "summary": "4 signal(s), 1 relevant channel post(s), 6 file change(s), exit=0", + "signals": [ + "0", + "No errors. All changes are complete.", + "Verification passed", + "**[sdk-stream-filtering] Output:**" + ], + "channelPosts": [ + "**[sdk-stream-filtering] Output:**\n```\nNo errors. All changes are complete.\n**Summary of changes:**\n**`packages/sdk/src/relay.ts`:**\n1. `Agent` interface — upda" + ], + "files": [ + "modified:packages/sdk/src/__tests__/orchestration-upgrades.test.ts", + "modified:packages/sdk/src/relay.ts", + "modified:src/pty_worker.rs", + "modified:target/debug/.fingerprint/agent-relay-broker-aba1cde5556acf05/bin-agent-relay-broker", + "modified:target/debug/.fingerprint/agent-relay-broker-aba1cde5556acf05/invoked.timestamp", + "modified:target/debug/.fingerprint/agent-relay-broker-c1a62271656ca6f7/dep-lib-relay_broker" + ], + "exitCode": 0 + } + } + }, + { + "ts": 1774425235603, + "type": "finding", + "content": "\"sdk-stream-filtering\" completed → - `{ stream: 'stderr' }` filter ignores stdout events", + "significance": "medium" + }, + { + "ts": 1774425250796, + "type": "completion-evidence", + "content": "\"rust-output-buffering\" verification-based completion — Verification passed (4 signal(s), 1 relevant channel post(s), 6 file change(s), exit=0; signals=0, Build succeeds. Here's a summary of what was done:, Verification passed, **[rust-output-buffering] Output:**; channel=**[rust-output-buffering] Output:**\n```\nBuild succeeds. Here's a summary of what was done:\n**Changes to `src/pty_worker.rs`:**\n1. **Added buffer declarations** ; files=modified:packages/sdk/src/__tests__/orchestration-upgrades.test.ts, modified:packages/sdk/src/relay.ts, modified:src/pty_worker.rs, modified:target/debug/.fingerprint/agent-relay-broker-aba1cde5556acf05/bin-agent-relay-broker, modified:target/debug/.fingerprint/agent-relay-broker-aba1cde5556acf05/bin-agent-relay-broker.json, modified:target/debug/.fingerprint/agent-relay-broker-aba1cde5556acf05/dep-bin-agent-relay-broker; exit=0)", + "significance": "medium", + "raw": { + "stepName": "rust-output-buffering", + "completionMode": "verification", + "reason": "Verification passed", + "evidence": { + "summary": "4 signal(s), 1 relevant channel post(s), 6 file change(s), exit=0", + "signals": [ + "0", + "Build succeeds. Here's a summary of what was done:", + "Verification passed", + "**[rust-output-buffering] Output:**" + ], + "channelPosts": [ + "**[rust-output-buffering] Output:**\n```\nBuild succeeds. Here's a summary of what was done:\n**Changes to `src/pty_worker.rs`:**\n1. **Added buffer declarations** " + ], + "files": [ + "modified:packages/sdk/src/__tests__/orchestration-upgrades.test.ts", + "modified:packages/sdk/src/relay.ts", + "modified:src/pty_worker.rs", + "modified:target/debug/.fingerprint/agent-relay-broker-aba1cde5556acf05/bin-agent-relay-broker", + "modified:target/debug/.fingerprint/agent-relay-broker-aba1cde5556acf05/bin-agent-relay-broker.json", + "modified:target/debug/.fingerprint/agent-relay-broker-aba1cde5556acf05/dep-bin-agent-relay-broker" + ], + "exitCode": 0 + } + } + }, + { + "ts": 1774425250796, + "type": "finding", + "content": "\"rust-output-buffering\" completed → Build succeeds. Here's a summary of what was done:\n\n**Changes to `src/pty_worker.rs`:**\n\n1. **Added buffer declarations*", + "significance": "medium" + } + ], + "endedAt": "2026-03-25T07:54:10.799Z" + }, + { + "id": "ch_5d1c6959", + "title": "Convergence: rust-output-buffering + sdk-stream-filtering", + "agentName": "orchestrator", + "startedAt": "2026-03-25T07:54:10.799Z", + "events": [ + { + "ts": 1774425250799, + "type": "reflection", + "content": "rust-output-buffering + sdk-stream-filtering resolved. 2/2 steps completed. All steps completed on first attempt. Unblocking: verify-files.", + "significance": "high", + "raw": { + "confidence": 1, + "focalPoints": [ + "rust-output-buffering: completed", + "sdk-stream-filtering: completed" + ] + } + } + ], + "endedAt": "2026-03-25T07:54:21.246Z" + }, + { + "id": "ch_6985dbcf", + "title": "Execution: fix-failures", + "agentName": "fixer", + "startedAt": "2026-03-25T07:54:21.246Z", + "events": [ + { + "ts": 1774425261246, + "type": "note", + "content": "\"fix-failures\": Build results:", + "raw": { + "agent": "fixer" + } + }, + { + "ts": 1774425268713, + "type": "completion-evidence", + "content": "\"fix-failures\" verification-based completion — Verification passed (2 signal(s), exit=0; signals=0, Verification passed; exit=0)", + "significance": "medium", + "raw": { + "stepName": "fix-failures", + "completionMode": "verification", + "reason": "Verification passed", + "evidence": { + "summary": "2 signal(s), exit=0", + "signals": [ + "0", + "Verification passed" + ], + "exitCode": 0 + } + } + }, + { + "ts": 1774425268713, + "type": "finding", + "content": "\"fix-failures\" completed → All clear. Build is OK — Cargo, Clippy, and TypeScript all passed with no errors.", + "significance": "medium" + } + ], + "endedAt": "2026-03-25T07:54:44.166Z" + }, + { + "id": "ch_6d0e5bbb", + "title": "Retrospective", + "agentName": "orchestrator", + "startedAt": "2026-03-25T07:54:44.166Z", + "events": [ + { + "ts": 1774425284166, + "type": "reflection", + "content": "All 9 steps completed in 2min. (completed in 2 minutes)", + "significance": "high" + } + ], + "endedAt": "2026-03-25T07:54:44.166Z" + } + ], + "completedAt": "2026-03-25T07:54:44.166Z", + "retrospective": { + "summary": "All 9 steps completed in 2min.", + "approach": "dag workflow (3 agents)", + "confidence": 0.8333333333333334, + "learnings": [], + "challenges": [] + } +} \ No newline at end of file diff --git a/packages/sdk/src/__tests__/orchestration-upgrades.test.ts b/packages/sdk/src/__tests__/orchestration-upgrades.test.ts index 5f1bb1a7b..e0a0052cb 100644 --- a/packages/sdk/src/__tests__/orchestration-upgrades.test.ts +++ b/packages/sdk/src/__tests__/orchestration-upgrades.test.ts @@ -1107,4 +1107,77 @@ describe('Agent.onOutput', () => { await relay.shutdown(); } }); + + it('onOutput with { stream: "stdout" } only receives stdout events', async () => { + const { client, emit } = createMockFacadeClient(); + vi.spyOn(AgentRelayClient, 'start').mockResolvedValue(client); + + const relay = new AgentRelay(); + try { + const agent = await relay.spawnPty({ + name: 'stream-filter-agent', + cli: 'claude', + channels: ['general'], + }); + + const chunks: string[] = []; + agent.onOutput((chunk: string) => chunks.push(chunk), { stream: 'stdout' }); + + emit({ kind: 'worker_stream', name: 'stream-filter-agent', stream: 'stdout', chunk: 'out1' }); + emit({ kind: 'worker_stream', name: 'stream-filter-agent', stream: 'stderr', chunk: 'err1' }); + emit({ kind: 'worker_stream', name: 'stream-filter-agent', stream: 'stdout', chunk: 'out2' }); + + expect(chunks).toEqual(['out1', 'out2']); + } finally { + await relay.shutdown(); + } + }); + + it('onOutput without filter receives all streams', async () => { + const { client, emit } = createMockFacadeClient(); + vi.spyOn(AgentRelayClient, 'start').mockResolvedValue(client); + + const relay = new AgentRelay(); + try { + const agent = await relay.spawnPty({ + name: 'all-streams-agent', + cli: 'claude', + channels: ['general'], + }); + + const chunks: string[] = []; + agent.onOutput((chunk: string) => chunks.push(chunk)); + + emit({ kind: 'worker_stream', name: 'all-streams-agent', stream: 'stdout', chunk: 'out' }); + emit({ kind: 'worker_stream', name: 'all-streams-agent', stream: 'stderr', chunk: 'err' }); + + expect(chunks).toEqual(['out', 'err']); + } finally { + await relay.shutdown(); + } + }); + + it('onOutput with { stream: "stderr" } ignores stdout events', async () => { + const { client, emit } = createMockFacadeClient(); + vi.spyOn(AgentRelayClient, 'start').mockResolvedValue(client); + + const relay = new AgentRelay(); + try { + const agent = await relay.spawnPty({ + name: 'stderr-filter-agent', + cli: 'claude', + channels: ['general'], + }); + + const chunks: string[] = []; + agent.onOutput((chunk: string) => chunks.push(chunk), { stream: 'stderr' }); + + emit({ kind: 'worker_stream', name: 'stderr-filter-agent', stream: 'stdout', chunk: 'ignored' }); + emit({ kind: 'worker_stream', name: 'stderr-filter-agent', stream: 'stderr', chunk: 'captured' }); + + expect(chunks).toEqual(['captured']); + } finally { + await relay.shutdown(); + } + }); }); diff --git a/packages/sdk/src/relay.ts b/packages/sdk/src/relay.ts index 7bf6f947e..469b66366 100644 --- a/packages/sdk/src/relay.ts +++ b/packages/sdk/src/relay.ts @@ -181,8 +181,10 @@ export interface Agent { }): Promise; subscribe(channels: string[]): Promise; unsubscribe(channels: string[]): Promise; - /** Register a callback for PTY output from this agent. Returns an unsubscribe function. */ - onOutput(callback: AgentOutputCallback): () => void; + /** Register a callback for PTY output from this agent. Returns an unsubscribe function. + * @param options.stream — if provided, only invoke callback when the event stream matches (e.g. 'stdout', 'stderr') + */ + onOutput(callback: AgentOutputCallback, options?: { stream?: string }): () => void; } export interface HumanHandle { @@ -239,6 +241,7 @@ export interface AgentRelayOptions { type OutputListener = { callback: AgentOutputCallback; mode: 'chunk' | 'structured'; + stream?: string; }; type InternalAgent = Agent & { @@ -897,6 +900,7 @@ export class AgentRelay { const listeners = this.outputListeners.get(name); if (!listeners) return; for (const listener of listeners) { + if (listener.stream !== undefined && listener.stream !== stream) continue; if (listener.mode === 'structured') { (listener.callback as (data: AgentOutputPayload) => void)({ stream, chunk }); } else { @@ -1305,7 +1309,7 @@ export class AgentRelay { async unsubscribe(channelsToRemove: string[]) { await relay.unsubscribe({ agent: name, channels: channelsToRemove }); }, - onOutput(callback: AgentOutputCallback): () => void { + onOutput(callback: AgentOutputCallback, options?: { stream?: string }): () => void { let listeners = relay.outputListeners.get(name); if (!listeners) { listeners = new Set(); @@ -1314,6 +1318,7 @@ export class AgentRelay { const listener: OutputListener = { callback, mode: relay.inferOutputMode(callback), + stream: options?.stream, }; listeners.add(listener); return () => { diff --git a/src/pty_worker.rs b/src/pty_worker.rs index fd9eed991..d7046c28a 100644 --- a/src/pty_worker.rs +++ b/src/pty_worker.rs @@ -254,6 +254,12 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { // Bounded to avoid unbounded memory growth; continuity blocks are small. let mut continuity_buffer = String::new(); const CONTINUITY_BUFFER_MAX: usize = 4096; + // Rate-limited buffering for worker_stream emissions. + // Chunks are accumulated and flushed at most every 100ms or when buffer exceeds threshold. + let mut stream_buffer = String::new(); + let mut stream_buffer_last_flush = Instant::now(); + const STREAM_BUFFER_MAX: usize = 4096; + const STREAM_FLUSH_INTERVAL: Duration = Duration::from_millis(100); let mut verification_tick = tokio::time::interval(Duration::from_millis(200)); verification_tick.set_missed_tick_behavior(MissedTickBehavior::Skip); @@ -490,10 +496,17 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { running = false; } - let _ = send_frame(&out_tx, "worker_stream", None, json!({ - "stream": "stdout", - "chunk": text, - })).await; + stream_buffer.push_str(&text); + if stream_buffer.len() >= STREAM_BUFFER_MAX + || stream_buffer_last_flush.elapsed() >= STREAM_FLUSH_INTERVAL + { + let chunk = std::mem::take(&mut stream_buffer); + let _ = send_frame(&out_tx, "worker_stream", None, json!({ + "stream": "stdout", + "chunk": chunk, + })).await; + stream_buffer_last_flush = Instant::now(); + } pty_auto.update_auto_suggestion(&text); pty_auto.last_output_time = Instant::now(); @@ -856,6 +869,18 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { } } + // Flush any buffered stream output during quiet periods. + if !stream_buffer.is_empty() + && stream_buffer_last_flush.elapsed() >= STREAM_FLUSH_INTERVAL + { + let chunk = std::mem::take(&mut stream_buffer); + let _ = send_frame(&out_tx, "worker_stream", None, json!({ + "stream": "stdout", + "chunk": chunk, + })).await; + stream_buffer_last_flush = Instant::now(); + } + } // --- Auto-enter for stuck agents --- From 73c02c472bb4f4480e34d3e8a6b2503c1d2b3a43 Mon Sep 17 00:00:00 2001 From: Khaliq Date: Wed, 25 Mar 2026 09:29:25 +0100 Subject: [PATCH 07/14] fix: flush stream_buffer on exit and before watchdog late output Addresses two Devin review issues: 1. Flush stream_buffer after main loop exits so final output (up to 4096 bytes) is not silently dropped before worker_exited is sent. 2. Flush stream_buffer before watchdog drains late PTY output to preserve correct output ordering. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/pty_worker.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/pty_worker.rs b/src/pty_worker.rs index d7046c28a..5751bf3de 100644 --- a/src/pty_worker.rs +++ b/src/pty_worker.rs @@ -915,6 +915,16 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { // messages from CLIs that exit immediately (e.g. codex MCP // failure). Without this, the output is lost in the race // between the reader thread and the watchdog. + // Flush any buffered stream output before sending late output + // to preserve ordering. + if !stream_buffer.is_empty() { + let chunk = std::mem::take(&mut stream_buffer); + let _ = send_frame(&out_tx, "worker_stream", None, json!({ + "stream": "stdout", + "chunk": chunk, + })).await; + stream_buffer_last_flush = Instant::now(); + } let mut late_output = String::new(); while let Ok(chunk) = pty_rx.try_recv() { let text = String::from_utf8_lossy(&chunk).to_string(); @@ -977,6 +987,15 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { } } + // Flush any remaining buffered stream output before signaling exit. + if !stream_buffer.is_empty() { + let chunk = std::mem::take(&mut stream_buffer); + let _ = send_frame(&out_tx, "worker_stream", None, json!({ + "stream": "stdout", + "chunk": chunk, + })).await; + } + if child_exit_detected { let _ = send_frame( &out_tx, From 6f69373b2009b92c2f8d359da2b7161ae399146e Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Wed, 25 Mar 2026 08:29:58 +0000 Subject: [PATCH 08/14] style: auto-format Rust code with cargo fmt --- src/pty_worker.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/pty_worker.rs b/src/pty_worker.rs index 5751bf3de..3e8029d77 100644 --- a/src/pty_worker.rs +++ b/src/pty_worker.rs @@ -990,10 +990,16 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { // Flush any remaining buffered stream output before signaling exit. if !stream_buffer.is_empty() { let chunk = std::mem::take(&mut stream_buffer); - let _ = send_frame(&out_tx, "worker_stream", None, json!({ - "stream": "stdout", - "chunk": chunk, - })).await; + let _ = send_frame( + &out_tx, + "worker_stream", + None, + json!({ + "stream": "stdout", + "chunk": chunk, + }), + ) + .await; } if child_exit_detected { From ebaa745ffb4d3d8da781518ca3071fffe65a378b Mon Sep 17 00:00:00 2001 From: Khaliq Date: Wed, 25 Mar 2026 09:48:24 +0100 Subject: [PATCH 09/14] fix: flush stream_buffer before agent_exit on PTY close Flush buffered stream output before sending agent_exit in the PTY-closed branch, matching the watchdog branch behavior. Without this, up to 4096 bytes of output could arrive after the exit event. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/pty_worker.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/pty_worker.rs b/src/pty_worker.rs index 3e8029d77..82f87b3a3 100644 --- a/src/pty_worker.rs +++ b/src/pty_worker.rs @@ -680,8 +680,17 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { } } None => { - // PTY reader closed — child likely exited. Emit - // agent_exit with any echo_buffer tail so the + // PTY reader closed — child likely exited. Flush + // any buffered stream output before sending + // agent_exit to preserve output ordering. + if !stream_buffer.is_empty() { + let chunk = std::mem::take(&mut stream_buffer); + let _ = send_frame(&out_tx, "worker_stream", None, json!({ + "stream": "stdout", + "chunk": chunk, + })).await; + } + // Emit agent_exit with any echo_buffer tail so the // dashboard can surface the CLI's last output. let clean = strip_ansi(&echo_buffer); let trimmed = if clean.len() > 2000 { From 142ae9483e96b942b8488abfc7d679b5a32ccba0 Mon Sep 17 00:00:00 2001 From: Khaliq Date: Wed, 25 Mar 2026 10:19:10 +0100 Subject: [PATCH 10/14] refactor: extract flush_stream_buffer macro to eliminate duplication The identical stream_buffer flush pattern was copy-pasted at 5 sites, causing bugs where fixes applied to one site didn't propagate to others. Extract a flush_stream_buffer!() macro used at all 6 flush points. Also moves /exit detection after the buffer push+flush so agent_exit is always sent after all buffered output. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/pty_worker.rs | 72 +++++++++++++++++------------------------------ 1 file changed, 26 insertions(+), 46 deletions(-) diff --git a/src/pty_worker.rs b/src/pty_worker.rs index 82f87b3a3..20699fcdc 100644 --- a/src/pty_worker.rs +++ b/src/pty_worker.rs @@ -260,6 +260,20 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { let mut stream_buffer_last_flush = Instant::now(); const STREAM_BUFFER_MAX: usize = 4096; const STREAM_FLUSH_INTERVAL: Duration = Duration::from_millis(100); + + /// Flush `stream_buffer` via a `worker_stream` frame if non-empty. + macro_rules! flush_stream_buffer { + () => { + if !stream_buffer.is_empty() { + let chunk = std::mem::take(&mut stream_buffer); + let _ = send_frame(&out_tx, "worker_stream", None, json!({ + "stream": "stdout", + "chunk": chunk, + })).await; + stream_buffer_last_flush = Instant::now(); + } + }; + } let mut verification_tick = tokio::time::interval(Duration::from_millis(200)); verification_tick.set_missed_tick_behavior(MissedTickBehavior::Skip); @@ -483,6 +497,13 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { // Detect /exit command in agent output and trigger graceful shutdown. // Skip detection while echo verifications are pending to avoid // false-positives from injected relay messages containing "/exit". + stream_buffer.push_str(&text); + if stream_buffer.len() >= STREAM_BUFFER_MAX + || stream_buffer_last_flush.elapsed() >= STREAM_FLUSH_INTERVAL + { + flush_stream_buffer!(); + } + if pending_verifications.is_empty() && clean_text.lines().any(|line| line.trim() == "/exit") { @@ -490,24 +511,13 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { target = "agent_relay::worker::pty", "agent issued /exit — shutting down" ); + flush_stream_buffer!(); let _ = send_frame(&out_tx, "agent_exit", None, json!({ "reason": "agent_requested", })).await; running = false; } - stream_buffer.push_str(&text); - if stream_buffer.len() >= STREAM_BUFFER_MAX - || stream_buffer_last_flush.elapsed() >= STREAM_FLUSH_INTERVAL - { - let chunk = std::mem::take(&mut stream_buffer); - let _ = send_frame(&out_tx, "worker_stream", None, json!({ - "stream": "stdout", - "chunk": chunk, - })).await; - stream_buffer_last_flush = Instant::now(); - } - pty_auto.update_auto_suggestion(&text); pty_auto.last_output_time = Instant::now(); pty_auto.reset_idle_on_output(); @@ -683,13 +693,7 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { // PTY reader closed — child likely exited. Flush // any buffered stream output before sending // agent_exit to preserve output ordering. - if !stream_buffer.is_empty() { - let chunk = std::mem::take(&mut stream_buffer); - let _ = send_frame(&out_tx, "worker_stream", None, json!({ - "stream": "stdout", - "chunk": chunk, - })).await; - } + flush_stream_buffer!(); // Emit agent_exit with any echo_buffer tail so the // dashboard can surface the CLI's last output. let clean = strip_ansi(&echo_buffer); @@ -882,12 +886,7 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { if !stream_buffer.is_empty() && stream_buffer_last_flush.elapsed() >= STREAM_FLUSH_INTERVAL { - let chunk = std::mem::take(&mut stream_buffer); - let _ = send_frame(&out_tx, "worker_stream", None, json!({ - "stream": "stdout", - "chunk": chunk, - })).await; - stream_buffer_last_flush = Instant::now(); + flush_stream_buffer!(); } } @@ -926,14 +925,7 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { // between the reader thread and the watchdog. // Flush any buffered stream output before sending late output // to preserve ordering. - if !stream_buffer.is_empty() { - let chunk = std::mem::take(&mut stream_buffer); - let _ = send_frame(&out_tx, "worker_stream", None, json!({ - "stream": "stdout", - "chunk": chunk, - })).await; - stream_buffer_last_flush = Instant::now(); - } + flush_stream_buffer!(); let mut late_output = String::new(); while let Ok(chunk) = pty_rx.try_recv() { let text = String::from_utf8_lossy(&chunk).to_string(); @@ -997,19 +989,7 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { } // Flush any remaining buffered stream output before signaling exit. - if !stream_buffer.is_empty() { - let chunk = std::mem::take(&mut stream_buffer); - let _ = send_frame( - &out_tx, - "worker_stream", - None, - json!({ - "stream": "stdout", - "chunk": chunk, - }), - ) - .await; - } + flush_stream_buffer!(); if child_exit_detected { let _ = send_frame( From 54e299e326cce79f35f1f88e30f08c0664f41e95 Mon Sep 17 00:00:00 2001 From: Khaliq Date: Wed, 25 Mar 2026 10:22:02 +0100 Subject: [PATCH 11/14] fix: add explicit mode option to onOutput, avoid toString() fragility inferOutputMode uses callback.toString() to detect structured vs chunk mode, which breaks with minifiers. Add an explicit `mode` option to onOutput({ mode: 'structured' }) that bypasses the heuristic entirely. The toString() fallback is preserved for backwards compatibility. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../__tests__/orchestration-upgrades.test.ts | 24 +++++++++++++++++++ packages/sdk/src/relay.ts | 7 +++--- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/packages/sdk/src/__tests__/orchestration-upgrades.test.ts b/packages/sdk/src/__tests__/orchestration-upgrades.test.ts index e0a0052cb..4d6493a18 100644 --- a/packages/sdk/src/__tests__/orchestration-upgrades.test.ts +++ b/packages/sdk/src/__tests__/orchestration-upgrades.test.ts @@ -1180,4 +1180,28 @@ describe('Agent.onOutput', () => { await relay.shutdown(); } }); + + it('onOutput with explicit mode: "structured" receives { stream, chunk } objects', async () => { + const { client, emit } = createMockFacadeClient(); + vi.spyOn(AgentRelayClient, 'start').mockResolvedValue(client); + + const relay = new AgentRelay(); + try { + const agent = await relay.spawnPty({ + name: 'explicit-mode-agent', + cli: 'claude', + channels: ['general'], + }); + + const payloads: Array<{ stream: string; chunk: string }> = []; + // Use a plain (chunk) => ... signature but force structured mode via options + agent.onOutput(((data: { stream: string; chunk: string }) => payloads.push(data)) as any, { mode: 'structured' }); + + emit({ kind: 'worker_stream', name: 'explicit-mode-agent', stream: 'stdout', chunk: 'hello' }); + + expect(payloads).toEqual([{ stream: 'stdout', chunk: 'hello' }]); + } finally { + await relay.shutdown(); + } + }); }); diff --git a/packages/sdk/src/relay.ts b/packages/sdk/src/relay.ts index 469b66366..bb90041db 100644 --- a/packages/sdk/src/relay.ts +++ b/packages/sdk/src/relay.ts @@ -183,8 +183,9 @@ export interface Agent { unsubscribe(channels: string[]): Promise; /** Register a callback for PTY output from this agent. Returns an unsubscribe function. * @param options.stream — if provided, only invoke callback when the event stream matches (e.g. 'stdout', 'stderr') + * @param options.mode — 'chunk' for raw string callbacks, 'structured' for { stream, chunk } callbacks. Auto-detected if omitted. */ - onOutput(callback: AgentOutputCallback, options?: { stream?: string }): () => void; + onOutput(callback: AgentOutputCallback, options?: { stream?: string; mode?: 'chunk' | 'structured' }): () => void; } export interface HumanHandle { @@ -1309,7 +1310,7 @@ export class AgentRelay { async unsubscribe(channelsToRemove: string[]) { await relay.unsubscribe({ agent: name, channels: channelsToRemove }); }, - onOutput(callback: AgentOutputCallback, options?: { stream?: string }): () => void { + onOutput(callback: AgentOutputCallback, options?: { stream?: string; mode?: 'chunk' | 'structured' }): () => void { let listeners = relay.outputListeners.get(name); if (!listeners) { listeners = new Set(); @@ -1317,7 +1318,7 @@ export class AgentRelay { } const listener: OutputListener = { callback, - mode: relay.inferOutputMode(callback), + mode: options?.mode ?? relay.inferOutputMode(callback), stream: options?.stream, }; listeners.add(listener); From 80320ff17e665a87bb8d1b68955a3cc5821be6b1 Mon Sep 17 00:00:00 2001 From: Khaliq Date: Wed, 25 Mar 2026 10:24:19 +0100 Subject: [PATCH 12/14] fix: avoid unused assignment warning in post-loop stream flush The flush_stream_buffer! macro sets stream_buffer_last_flush which is never read after the main loop. Inline the flush at the post-loop site to avoid the unused-assignment warning (treated as error in CI). Co-Authored-By: Claude Opus 4.6 (1M context) --- src/pty_worker.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/pty_worker.rs b/src/pty_worker.rs index 20699fcdc..902717754 100644 --- a/src/pty_worker.rs +++ b/src/pty_worker.rs @@ -989,7 +989,13 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { } // Flush any remaining buffered stream output before signaling exit. - flush_stream_buffer!(); + if !stream_buffer.is_empty() { + let chunk = std::mem::take(&mut stream_buffer); + let _ = send_frame(&out_tx, "worker_stream", None, json!({ + "stream": "stdout", + "chunk": chunk, + })).await; + } if child_exit_detected { let _ = send_frame( From 2a1d5140484d0ffe8b566f1505291de02ae214c0 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Wed, 25 Mar 2026 09:24:37 +0000 Subject: [PATCH 13/14] style: auto-format Rust code with cargo fmt --- src/pty_worker.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/pty_worker.rs b/src/pty_worker.rs index 902717754..fd119ea01 100644 --- a/src/pty_worker.rs +++ b/src/pty_worker.rs @@ -991,10 +991,16 @@ pub(crate) async fn run_pty_worker(cmd: PtyCommand) -> Result<()> { // Flush any remaining buffered stream output before signaling exit. if !stream_buffer.is_empty() { let chunk = std::mem::take(&mut stream_buffer); - let _ = send_frame(&out_tx, "worker_stream", None, json!({ - "stream": "stdout", - "chunk": chunk, - })).await; + let _ = send_frame( + &out_tx, + "worker_stream", + None, + json!({ + "stream": "stdout", + "chunk": chunk, + }), + ) + .await; } if child_exit_detected { From 4bec9edc1f7980de74358eb49cbbc14607814dcc Mon Sep 17 00:00:00 2001 From: Khaliq Date: Wed, 25 Mar 2026 10:29:01 +0100 Subject: [PATCH 14/14] chore: record PTY output streaming trajectory Co-Authored-By: Claude Opus 4.6 (1M context) --- .../active/traj_1773148075777_a710fb1b.json | 48 ------ .../active/traj_1773150874252_08ec23ea.json | 53 ------- .../active/traj_1773678087586_feff434a.json | 67 --------- .trajectories/active/traj_u6bndffr0b8g.json | 137 ------------------ .../completed/2026-03/traj_1qnnaojcl0w6.json | 117 +++++++++++++++ .../completed/2026-03/traj_1qnnaojcl0w6.md | 53 +++++++ .../2026-03}/traj_ni1xbsaa03bv.json | 14 +- .../completed/2026-03/traj_ni1xbsaa03bv.md | 23 +++ .../completed/2026-03/traj_u6bndffr0b8g.json | 17 ++- .../completed/2026-03/traj_u6bndffr0b8g.md | 14 +- .trajectories/index.json | 21 ++- 11 files changed, 234 insertions(+), 330 deletions(-) delete mode 100644 .trajectories/active/traj_1773148075777_a710fb1b.json delete mode 100644 .trajectories/active/traj_1773150874252_08ec23ea.json delete mode 100644 .trajectories/active/traj_1773678087586_feff434a.json delete mode 100644 .trajectories/active/traj_u6bndffr0b8g.json create mode 100644 .trajectories/completed/2026-03/traj_1qnnaojcl0w6.json create mode 100644 .trajectories/completed/2026-03/traj_1qnnaojcl0w6.md rename .trajectories/{active => completed/2026-03}/traj_ni1xbsaa03bv.json (84%) create mode 100644 .trajectories/completed/2026-03/traj_ni1xbsaa03bv.md diff --git a/.trajectories/active/traj_1773148075777_a710fb1b.json b/.trajectories/active/traj_1773148075777_a710fb1b.json deleted file mode 100644 index e1d957e53..000000000 --- a/.trajectories/active/traj_1773148075777_a710fb1b.json +++ /dev/null @@ -1,48 +0,0 @@ -{ - "id": "traj_1773148075777_a710fb1b", - "version": 1, - "task": { - "title": "test-force-release run #e2a54a64", - "source": { - "system": "workflow-runner", - "id": "e2a54a647d51b01453059568" - } - }, - "status": "active", - "startedAt": "2026-03-10T13:07:55.777Z", - "agents": [ - { - "name": "orchestrator", - "role": "workflow-runner", - "joinedAt": "2026-03-10T13:07:55.777Z" - }, - { - "name": "idle-worker", - "role": "specialist", - "joinedAt": "2026-03-10T13:07:59.239Z" - } - ], - "chapters": [ - { - "id": "ch_310b1c03", - "title": "Planning", - "agentName": "orchestrator", - "startedAt": "2026-03-10T13:07:55.777Z", - "events": [ - { - "ts": 1773148075777, - "type": "note", - "content": "Approach: 1-step sequential workflow — Parsed 1 steps, DAG validated, no cycles" - }, - { - "ts": 1773148079242, - "type": "note", - "content": "\"idle-step\": Just sit here", - "raw": { - "agent": "idle-worker" - } - } - ] - } - ] -} \ No newline at end of file diff --git a/.trajectories/active/traj_1773150874252_08ec23ea.json b/.trajectories/active/traj_1773150874252_08ec23ea.json deleted file mode 100644 index 7995c42df..000000000 --- a/.trajectories/active/traj_1773150874252_08ec23ea.json +++ /dev/null @@ -1,53 +0,0 @@ -{ - "id": "traj_1773150874252_08ec23ea", - "version": 1, - "task": { - "title": "test run #ebaecc98", - "source": { - "system": "workflow-runner", - "id": "ebaecc98b1200ac4d1eaf96c" - } - }, - "status": "active", - "startedAt": "2026-03-10T13:54:34.252Z", - "agents": [ - { - "name": "orchestrator", - "role": "workflow-runner", - "joinedAt": "2026-03-10T13:54:34.252Z" - }, - { - "name": "coder", - "role": "specialist", - "joinedAt": "2026-03-10T13:54:38.105Z" - } - ], - "chapters": [ - { - "id": "ch_6a7dbabc", - "title": "Planning", - "agentName": "orchestrator", - "startedAt": "2026-03-10T13:54:34.252Z", - "events": [ - { - "ts": 1773150874252, - "type": "note", - "content": "Purpose: Simple test to verify PTY output streaming works end-to-end" - }, - { - "ts": 1773150874252, - "type": "note", - "content": "Approach: 1-step sequential workflow — Parsed 1 steps, DAG validated, no cycles" - }, - { - "ts": 1773150878108, - "type": "note", - "content": "\"generate-code\": Create a file at /tmp/pty-stream-test.ts with a simple TypeScript", - "raw": { - "agent": "coder" - } - } - ] - } - ] -} \ No newline at end of file diff --git a/.trajectories/active/traj_1773678087586_feff434a.json b/.trajectories/active/traj_1773678087586_feff434a.json deleted file mode 100644 index 77d501be6..000000000 --- a/.trajectories/active/traj_1773678087586_feff434a.json +++ /dev/null @@ -1,67 +0,0 @@ -{ - "id": "traj_1773678087586_feff434a", - "version": 1, - "task": { - "title": "add-start-from-workflow", - "source": { - "system": "workflow-runner", - "id": "bb9820a0598047f101ef3fc6" - } - }, - "status": "active", - "startedAt": "2026-03-16T16:21:27.586Z", - "agents": [ - { - "name": "orchestrator", - "role": "workflow-runner", - "joinedAt": "2026-03-16T16:21:27.586Z" - }, - { - "name": "droid-impl", - "role": "specialist", - "joinedAt": "2026-03-16T16:21:31.370Z" - }, - { - "name": "claude-review", - "role": "reviewer", - "joinedAt": "2026-03-16T16:21:31.372Z" - } - ], - "chapters": [ - { - "id": "ch_2384168d", - "title": "Planning", - "agentName": "orchestrator", - "startedAt": "2026-03-16T16:21:27.586Z", - "events": [ - { - "ts": 1773678087586, - "type": "note", - "content": "Purpose: Add startFrom feature to workflow runner" - }, - { - "ts": 1773678087586, - "type": "note", - "content": "Approach: 2-step dag workflow — Parsed 2 steps, 1 dependent steps, DAG validated, no cycles" - } - ], - "endedAt": "2026-03-16T16:21:31.373Z" - }, - { - "id": "ch_d1ab31e8", - "title": "Execution: implement", - "agentName": "droid-impl", - "startedAt": "2026-03-16T16:21:31.373Z", - "events": [ - { - "ts": 1773678091373, - "type": "note", - "content": "\"implement\": Add a startFrom feature to the workflow runner in this repo", - "raw": { - "agent": "droid-impl" - } - } - ] - } - ] -} \ No newline at end of file diff --git a/.trajectories/active/traj_u6bndffr0b8g.json b/.trajectories/active/traj_u6bndffr0b8g.json deleted file mode 100644 index 609f1ddd0..000000000 --- a/.trajectories/active/traj_u6bndffr0b8g.json +++ /dev/null @@ -1,137 +0,0 @@ -{ - "id": "traj_u6bndffr0b8g", - "version": 1, - "task": { - "title": "Code review for assigned workflow changes" - }, - "status": "active", - "startedAt": "2026-03-12T08:38:19.960Z", - "agents": [ - { - "name": "default", - "role": "lead", - "joinedAt": "2026-03-19T21:44:38.430Z" - } - ], - "chapters": [ - { - "id": "chap_7f65y050m95v", - "title": "Work", - "agentName": "default", - "startedAt": "2026-03-19T21:44:38.430Z", - "events": [ - { - "ts": 1773956678431, - "type": "decision", - "content": "Routed relay ACK to Lead because broker alias is not registered in the current Relaycast workspace: Routed relay ACK to Lead because broker alias is not registered in the current Relaycast workspace", - "raw": { - "question": "Routed relay ACK to Lead because broker alias is not registered in the current Relaycast workspace", - "chosen": "Routed relay ACK to Lead because broker alias is not registered in the current Relaycast workspace", - "alternatives": [], - "reasoning": "Direct DM to broker failed with agent-not-found; Lead is the only live coordinating agent and the skill protocol says to report status to your lead" - }, - "significance": "high" - }, - { - "ts": 1773957195267, - "type": "reflection", - "content": "Starting Codex subagent communication investigation and aligning it with existing Claude/Gemini plugin patterns before implementation", - "raw": { - "focalPoints": [ - "codex-subagents", - "hooks", - "skills", - "relaycast" - ], - "confidence": 0.73 - }, - "significance": "high", - "tags": [ - "focal:codex-subagents", - "focal:hooks", - "focal:skills", - "focal:relaycast", - "confidence:0.73" - ] - }, - { - "ts": 1773957457056, - "type": "decision", - "content": "Favor Codex-native skills plus custom agent TOML with Relaycast MCP over hooks-first integration for subagent-to-subagent communication: Favor Codex-native skills plus custom agent TOML with Relaycast MCP over hooks-first integration for subagent-to-subagent communication", - "raw": { - "question": "Favor Codex-native skills plus custom agent TOML with Relaycast MCP over hooks-first integration for subagent-to-subagent communication", - "chosen": "Favor Codex-native skills plus custom agent TOML with Relaycast MCP over hooks-first integration for subagent-to-subagent communication", - "alternatives": [], - "reasoning": "Official Codex docs support project skills in .agents/skills and custom agents in .codex/agents with mcp_servers and skills.config today. Hooks landed in PR #13276 but remain experimental, limited to SessionStart/Stop, and are not the best primary integration point for peer messaging." - }, - "significance": "high" - }, - { - "ts": 1774037318747, - "type": "decision", - "content": "Add canonical metadata to legacy OpenClaw redirect routes: Add canonical metadata to legacy OpenClaw redirect routes", - "raw": { - "question": "Add canonical metadata to legacy OpenClaw redirect routes", - "chosen": "Add canonical metadata to legacy OpenClaw redirect routes", - "alternatives": [], - "reasoning": "All user-facing web routes should emit a canonical URL at agentrelay.dev; the only missing route modules are the legacy OpenClaw redirect pages." - }, - "significance": "high" - }, - { - "ts": 1774037562266, - "type": "decision", - "content": "Make /openclaw/skill the primary hosted skill route: Make /openclaw/skill the primary hosted skill route", - "raw": { - "question": "Make /openclaw/skill the primary hosted skill route", - "chosen": "Make /openclaw/skill the primary hosted skill route", - "alternatives": [], - "reasoning": "The site copy, sitemap, and robots rules already use /openclaw/skill as the public URL. The current redirects invert that intent, so the content and invite pages should live under /openclaw/skill and /skill should redirect there as a legacy alias." - }, - "significance": "high" - }, - { - "ts": 1774103061345, - "type": "decision", - "content": "Keep animation behavior intact while extracting helper functions to satisfy ESLint complexity limits: Keep animation behavior intact while extracting helper functions to satisfy ESLint complexity limits", - "raw": { - "question": "Keep animation behavior intact while extracting helper functions to satisfy ESLint complexity limits", - "chosen": "Keep animation behavior intact while extracting helper functions to satisfy ESLint complexity limits", - "alternatives": [], - "reasoning": "The warnings are structural rather than behavioral, so helper extraction is the lowest-risk fix." - }, - "significance": "high" - }, - { - "ts": 1774103219705, - "type": "reflection", - "content": "Cleared the current web lint warnings by removing unused symbols, renaming ESM path helpers, and extracting animation helper functions without changing behavior.", - "raw": { - "focalPoints": [ - "lint", - "animation-components", - "esm-path-helpers" - ], - "adjustments": "Used helper extraction instead of logic changes to satisfy complexity and max-depth rules.", - "confidence": 0.87 - }, - "significance": "high", - "tags": [ - "focal:lint", - "focal:animation-components", - "focal:esm-path-helpers", - "confidence:0.87" - ] - } - ] - } - ], - "commits": [], - "filesChanged": [], - "projectId": "/Users/khaliqgant/Projects/relay", - "tags": [], - "_trace": { - "startRef": "4ce7ccf5e297e46ededaf74c2a997cfe0d2f88af", - "endRef": "4ce7ccf5e297e46ededaf74c2a997cfe0d2f88af" - } -} \ No newline at end of file diff --git a/.trajectories/completed/2026-03/traj_1qnnaojcl0w6.json b/.trajectories/completed/2026-03/traj_1qnnaojcl0w6.json new file mode 100644 index 000000000..f5bd1a782 --- /dev/null +++ b/.trajectories/completed/2026-03/traj_1qnnaojcl0w6.json @@ -0,0 +1,117 @@ +{ + "id": "traj_1qnnaojcl0w6", + "version": 1, + "task": { + "title": "PTY output streaming: rate-limited buffering, stream filter, and Devin review fixes", + "source": { + "system": "plain", + "id": "390" + } + }, + "status": "completed", + "startedAt": "2026-03-25T09:28:08.884Z", + "agents": [ + { + "name": "default", + "role": "lead", + "joinedAt": "2026-03-25T09:28:14.608Z" + } + ], + "chapters": [ + { + "id": "chap_syakvczpwygq", + "title": "Work", + "agentName": "default", + "startedAt": "2026-03-25T09:28:14.608Z", + "events": [ + { + "ts": 1774430894610, + "type": "decision", + "content": "Added rate-limited buffering to worker_stream emissions in Rust PTY worker: Added rate-limited buffering to worker_stream emissions in Rust PTY worker", + "raw": { + "question": "Added rate-limited buffering to worker_stream emissions in Rust PTY worker", + "chosen": "Added rate-limited buffering to worker_stream emissions in Rust PTY worker", + "alternatives": [], + "reasoning": "Raw per-chunk worker_stream frames caused excessive frame noise. Buffer up to 4KB or 100ms before flushing to reduce overhead while maintaining low latency." + }, + "significance": "high" + }, + { + "ts": 1774430900072, + "type": "decision", + "content": "Added stream filter option to SDK onOutput: Added stream filter option to SDK onOutput", + "raw": { + "question": "Added stream filter option to SDK onOutput", + "chosen": "Added stream filter option to SDK onOutput", + "alternatives": [], + "reasoning": "Callers needed to filter by stdout/stderr without manually checking every event. Added optional { stream: 'stdout' } parameter to onOutput() so listeners only fire for matching streams." + }, + "significance": "high" + }, + { + "ts": 1774430907061, + "type": "decision", + "content": "Extracted flush_stream_buffer\\! macro to eliminate 5x code duplication: Extracted flush_stream_buffer\\! macro to eliminate 5x code duplication", + "raw": { + "question": "Extracted flush_stream_buffer\\! macro to eliminate 5x code duplication", + "chosen": "Extracted flush_stream_buffer\\! macro to eliminate 5x code duplication", + "alternatives": [], + "reasoning": "Devin review identified that the identical buffer flush pattern was copy-pasted at 5 locations, causing bugs where fixes applied to one site didn't propagate. Prior commits had patched PTY-close and watchdog paths but missed the /exit path — proving the DRY violation was the root cause." + }, + "significance": "high" + }, + { + "ts": 1774430914035, + "type": "decision", + "content": "Added explicit mode option to onOutput to bypass toString() heuristic: Added explicit mode option to onOutput to bypass toString() heuristic", + "raw": { + "question": "Added explicit mode option to onOutput to bypass toString() heuristic", + "chosen": "Added explicit mode option to onOutput to bypass toString() heuristic", + "alternatives": [], + "reasoning": "inferOutputMode used callback.toString() to detect structured vs chunk mode, which silently breaks with minifiers — defaulting to chunk mode. Combined with the new stream filter, a developer could think they're filtering structured events but receive unfiltered raw strings. Added explicit { mode: 'structured' } option as escape hatch." + }, + "significance": "high" + }, + { + "ts": 1774430922596, + "type": "decision", + "content": "Inlined post-loop flush instead of using macro to avoid CI unused-assignment error: Inlined post-loop flush instead of using macro to avoid CI unused-assignment error", + "raw": { + "question": "Inlined post-loop flush instead of using macro to avoid CI unused-assignment error", + "chosen": "Inlined post-loop flush instead of using macro to avoid CI unused-assignment error", + "alternatives": [], + "reasoning": "The flush_stream_buffer! macro always updates stream_buffer_last_flush, but after the main loop this timestamp is never read. CI treats warnings as errors (-D warnings), so the post-loop site inlines the flush without the timestamp update." + }, + "significance": "high" + }, + { + "ts": 1774430928064, + "type": "reflection", + "content": "All Devin review issues addressed. Three exit paths (PTY-close, watchdog, /exit) now consistently flush stream_buffer before agent_exit. DRY macro prevents future regressions. SDK stream filter and explicit mode option complete the feature.", + "raw": { + "confidence": 0.9 + }, + "significance": "high", + "tags": [ + "confidence:0.9" + ] + } + ], + "endedAt": "2026-03-25T09:28:53.882Z" + } + ], + "commits": [], + "filesChanged": [], + "projectId": "/Users/khaliqgant/Projects/AgentWorkforce/relay", + "tags": [], + "_trace": { + "startRef": "80320ff17e665a87bb8d1b68955a3cc5821be6b1", + "endRef": "80320ff17e665a87bb8d1b68955a3cc5821be6b1" + }, + "completedAt": "2026-03-25T09:28:53.882Z", + "retrospective": { + "summary": "Implemented PTY output streaming improvements: rate-limited buffering in Rust (4KB/100ms), SDK onOutput stream filter and explicit mode option, flush_stream_buffer macro to eliminate duplication, and consistent buffer flushing across all 3 exit paths (/exit, PTY-close, watchdog). Addressed all Devin review findings.", + "approach": "Standard approach", + "confidence": 0.9 + } +} \ No newline at end of file diff --git a/.trajectories/completed/2026-03/traj_1qnnaojcl0w6.md b/.trajectories/completed/2026-03/traj_1qnnaojcl0w6.md new file mode 100644 index 000000000..365cc1bc4 --- /dev/null +++ b/.trajectories/completed/2026-03/traj_1qnnaojcl0w6.md @@ -0,0 +1,53 @@ +# Trajectory: PTY output streaming: rate-limited buffering, stream filter, and Devin review fixes + +> **Status:** ✅ Completed +> **Task:** 390 +> **Confidence:** 90% +> **Started:** March 25, 2026 at 10:28 AM +> **Completed:** March 25, 2026 at 10:28 AM + +--- + +## Summary + +Implemented PTY output streaming improvements: rate-limited buffering in Rust (4KB/100ms), SDK onOutput stream filter and explicit mode option, flush_stream_buffer macro to eliminate duplication, and consistent buffer flushing across all 3 exit paths (/exit, PTY-close, watchdog). Addressed all Devin review findings. + +**Approach:** Standard approach + +--- + +## Key Decisions + +### Added rate-limited buffering to worker_stream emissions in Rust PTY worker +- **Chose:** Added rate-limited buffering to worker_stream emissions in Rust PTY worker +- **Reasoning:** Raw per-chunk worker_stream frames caused excessive frame noise. Buffer up to 4KB or 100ms before flushing to reduce overhead while maintaining low latency. + +### Added stream filter option to SDK onOutput +- **Chose:** Added stream filter option to SDK onOutput +- **Reasoning:** Callers needed to filter by stdout/stderr without manually checking every event. Added optional { stream: 'stdout' } parameter to onOutput() so listeners only fire for matching streams. + +### Extracted flush_stream_buffer\! macro to eliminate 5x code duplication +- **Chose:** Extracted flush_stream_buffer\! macro to eliminate 5x code duplication +- **Reasoning:** Devin review identified that the identical buffer flush pattern was copy-pasted at 5 locations, causing bugs where fixes applied to one site didn't propagate. Prior commits had patched PTY-close and watchdog paths but missed the /exit path — proving the DRY violation was the root cause. + +### Added explicit mode option to onOutput to bypass toString() heuristic +- **Chose:** Added explicit mode option to onOutput to bypass toString() heuristic +- **Reasoning:** inferOutputMode used callback.toString() to detect structured vs chunk mode, which silently breaks with minifiers — defaulting to chunk mode. Combined with the new stream filter, a developer could think they're filtering structured events but receive unfiltered raw strings. Added explicit { mode: 'structured' } option as escape hatch. + +### Inlined post-loop flush instead of using macro to avoid CI unused-assignment error +- **Chose:** Inlined post-loop flush instead of using macro to avoid CI unused-assignment error +- **Reasoning:** The flush_stream_buffer! macro always updates stream_buffer_last_flush, but after the main loop this timestamp is never read. CI treats warnings as errors (-D warnings), so the post-loop site inlines the flush without the timestamp update. + +--- + +## Chapters + +### 1. Work +*Agent: default* + +- Added rate-limited buffering to worker_stream emissions in Rust PTY worker: Added rate-limited buffering to worker_stream emissions in Rust PTY worker +- Added stream filter option to SDK onOutput: Added stream filter option to SDK onOutput +- Extracted flush_stream_buffer\! macro to eliminate 5x code duplication: Extracted flush_stream_buffer\! macro to eliminate 5x code duplication +- Added explicit mode option to onOutput to bypass toString() heuristic: Added explicit mode option to onOutput to bypass toString() heuristic +- Inlined post-loop flush instead of using macro to avoid CI unused-assignment error: Inlined post-loop flush instead of using macro to avoid CI unused-assignment error +- All Devin review issues addressed. Three exit paths (PTY-close, watchdog, /exit) now consistently flush stream_buffer before agent_exit. DRY macro prevents future regressions. SDK stream filter and explicit mode option complete the feature. diff --git a/.trajectories/active/traj_ni1xbsaa03bv.json b/.trajectories/completed/2026-03/traj_ni1xbsaa03bv.json similarity index 84% rename from .trajectories/active/traj_ni1xbsaa03bv.json rename to .trajectories/completed/2026-03/traj_ni1xbsaa03bv.json index 2270aeaf3..bec7f0253 100644 --- a/.trajectories/active/traj_ni1xbsaa03bv.json +++ b/.trajectories/completed/2026-03/traj_ni1xbsaa03bv.json @@ -4,7 +4,7 @@ "task": { "title": "Add mute/unmute channel methods to sdk-typescript AgentClient" }, - "status": "active", + "status": "abandoned", "startedAt": "2026-03-23T20:21:54.726Z", "agents": [ { @@ -31,8 +31,15 @@ "reasoning": "Worker can read ../relaycast but apply_patch is limited to the writable relay workspace, so I produced a ready-to-apply patch artifact instead." }, "significance": "high" + }, + { + "ts": 1774430878870, + "type": "note", + "content": "Abandoned: Switching to PTY output streaming work", + "significance": "high" } - ] + ], + "endedAt": "2026-03-25T09:27:58.870Z" } ], "commits": [], @@ -42,5 +49,6 @@ "_trace": { "startRef": "eb6ed1f75a433eab11500574d1fa9897695e5d90", "endRef": "eb6ed1f75a433eab11500574d1fa9897695e5d90" - } + }, + "completedAt": "2026-03-25T09:27:58.870Z" } \ No newline at end of file diff --git a/.trajectories/completed/2026-03/traj_ni1xbsaa03bv.md b/.trajectories/completed/2026-03/traj_ni1xbsaa03bv.md new file mode 100644 index 000000000..ceb65ea12 --- /dev/null +++ b/.trajectories/completed/2026-03/traj_ni1xbsaa03bv.md @@ -0,0 +1,23 @@ +# Trajectory: Add mute/unmute channel methods to sdk-typescript AgentClient + +> **Status:** ❌ Abandoned +> **Started:** March 23, 2026 at 09:21 PM +> **Completed:** March 25, 2026 at 10:27 AM + +--- + +## Key Decisions + +### Prepared sdk-typescript AgentClient mute/unmute patch but direct write to sibling relaycast checkout is blocked by sandbox +- **Chose:** Prepared sdk-typescript AgentClient mute/unmute patch but direct write to sibling relaycast checkout is blocked by sandbox +- **Reasoning:** Worker can read ../relaycast but apply_patch is limited to the writable relay workspace, so I produced a ready-to-apply patch artifact instead. + +--- + +## Chapters + +### 1. Work +*Agent: default* + +- Prepared sdk-typescript AgentClient mute/unmute patch but direct write to sibling relaycast checkout is blocked by sandbox: Prepared sdk-typescript AgentClient mute/unmute patch but direct write to sibling relaycast checkout is blocked by sandbox +- Abandoned: Switching to PTY output streaming work diff --git a/.trajectories/completed/2026-03/traj_u6bndffr0b8g.json b/.trajectories/completed/2026-03/traj_u6bndffr0b8g.json index f39364270..cddf956bc 100644 --- a/.trajectories/completed/2026-03/traj_u6bndffr0b8g.json +++ b/.trajectories/completed/2026-03/traj_u6bndffr0b8g.json @@ -4,7 +4,7 @@ "task": { "title": "Code review for assigned workflow changes" }, - "status": "completed", + "status": "abandoned", "startedAt": "2026-03-12T08:38:19.960Z", "agents": [ { @@ -122,9 +122,15 @@ "focal:esm-path-helpers", "confidence:0.87" ] + }, + { + "ts": 1774430888625, + "type": "note", + "content": "Abandoned: Stale trajectory", + "significance": "high" } ], - "endedAt": "2026-03-23T15:08:33.166Z" + "endedAt": "2026-03-25T09:28:08.625Z" } ], "commits": [], @@ -135,10 +141,5 @@ "startRef": "4ce7ccf5e297e46ededaf74c2a997cfe0d2f88af", "endRef": "4ce7ccf5e297e46ededaf74c2a997cfe0d2f88af" }, - "completedAt": "2026-03-23T15:08:33.166Z", - "retrospective": { - "summary": "Added channel management protocol variants and SDK client methods", - "approach": "Standard approach", - "confidence": 0.93 - } + "completedAt": "2026-03-25T09:28:08.625Z" } \ No newline at end of file diff --git a/.trajectories/completed/2026-03/traj_u6bndffr0b8g.md b/.trajectories/completed/2026-03/traj_u6bndffr0b8g.md index cd812d720..3fcac81b9 100644 --- a/.trajectories/completed/2026-03/traj_u6bndffr0b8g.md +++ b/.trajectories/completed/2026-03/traj_u6bndffr0b8g.md @@ -1,17 +1,8 @@ # Trajectory: Code review for assigned workflow changes -> **Status:** ✅ Completed -> **Confidence:** 93% +> **Status:** ❌ Abandoned > **Started:** March 12, 2026 at 09:38 AM -> **Completed:** March 23, 2026 at 04:08 PM - ---- - -## Summary - -Added channel management protocol variants and SDK client methods - -**Approach:** Standard approach +> **Completed:** March 25, 2026 at 10:28 AM --- @@ -51,3 +42,4 @@ Added channel management protocol variants and SDK client methods - Make /openclaw/skill the primary hosted skill route: Make /openclaw/skill the primary hosted skill route - Keep animation behavior intact while extracting helper functions to satisfy ESLint complexity limits: Keep animation behavior intact while extracting helper functions to satisfy ESLint complexity limits - Cleared the current web lint warnings by removing unused symbols, renaming ESM path helpers, and extracting animation helper functions without changing behavior. +- Abandoned: Stale trajectory diff --git a/.trajectories/index.json b/.trajectories/index.json index ba058b2b8..519d85fbf 100644 --- a/.trajectories/index.json +++ b/.trajectories/index.json @@ -1,6 +1,6 @@ { "version": 1, - "lastUpdated": "2026-03-21T14:26:59.706Z", + "lastUpdated": "2026-03-25T09:28:53.988Z", "trajectories": { "traj_1b1dj40sl6jl": { "title": "Revert aggressive retry logic in relay-pty-orchestrator", @@ -838,9 +838,24 @@ }, "traj_u6bndffr0b8g": { "title": "Code review for assigned workflow changes", - "status": "active", + "status": "abandoned", "startedAt": "2026-03-12T08:38:19.960Z", - "path": "/Users/will/Projects/relay/.trajectories/active/traj_u6bndffr0b8g.json" + "completedAt": "2026-03-25T09:28:08.625Z", + "path": "/Users/khaliqgant/Projects/AgentWorkforce/relay/.trajectories/completed/2026-03/traj_u6bndffr0b8g.json" + }, + "traj_ni1xbsaa03bv": { + "title": "Add mute/unmute channel methods to sdk-typescript AgentClient", + "status": "abandoned", + "startedAt": "2026-03-23T20:21:54.726Z", + "completedAt": "2026-03-25T09:27:58.870Z", + "path": "/Users/khaliqgant/Projects/AgentWorkforce/relay/.trajectories/completed/2026-03/traj_ni1xbsaa03bv.json" + }, + "traj_1qnnaojcl0w6": { + "title": "PTY output streaming: rate-limited buffering, stream filter, and Devin review fixes", + "status": "completed", + "startedAt": "2026-03-25T09:28:08.884Z", + "completedAt": "2026-03-25T09:28:53.882Z", + "path": "/Users/khaliqgant/Projects/AgentWorkforce/relay/.trajectories/completed/2026-03/traj_1qnnaojcl0w6.json" } } } \ No newline at end of file