From a72d53c2e054a98d82a867505d4ef97329a40cd6 Mon Sep 17 00:00:00 2001 From: Noah Horton Date: Mon, 9 Mar 2026 16:16:04 -0400 Subject: [PATCH 1/5] Add file-based status reporting for external consumers Implements a stable external interface (.deepwork/tmp/status/v1/) that allows UIs, dashboards, and monitoring tools to read job and workflow session state without going through the MCP protocol. Key changes: - StatusWriter writes job_manifest.yml and sessions/.yml - WorkflowSession gains workflow_instance_id and step_history tracking - Completed/aborted workflows preserved in completed_workflows list - Sub-workflow instance IDs tracked on parent steps - Status writes are fire-and-forget (failures logged, never fail tools) Co-Authored-By: Claude Opus 4.6 --- .../jobs/JOBS-REQ-010-status-reporting.md | 114 +++++ src/deepwork/jobs/mcp/schemas.py | 24 ++ src/deepwork/jobs/mcp/server.py | 10 + src/deepwork/jobs/mcp/state.py | 163 ++++++- src/deepwork/jobs/mcp/status.py | 208 +++++++++ src/deepwork/jobs/mcp/tools.py | 64 ++- tests/unit/jobs/mcp/test_state.py | 380 ++++++++++++++++ tests/unit/jobs/mcp/test_status.py | 404 ++++++++++++++++++ tests/unit/jobs/mcp/test_tools.py | 115 +++++ 9 files changed, 1469 insertions(+), 13 deletions(-) create mode 100644 specs/deepwork/jobs/JOBS-REQ-010-status-reporting.md create mode 100644 src/deepwork/jobs/mcp/status.py create mode 100644 tests/unit/jobs/mcp/test_status.py diff --git a/specs/deepwork/jobs/JOBS-REQ-010-status-reporting.md b/specs/deepwork/jobs/JOBS-REQ-010-status-reporting.md new file mode 100644 index 00000000..75f05094 --- /dev/null +++ b/specs/deepwork/jobs/JOBS-REQ-010-status-reporting.md @@ -0,0 +1,114 @@ +# JOBS-REQ-010: Status Reporting + +## Overview + +DeepWork provides a file-based external interface for reporting the current status of jobs and workflow sessions. This allows external tools (UIs, dashboards, monitoring) to read the current state without going through the MCP protocol. Status files are a **stable external interface** — the file format MUST NOT change without versioning. + +## Requirements + +### JOBS-REQ-010.1: Status Directory Structure + +1. Status files MUST be written to `.deepwork/tmp/status/v1/` under the project root. +2. The job manifest MUST be written to `.deepwork/tmp/status/v1/job_manifest.yml`. +3. Per-session status files MUST be written to `.deepwork/tmp/status/v1/sessions/.yml`. +4. The status directory structure MUST be versioned (currently `v1`) to allow future format changes. + +### JOBS-REQ-010.2: Job Manifest + +1. `job_manifest.yml` MUST contain a `jobs` array of all available job definitions. +2. Each job entry MUST include `name`, `display_name`, `summary`, and `workflows`. +3. Each workflow entry MUST include `name`, `display_name`, `summary`, and `steps`. +4. Each step entry MUST include `name` and `display_name`. +5. Jobs MUST be sorted alphabetically by `name`. +6. Workflows within each job MUST be sorted alphabetically by `name`. + +### JOBS-REQ-010.3: Job Manifest Write Triggers + +1. The manifest MUST be written at MCP server startup. +2. The manifest MUST be written when `get_workflows` is called. + +### JOBS-REQ-010.4: Display Name Derivation + +1. `display_name` MUST be derived from the API name by replacing underscores and hyphens with spaces, then title-casing the result. +2. An empty API name MUST produce an empty display name. + +### JOBS-REQ-010.5: Session Status Format + +1. Each session status file MUST include `session_id`, `last_updated_at`, `active_workflow`, and `workflows`. 
+2. `active_workflow` MUST be the `workflow_instance_id` of the top-of-stack workflow on the main stack, or `null` if no active workflow. +3. `last_updated_at` MUST be an ISO 8601 timestamp in UTC. +4. `workflows` MUST be an array of all workflow instances (active, completed, and aborted) for the session. +5. Each workflow entry MUST include `workflow_instance_id`, `job_name`, `status`, `workflow` (definition snapshot), `agent_id`, and `steps` (ordered history). + +### JOBS-REQ-010.6: Session Status Write Triggers + +1. Session status MUST be written when `start_workflow` is called. +2. Session status MUST be written when `finished_step` is called (for all result statuses: needs_work, next_step, workflow_complete). +3. Session status MUST be written when `go_to_step` is called. +4. Session status MUST be written when `abort_workflow` is called. + +### JOBS-REQ-010.7: Workflow Instance ID + +1. Each WorkflowSession MUST have a `workflow_instance_id` field. +2. `workflow_instance_id` MUST be generated as `uuid4().hex` (32 hex characters). +3. `workflow_instance_id` MUST be generated via a default factory so existing state files without the field are backward-compatible. +4. `workflow_instance_id` MUST be unique across all workflow instances. + +### JOBS-REQ-010.8: Step History + +1. Each WorkflowSession MUST maintain a `step_history` list of `StepHistoryEntry` objects. +2. `start_step()` MUST append a new `StepHistoryEntry` with `step_id` and `started_at`. +3. `complete_step()` MUST update the last matching `StepHistoryEntry`'s `finished_at`. +4. `go_to_step()` followed by `start_step()` MUST create a new history entry, resulting in the same step appearing multiple times in history. +5. Step history entries MUST NOT be cleared by `go_to_step()` (only `step_progress` is cleared). + +### JOBS-REQ-010.9: Sub-Workflow Instance Tracking + +1. `StepProgress` MUST have a `sub_workflow_instance_ids` list field. +2. `StepHistoryEntry` MUST have a `sub_workflow_instance_ids` list field. +3. When a nested workflow is started (parent exists on same stack), the child's `workflow_instance_id` MUST be appended to the parent's current step's `sub_workflow_instance_ids` in both `step_progress` and `step_history`. +4. When a cross-agent sub-workflow is started (agent_id set, parent on main stack), the child's `workflow_instance_id` MUST also be recorded on the main stack parent's current step. + +### JOBS-REQ-010.10: Completed/Aborted Workflow Preservation + +1. State files MUST support a `completed_workflows` array alongside `workflow_stack`. +2. `complete_workflow()` MUST move the completed session from `workflow_stack` to `completed_workflows`. +3. `abort_workflow()` MUST move the aborted session from `workflow_stack` to `completed_workflows`. +4. `_write_stack()` MUST preserve existing `completed_workflows` when the parameter is not explicitly provided. +5. Multiple completed/aborted workflows MUST accumulate in the `completed_workflows` array. + +### JOBS-REQ-010.11: Session Data Retrieval + +1. `get_all_session_data()` MUST scan the session directory for `state.json` and `agent_*.json` files. +2. `get_all_session_data()` MUST return a dict mapping agent_id (None for main) to (active_stack, completed_workflows) tuples. +3. `get_all_session_data()` MUST return an empty dict for non-existent sessions. + +### JOBS-REQ-010.12: Fire-and-Forget Semantics + +1. Status writing failures MUST be logged as warnings. +2. Status writing failures MUST NOT cause the MCP tool call to fail. +3. 
Status writing MUST NOT block or delay the tool response. + +### JOBS-REQ-010.13: External Interface Stability + +1. The file format of `job_manifest.yml` and `sessions/.yml` is a stable external contract. +2. Field additions are permitted (backward-compatible). +3. Field removals, renames, or semantic changes MUST NOT be made without incrementing the version path (e.g., `v2/`). + +## Test Coverage + +| Requirement | Test File | Test Name | +|-------------|-----------|-----------| +| JOBS-REQ-010.1 | test_status.py | TestWriteManifest::test_creates_manifest_file | +| JOBS-REQ-010.2 | test_status.py | TestWriteManifest::test_manifest_structure | +| JOBS-REQ-010.3 | test_tools.py | TestStatusWriterIntegration::test_get_workflows_writes_manifest | +| JOBS-REQ-010.4 | test_status.py | TestDeriveDisplayName::* | +| JOBS-REQ-010.5 | test_status.py | TestWriteSessionStatus::test_session_status_structure | +| JOBS-REQ-010.6 | test_tools.py | TestStatusWriterIntegration::test_start_workflow_writes_session_status, test_finished_step_writes_session_status, test_abort_workflow_writes_session_status | +| JOBS-REQ-010.7 | test_state.py | TestWorkflowInstanceId::* | +| JOBS-REQ-010.8 | test_state.py | TestStepHistory::* | +| JOBS-REQ-010.9 | test_state.py | TestSubWorkflowInstanceIds::* | +| JOBS-REQ-010.10 | test_state.py | TestCompletedWorkflows::* | +| JOBS-REQ-010.11 | test_state.py | TestGetAllSessionData::* | +| JOBS-REQ-010.12 | test_tools.py | TestStatusWriterIntegration::test_status_writer_failure_does_not_break_tool | +| JOBS-REQ-010.13 | (Manual review — structural contract) | diff --git a/src/deepwork/jobs/mcp/schemas.py b/src/deepwork/jobs/mcp/schemas.py index c2941b7e..b8beb424 100644 --- a/src/deepwork/jobs/mcp/schemas.py +++ b/src/deepwork/jobs/mcp/schemas.py @@ -8,6 +8,7 @@ from enum import StrEnum from typing import Any +from uuid import uuid4 from pydantic import BaseModel, Field @@ -375,6 +376,22 @@ class StepProgress(BaseModel): ) notes: str | None = Field(default=None, description="Notes from agent") quality_attempts: int = Field(default=0, description="Number of quality gate attempts") + sub_workflow_instance_ids: list[str] = Field( + default_factory=list, + description="Instance IDs of sub-workflows started from this step", + ) + + +class StepHistoryEntry(BaseModel): + """An entry in the step execution history.""" + + step_id: str = Field(description="Step identifier") + started_at: str | None = Field(default=None, description="ISO timestamp when started") + finished_at: str | None = Field(default=None, description="ISO timestamp when finished") + sub_workflow_instance_ids: list[str] = Field( + default_factory=list, + description="Instance IDs of sub-workflows started during this step execution", + ) class WorkflowSession(BaseModel): @@ -386,6 +403,10 @@ class WorkflowSession(BaseModel): "This is the same session ID the agent received at startup." 
) ) + workflow_instance_id: str = Field( + default_factory=lambda: uuid4().hex, + description="Unique identifier for this workflow instance", + ) job_name: str = Field(description="Name of the job") workflow_name: str = Field(description="Name of the workflow") goal: str = Field(description="User's goal for this workflow") @@ -396,6 +417,9 @@ class WorkflowSession(BaseModel): step_progress: dict[str, StepProgress] = Field( default_factory=dict, description="Progress for each step" ) + step_history: list[StepHistoryEntry] = Field( + default_factory=list, description="Ordered history of step executions" + ) started_at: str = Field(description="ISO timestamp when session started") completed_at: str | None = Field(default=None, description="ISO timestamp when completed") status: str = Field(default="active", description="Session status: active, completed, aborted") diff --git a/src/deepwork/jobs/mcp/server.py b/src/deepwork/jobs/mcp/server.py index 54144c41..7743efa0 100644 --- a/src/deepwork/jobs/mcp/server.py +++ b/src/deepwork/jobs/mcp/server.py @@ -29,6 +29,7 @@ StartWorkflowInput, ) from deepwork.jobs.mcp.state import StateManager +from deepwork.jobs.mcp.status import StatusWriter from deepwork.jobs.mcp.tools import WorkflowTools # Configure logging @@ -97,14 +98,23 @@ def create_server( # Self-review mode: no CLI, always reference files by path (0 inline) quality_gate = QualityGate(cli=None, max_inline_files=0) + status_writer = StatusWriter(project_path) + tools = WorkflowTools( project_root=project_path, state_manager=state_manager, quality_gate=quality_gate, max_quality_attempts=quality_gate_max_attempts, external_runner=external_runner, + status_writer=status_writer, ) + # Write initial manifest at startup + try: + tools._write_manifest() + except Exception: + logger.warning("Failed to write initial job manifest", exc_info=True) + # Create MCP server mcp = FastMCP( name="deepwork", diff --git a/src/deepwork/jobs/mcp/state.py b/src/deepwork/jobs/mcp/state.py index 7709e251..9685cee2 100644 --- a/src/deepwork/jobs/mcp/state.py +++ b/src/deepwork/jobs/mcp/state.py @@ -21,10 +21,16 @@ import tempfile from datetime import UTC, datetime from pathlib import Path +from typing import Any import aiofiles -from deepwork.jobs.mcp.schemas import StackEntry, StepProgress, WorkflowSession +from deepwork.jobs.mcp.schemas import ( + StackEntry, + StepHistoryEntry, + StepProgress, + WorkflowSession, +) class StateError(Exception): @@ -102,11 +108,39 @@ async def _read_stack( stack_data = data.get("workflow_stack", []) return [WorkflowSession.from_dict(entry) for entry in stack_data] + async def _read_completed_workflows( + self, session_id: str, agent_id: str | None = None + ) -> list[WorkflowSession]: + """Read completed/aborted workflows from disk. 
+ + Args: + session_id: Claude Code session ID + agent_id: Optional agent ID for sub-agent scoped state + + Returns: + List of completed/aborted WorkflowSession objects + """ + state_file = self._state_file(session_id, agent_id) + if not state_file.exists(): + return [] + + async with aiofiles.open(state_file, encoding="utf-8") as f: + content = await f.read() + + try: + data = json.loads(content) + except json.JSONDecodeError: + return [] + + completed_data = data.get("completed_workflows", []) + return [WorkflowSession.from_dict(entry) for entry in completed_data] + async def _write_stack( self, session_id: str, stack: list[WorkflowSession], agent_id: str | None = None, + completed_workflows: list[WorkflowSession] | None = None, ) -> None: """Write the workflow stack to disk. @@ -114,11 +148,26 @@ async def _write_stack( session_id: Claude Code session ID stack: List of WorkflowSession objects to persist agent_id: Optional agent ID for sub-agent scoped state + completed_workflows: Optional list of completed/aborted workflows to persist. + If None, preserves existing completed_workflows from the file. """ state_file = self._state_file(session_id, agent_id) state_file.parent.mkdir(parents=True, exist_ok=True) - data = {"workflow_stack": [s.to_dict() for s in stack]} + data: dict[str, Any] = {"workflow_stack": [s.to_dict() for s in stack]} + + if completed_workflows is not None: + data["completed_workflows"] = [s.to_dict() for s in completed_workflows] + else: + # Preserve existing completed_workflows if present + if state_file.exists(): + try: + existing = json.loads(state_file.read_text(encoding="utf-8")) + if "completed_workflows" in existing: + data["completed_workflows"] = existing["completed_workflows"] + except (json.JSONDecodeError, OSError): + pass + content = json.dumps(data, indent=2) # Write to a temp file then atomically rename to avoid partial reads @@ -173,6 +222,37 @@ async def create_session( status="active", ) + # If there's a parent workflow on the stack, record this sub-workflow's + # instance ID on the parent's current step + if stack: + parent = stack[-1] + parent_step_id = parent.current_step_id + # Update step_progress + if parent_step_id in parent.step_progress: + parent.step_progress[parent_step_id].sub_workflow_instance_ids.append( + session.workflow_instance_id + ) + # Update last step_history entry if it matches + if parent.step_history and parent.step_history[-1].step_id == parent_step_id: + parent.step_history[-1].sub_workflow_instance_ids.append( + session.workflow_instance_id + ) + elif agent_id: + # Cross-agent sub-workflow: also update main stack's parent + main_stack = await self._read_stack(session_id, agent_id=None) + if main_stack: + parent = main_stack[-1] + parent_step_id = parent.current_step_id + if parent_step_id in parent.step_progress: + parent.step_progress[parent_step_id].sub_workflow_instance_ids.append( + session.workflow_instance_id + ) + if parent.step_history and parent.step_history[-1].step_id == parent_step_id: + parent.step_history[-1].sub_workflow_instance_ids.append( + session.workflow_instance_id + ) + await self._write_stack(session_id, main_stack, agent_id=None) + stack.append(session) await self._write_stack(session_id, stack, agent_id) return session @@ -241,6 +321,11 @@ async def start_step(self, session_id: str, step_id: str, agent_id: str | None = else: session.step_progress[step_id].started_at = now + # Append to step history + session.step_history.append( + StepHistoryEntry(step_id=step_id, started_at=now) + ) + 
session.current_step_id = step_id await self._write_stack(session_id, stack, agent_id) @@ -285,6 +370,10 @@ async def complete_step( progress.outputs = outputs progress.notes = notes + # Update the last step_history entry's finished_at + if session.step_history and session.step_history[-1].step_id == step_id: + session.step_history[-1].finished_at = now + await self._write_stack(session_id, stack, agent_id) async def record_quality_attempt( @@ -360,6 +449,9 @@ async def go_to_step( ) -> None: """Navigate back to a prior step, clearing progress from that step onward. + Step history is preserved (not cleared) — the step will appear again + in history when start_step is called, showing the re-execution. + Args: session_id: Claude Code session ID step_id: Step ID to navigate to @@ -395,6 +487,9 @@ async def complete_workflow( ) -> WorkflowSession | None: """Mark the workflow as complete and remove from stack. + The completed session is preserved in the completed_workflows list + for status reporting. + Args: session_id: Claude Code session ID agent_id: Optional agent ID for sub-agent scoped state @@ -417,9 +512,11 @@ async def complete_workflow( session.completed_at = now session.status = "completed" - # Pop the completed session from the stack + # Move from stack to completed_workflows + completed = await self._read_completed_workflows(session_id, agent_id) + completed.append(session) stack.pop() - await self._write_stack(session_id, stack, agent_id) + await self._write_stack(session_id, stack, agent_id, completed_workflows=completed) return stack[-1] if stack else None @@ -428,6 +525,9 @@ async def abort_workflow( ) -> tuple[WorkflowSession, WorkflowSession | None]: """Abort a workflow and remove from stack. + The aborted session is preserved in the completed_workflows list + for status reporting. + Args: session_id: Claude Code session ID explanation: Reason for aborting the workflow @@ -452,9 +552,11 @@ async def abort_workflow( session.status = "aborted" session.abort_reason = explanation - # Pop the aborted session from the stack + # Move from stack to completed_workflows + completed = await self._read_completed_workflows(session_id, agent_id) + completed.append(session) stack.pop() - await self._write_stack(session_id, stack, agent_id) + await self._write_stack(session_id, stack, agent_id, completed_workflows=completed) new_active = stack[-1] if stack else None return session, new_active @@ -539,3 +641,52 @@ def get_stack_depth(self, session_id: str, agent_id: str | None = None) -> int: Number of active workflow sessions on the stack """ return len(self.get_stack(session_id, agent_id)) + + def get_all_session_data( + self, session_id: str + ) -> dict[str | None, tuple[list[WorkflowSession], list[WorkflowSession]]]: + """Return all stacks and completed workflows for a session. + + Scans the session directory for state.json and agent_*.json files. 
+ + Args: + session_id: Claude Code session ID + + Returns: + Dict mapping agent_id (None for main) to + (active_stack, completed_workflows) tuples + """ + session_dir = self.sessions_dir / f"session-{session_id}" + result: dict[str | None, tuple[list[WorkflowSession], list[WorkflowSession]]] = {} + + if not session_dir.exists(): + return result + + for state_file in sorted(session_dir.iterdir()): + if not state_file.suffix == ".json": + continue + + try: + data = json.loads(state_file.read_text(encoding="utf-8")) + except (json.JSONDecodeError, OSError): + continue + + stack = [ + WorkflowSession.from_dict(entry) + for entry in data.get("workflow_stack", []) + ] + completed = [ + WorkflowSession.from_dict(entry) + for entry in data.get("completed_workflows", []) + ] + + if state_file.name == "state.json": + agent_id = None + elif state_file.name.startswith("agent_") and state_file.name.endswith(".json"): + agent_id = state_file.name[len("agent_"):-len(".json")] + else: + continue + + result[agent_id] = (stack, completed) + + return result diff --git a/src/deepwork/jobs/mcp/status.py b/src/deepwork/jobs/mcp/status.py new file mode 100644 index 00000000..b338a5e6 --- /dev/null +++ b/src/deepwork/jobs/mcp/status.py @@ -0,0 +1,208 @@ +"""Status file writer for external consumers. + +Writes `.deepwork/tmp/status/v1/` files that external tools (UIs, dashboards, +monitoring) can read to understand the current state of jobs and workflow +sessions without going through the MCP protocol. + +IMPORTANT: The file format is a stable external interface. Changes to the +structure of job_manifest.yml or sessions/.yml require careful +consideration of backward compatibility. + +Status writing is fire-and-forget — failures are logged as warnings and +never fail a tool call. +""" + +from __future__ import annotations + +import logging +from datetime import UTC, datetime +from pathlib import Path +from typing import TYPE_CHECKING, Any + +from deepwork.utils.yaml_utils import save_yaml + +if TYPE_CHECKING: + from deepwork.jobs.mcp.state import StateManager + from deepwork.jobs.parser import JobDefinition + +logger = logging.getLogger("deepwork.jobs.mcp.status") + + +def _derive_display_name(api_name: str) -> str: + """Derive a human-readable display name from an API name. + + Replaces underscores and hyphens with spaces and title-cases the result. + + Args: + api_name: The API/identifier name (e.g., "competitive_research") + + Returns: + Human-readable name (e.g., "Competitive Research") + """ + return api_name.replace("_", " ").replace("-", " ").title() + + +class StatusWriter: + """Writes status files for external consumers. + + Produces two types of files: + - job_manifest.yml: catalog of all available jobs/workflows/steps + - sessions/.yml: per-session workflow execution status + """ + + def __init__(self, project_root: Path): + self.status_dir = project_root / ".deepwork" / "tmp" / "status" / "v1" + self.manifest_path = self.status_dir / "job_manifest.yml" + self.sessions_dir = self.status_dir / "sessions" + + def write_manifest(self, jobs: list[JobDefinition]) -> None: + """Write job_manifest.yml with all available jobs, workflows, and steps. + + Jobs are sorted by name, workflows within each job are sorted by name. 
+ + Args: + jobs: List of parsed job definitions + """ + sorted_jobs = sorted(jobs, key=lambda j: j.name) + manifest_jobs: list[dict[str, Any]] = [] + + for job in sorted_jobs: + sorted_workflows = sorted(job.workflows, key=lambda w: w.name) + wf_list: list[dict[str, Any]] = [] + for wf in sorted_workflows: + steps_list: list[dict[str, str]] = [] + for step_id in wf.steps: + steps_list.append({ + "name": step_id, + "display_name": _derive_display_name(step_id), + }) + wf_list.append({ + "name": wf.name, + "display_name": _derive_display_name(wf.name), + "summary": wf.summary, + "steps": steps_list, + }) + + manifest_jobs.append({ + "name": job.name, + "display_name": _derive_display_name(job.name), + "summary": job.summary, + "workflows": wf_list, + }) + + save_yaml(self.manifest_path, {"jobs": manifest_jobs}) + + def write_session_status( + self, + session_id: str, + state_manager: StateManager, + job_loader: Any, + ) -> None: + """Write sessions/.yml from current state. + + Reads all stacks (main + agent) for the session, loads job definitions + to include workflow metadata, and writes a unified status file. + + Args: + session_id: The session ID + state_manager: StateManager instance to read state from + job_loader: Callable that returns (list[JobDefinition], list[errors]) + """ + all_data = state_manager.get_all_session_data(session_id) + if not all_data: + return + + # Load job definitions for workflow metadata + jobs, _ = job_loader() + job_map: dict[str, JobDefinition] = {j.name: j for j in jobs} + + # Collect all workflow instances (active + completed) + workflows_output: list[dict[str, Any]] = [] + active_instance_id: str | None = None + + for agent_id, (active_stack, completed_wfs) in sorted( + all_data.items(), key=lambda x: (x[0] is not None, x[0] or "") + ): + # Active stack — top is the active workflow + for i, session in enumerate(active_stack): + wf_data = self._build_workflow_entry(session, agent_id, job_map) + workflows_output.append(wf_data) + # Top of main stack is the active workflow + if agent_id is None and i == len(active_stack) - 1: + active_instance_id = session.workflow_instance_id + + # Completed/aborted workflows + for session in completed_wfs: + wf_data = self._build_workflow_entry(session, agent_id, job_map) + workflows_output.append(wf_data) + + now = datetime.now(UTC).isoformat() + status_data: dict[str, Any] = { + "session_id": session_id, + "last_updated_at": now, + "active_workflow": active_instance_id, + "workflows": workflows_output, + } + + session_file = self.sessions_dir / f"{session_id}.yml" + save_yaml(session_file, status_data) + + def _build_workflow_entry( + self, + session: Any, + agent_id: str | None, + job_map: dict[str, Any], + ) -> dict[str, Any]: + """Build a workflow entry dict for the session status file. 
+ + Args: + session: WorkflowSession instance + agent_id: Agent ID (None for main) + job_map: Map of job names to JobDefinition instances + + Returns: + Dict representing the workflow entry + """ + # Build workflow definition snapshot + wf_def: dict[str, Any] = { + "name": session.workflow_name, + "display_name": _derive_display_name(session.workflow_name), + "summary": "", + } + + # Enrich with job definition data if available + job = job_map.get(session.job_name) + if job: + for wf in job.workflows: + if wf.name == session.workflow_name: + wf_def["summary"] = wf.summary + wf_def["steps"] = [ + { + "name": step_id, + "display_name": _derive_display_name(step_id), + } + for step_id in wf.steps + ] + break + + # Build ordered step history + steps_output: list[dict[str, Any]] = [] + for entry in session.step_history: + step_entry: dict[str, Any] = { + "step_name": entry.step_id, + "started_at": entry.started_at, + "finished_at": entry.finished_at, + "sub_workflow_instance_ids": entry.sub_workflow_instance_ids, + } + steps_output.append(step_entry) + + result: dict[str, Any] = { + "workflow_instance_id": session.workflow_instance_id, + "job_name": session.job_name, + "status": session.status, + "workflow": wf_def, + "agent_id": agent_id, + "steps": steps_output, + } + + return result diff --git a/src/deepwork/jobs/mcp/tools.py b/src/deepwork/jobs/mcp/tools.py index 795edccf..000a41c3 100644 --- a/src/deepwork/jobs/mcp/tools.py +++ b/src/deepwork/jobs/mcp/tools.py @@ -50,6 +50,7 @@ if TYPE_CHECKING: from deepwork.jobs.mcp.quality_gate import QualityGate + from deepwork.jobs.mcp.status import StatusWriter class ToolError(Exception): @@ -68,6 +69,7 @@ def __init__( quality_gate: QualityGate | None = None, max_quality_attempts: int = 3, external_runner: str | None = None, + status_writer: StatusWriter | None = None, ): """Initialize workflow tools. @@ -78,12 +80,39 @@ def __init__( max_quality_attempts: Maximum attempts before failing quality gate external_runner: External runner for quality gate reviews. "claude" uses Claude CLI subprocess. None means agent self-review. + status_writer: Optional status writer for external status files. """ self.project_root = project_root self.state_manager = state_manager self.quality_gate = quality_gate self.max_quality_attempts = max_quality_attempts self.external_runner = external_runner + self.status_writer = status_writer + + def _write_session_status(self, session_id: str) -> None: + """Write session status file if status_writer is configured. + + Fire-and-forget: exceptions are logged as warnings and never propagated. + """ + if self.status_writer: + try: + self.status_writer.write_session_status( + session_id, self.state_manager, self._load_all_jobs + ) + except Exception: + logger.warning("Failed to write session status", exc_info=True) + + def _write_manifest(self) -> None: + """Write job manifest file if status_writer is configured. + + Fire-and-forget: exceptions are logged as warnings and never propagated. + """ + if self.status_writer: + try: + jobs, _ = self._load_all_jobs() + self.status_writer.write_manifest(jobs) + except Exception: + logger.warning("Failed to write job manifest", exc_info=True) def _load_all_jobs(self) -> tuple[list[JobDefinition], list[JobLoadError]]: """Load all job definitions from all configured job folders. 
@@ -361,6 +390,13 @@ def get_workflows(self) -> GetWorkflowsResponse: for e in load_errors ] + # Write manifest for external consumers + if self.status_writer: + try: + self.status_writer.write_manifest(jobs) + except Exception: + logger.warning("Failed to write job manifest", exc_info=True) + return GetWorkflowsResponse(jobs=job_infos, errors=error_infos) async def start_workflow(self, input_data: StartWorkflowInput) -> StartWorkflowResponse: @@ -409,12 +445,14 @@ async def start_workflow(self, input_data: StartWorkflowInput) -> StartWorkflowR # Get expected outputs step_outputs = self._build_expected_outputs(first_step.outputs) - return StartWorkflowResponse( + response = StartWorkflowResponse( begin_step=self._build_active_step_info( session.session_id, first_step_id, job, first_step, instructions, step_outputs ), stack=self.state_manager.get_stack(sid, aid), ) + self._write_session_status(sid) + return response async def finished_step(self, input_data: FinishedStepInput) -> FinishedStepResponse: """Report step completion and get next instructions. @@ -507,11 +545,13 @@ async def finished_step(self, input_data: FinishedStepInput) -> FinishedStepResp f"review outcome (e.g. 'Self-review passed: all criteria met')" ) - return FinishedStepResponse( + response = FinishedStepResponse( status=StepStatus.NEEDS_WORK, feedback=feedback, stack=self.state_manager.get_stack(sid, aid), ) + self._write_session_status(sid) + return response else: # External runner mode: use quality gate subprocess evaluation attempts = await self.state_manager.record_quality_attempt( @@ -537,12 +577,14 @@ async def finished_step(self, input_data: FinishedStepInput) -> FinishedStepResp # Return needs_work status combined_feedback = "; ".join(r.feedback for r in failed_reviews) - return FinishedStepResponse( + response = FinishedStepResponse( status=StepStatus.NEEDS_WORK, feedback=combined_feedback, failed_reviews=failed_reviews, stack=self.state_manager.get_stack(sid, aid), ) + self._write_session_status(sid) + return response # Mark step as completed await self.state_manager.complete_step( @@ -562,12 +604,14 @@ async def finished_step(self, input_data: FinishedStepInput) -> FinishedStepResp all_outputs = self.state_manager.get_all_outputs(sid, aid) await self.state_manager.complete_workflow(sid, aid) - return FinishedStepResponse( + response = FinishedStepResponse( status=StepStatus.WORKFLOW_COMPLETE, summary=f"Workflow '{workflow.name}' completed successfully!", all_outputs=all_outputs, stack=self.state_manager.get_stack(sid, aid), ) + self._write_session_status(sid) + return response # Get next step next_entry = workflow.step_entries[next_entry_index] @@ -591,13 +635,15 @@ async def finished_step(self, input_data: FinishedStepInput) -> FinishedStepResp # Add info about concurrent steps if this is a concurrent entry instructions = self._append_concurrent_info(instructions, next_entry) - return FinishedStepResponse( + response = FinishedStepResponse( status=StepStatus.NEXT_STEP, begin_step=self._build_active_step_info( sid, next_step_id, job, next_step, instructions, step_outputs ), stack=self.state_manager.get_stack(sid, aid), ) + self._write_session_status(sid) + return response async def abort_workflow(self, input_data: AbortWorkflowInput) -> AbortWorkflowResponse: """Abort the current workflow and return to the previous one. 
@@ -617,7 +663,7 @@ async def abort_workflow(self, input_data: AbortWorkflowInput) -> AbortWorkflowR sid, input_data.explanation, agent_id=aid ) - return AbortWorkflowResponse( + response = AbortWorkflowResponse( aborted_workflow=f"{aborted_session.job_name}/{aborted_session.workflow_name}", aborted_step=aborted_session.current_step_id, explanation=input_data.explanation, @@ -627,6 +673,8 @@ async def abort_workflow(self, input_data: AbortWorkflowInput) -> AbortWorkflowR ), resumed_step=new_active.current_step_id if new_active else None, ) + self._write_session_status(sid) + return response async def go_to_step(self, input_data: GoToStepInput) -> GoToStepResponse: """Navigate back to a prior step, clearing progress from that step onward. @@ -707,10 +755,12 @@ async def go_to_step(self, input_data: GoToStepInput) -> GoToStepResponse: # Add concurrent step info if applicable instructions = self._append_concurrent_info(instructions, target_entry) - return GoToStepResponse( + response = GoToStepResponse( begin_step=self._build_active_step_info( sid, nav_step_id, job, nav_step, instructions, step_outputs ), invalidated_steps=invalidate_step_ids, stack=self.state_manager.get_stack(sid, aid), ) + self._write_session_status(sid) + return response diff --git a/tests/unit/jobs/mcp/test_state.py b/tests/unit/jobs/mcp/test_state.py index ec7bc1d9..c7472060 100644 --- a/tests/unit/jobs/mcp/test_state.py +++ b/tests/unit/jobs/mcp/test_state.py @@ -1,5 +1,6 @@ """Tests for MCP state management.""" +import json from pathlib import Path import pytest @@ -797,3 +798,382 @@ async def test_get_stack_without_agent_returns_main_only( main_stack = state_manager.get_stack(SESSION_ID) assert len(main_stack) == 1 assert main_stack[0].workflow == "main_job/main" + + +class TestWorkflowInstanceId: + """Tests for workflow_instance_id generation.""" + + @pytest.fixture + def project_root(self, tmp_path: Path) -> Path: + deepwork_dir = tmp_path / ".deepwork" + deepwork_dir.mkdir() + (deepwork_dir / "tmp").mkdir() + return tmp_path + + @pytest.fixture + def state_manager(self, project_root: Path) -> StateManager: + return StateManager(project_root=project_root, platform="test") + + async def test_workflow_instance_id_generated(self, state_manager: StateManager) -> None: + """Each session gets a unique workflow_instance_id.""" + session = await state_manager.create_session( + session_id=SESSION_ID, + job_name="job1", + workflow_name="wf1", + goal="Goal", + first_step_id="step1", + ) + assert session.workflow_instance_id + assert len(session.workflow_instance_id) == 32 # uuid4().hex + + async def test_workflow_instance_ids_unique(self, state_manager: StateManager) -> None: + """Two sessions get different instance IDs.""" + s1 = await state_manager.create_session( + session_id=SESSION_ID, + job_name="job1", + workflow_name="wf1", + goal="Goal 1", + first_step_id="step1", + ) + s2 = await state_manager.create_session( + session_id=SESSION_ID, + job_name="job2", + workflow_name="wf2", + goal="Goal 2", + first_step_id="stepA", + ) + assert s1.workflow_instance_id != s2.workflow_instance_id + + async def test_workflow_instance_id_persists( + self, state_manager: StateManager, project_root: Path + ) -> None: + """Instance ID is persisted to disk.""" + session = await state_manager.create_session( + session_id=SESSION_ID, + job_name="job1", + workflow_name="wf1", + goal="Goal", + first_step_id="step1", + ) + loaded = state_manager.resolve_session(SESSION_ID) + assert loaded.workflow_instance_id == session.workflow_instance_id + + 
+class TestStepHistory: + """Tests for step_history tracking.""" + + @pytest.fixture + def project_root(self, tmp_path: Path) -> Path: + deepwork_dir = tmp_path / ".deepwork" + deepwork_dir.mkdir() + (deepwork_dir / "tmp").mkdir() + return tmp_path + + @pytest.fixture + def state_manager(self, project_root: Path) -> StateManager: + return StateManager(project_root=project_root, platform="test") + + async def test_start_step_appends_history(self, state_manager: StateManager) -> None: + """start_step appends to step_history.""" + await state_manager.create_session( + session_id=SESSION_ID, + job_name="job1", + workflow_name="wf1", + goal="Goal", + first_step_id="step1", + ) + await state_manager.start_step(SESSION_ID, "step1") + + session = state_manager.resolve_session(SESSION_ID) + assert len(session.step_history) == 1 + assert session.step_history[0].step_id == "step1" + assert session.step_history[0].started_at is not None + + async def test_complete_step_sets_finished_at(self, state_manager: StateManager) -> None: + """complete_step updates the last history entry's finished_at.""" + await state_manager.create_session( + session_id=SESSION_ID, + job_name="job1", + workflow_name="wf1", + goal="Goal", + first_step_id="step1", + ) + await state_manager.start_step(SESSION_ID, "step1") + await state_manager.complete_step(SESSION_ID, "step1", {"out": "out.md"}) + + session = state_manager.resolve_session(SESSION_ID) + assert session.step_history[0].finished_at is not None + + async def test_go_to_step_creates_duplicate_history( + self, state_manager: StateManager + ) -> None: + """go_to_step + start_step creates a second entry for the same step.""" + await state_manager.create_session( + session_id=SESSION_ID, + job_name="job1", + workflow_name="wf1", + goal="Goal", + first_step_id="step1", + ) + await state_manager.start_step(SESSION_ID, "step1") + await state_manager.complete_step(SESSION_ID, "step1", {"out": "out.md"}) + + # Go back + await state_manager.go_to_step( + session_id=SESSION_ID, + step_id="step1", + entry_index=0, + invalidate_step_ids=["step1"], + ) + await state_manager.start_step(SESSION_ID, "step1") + + session = state_manager.resolve_session(SESSION_ID) + assert len(session.step_history) == 2 + assert all(h.step_id == "step1" for h in session.step_history) + + async def test_multi_step_history_ordering(self, state_manager: StateManager) -> None: + """Steps appear in history in execution order.""" + await state_manager.create_session( + session_id=SESSION_ID, + job_name="job1", + workflow_name="wf1", + goal="Goal", + first_step_id="step1", + ) + await state_manager.start_step(SESSION_ID, "step1") + await state_manager.complete_step(SESSION_ID, "step1", {"out": "out.md"}) + await state_manager.start_step(SESSION_ID, "step2") + + session = state_manager.resolve_session(SESSION_ID) + assert len(session.step_history) == 2 + assert session.step_history[0].step_id == "step1" + assert session.step_history[1].step_id == "step2" + + +class TestCompletedWorkflows: + """Tests for completed_workflows persistence.""" + + @pytest.fixture + def project_root(self, tmp_path: Path) -> Path: + deepwork_dir = tmp_path / ".deepwork" + deepwork_dir.mkdir() + (deepwork_dir / "tmp").mkdir() + return tmp_path + + @pytest.fixture + def state_manager(self, project_root: Path) -> StateManager: + return StateManager(project_root=project_root, platform="test") + + async def test_complete_workflow_preserves_in_completed( + self, state_manager: StateManager + ) -> None: + """Completed workflow is moved to 
completed_workflows list.""" + session = await state_manager.create_session( + session_id=SESSION_ID, + job_name="job1", + workflow_name="wf1", + goal="Goal", + first_step_id="step1", + ) + instance_id = session.workflow_instance_id + + await state_manager.complete_workflow(SESSION_ID) + + state_file = state_manager._state_file(SESSION_ID) + data = json.loads(state_file.read_text()) + assert len(data["workflow_stack"]) == 0 + assert len(data["completed_workflows"]) == 1 + assert data["completed_workflows"][0]["workflow_instance_id"] == instance_id + assert data["completed_workflows"][0]["status"] == "completed" + + async def test_abort_workflow_preserves_in_completed( + self, state_manager: StateManager + ) -> None: + """Aborted workflow is moved to completed_workflows list.""" + session = await state_manager.create_session( + session_id=SESSION_ID, + job_name="job1", + workflow_name="wf1", + goal="Goal", + first_step_id="step1", + ) + instance_id = session.workflow_instance_id + + await state_manager.abort_workflow(SESSION_ID, "Cancelled") + + state_file = state_manager._state_file(SESSION_ID) + data = json.loads(state_file.read_text()) + assert len(data["completed_workflows"]) == 1 + assert data["completed_workflows"][0]["workflow_instance_id"] == instance_id + assert data["completed_workflows"][0]["status"] == "aborted" + + async def test_multiple_completed_workflows(self, state_manager: StateManager) -> None: + """Multiple completed workflows accumulate.""" + await state_manager.create_session( + session_id=SESSION_ID, + job_name="job1", + workflow_name="wf1", + goal="Goal 1", + first_step_id="step1", + ) + await state_manager.complete_workflow(SESSION_ID) + + await state_manager.create_session( + session_id=SESSION_ID, + job_name="job2", + workflow_name="wf2", + goal="Goal 2", + first_step_id="stepA", + ) + await state_manager.abort_workflow(SESSION_ID, "Done") + + state_file = state_manager._state_file(SESSION_ID) + data = json.loads(state_file.read_text()) + assert len(data["completed_workflows"]) == 2 + + +class TestGetAllSessionData: + """Tests for get_all_session_data.""" + + @pytest.fixture + def project_root(self, tmp_path: Path) -> Path: + deepwork_dir = tmp_path / ".deepwork" + deepwork_dir.mkdir() + (deepwork_dir / "tmp").mkdir() + return tmp_path + + @pytest.fixture + def state_manager(self, project_root: Path) -> StateManager: + return StateManager(project_root=project_root, platform="test") + + async def test_returns_empty_for_missing_session(self, state_manager: StateManager) -> None: + result = state_manager.get_all_session_data("nonexistent") + assert result == {} + + async def test_returns_main_stack(self, state_manager: StateManager) -> None: + await state_manager.create_session( + session_id=SESSION_ID, + job_name="job1", + workflow_name="wf1", + goal="Goal", + first_step_id="step1", + ) + + result = state_manager.get_all_session_data(SESSION_ID) + assert None in result + active_stack, completed = result[None] + assert len(active_stack) == 1 + assert len(completed) == 0 + + async def test_returns_agent_stacks(self, state_manager: StateManager) -> None: + await state_manager.create_session( + session_id=SESSION_ID, + job_name="main_job", + workflow_name="main", + goal="Main", + first_step_id="step1", + ) + await state_manager.create_session( + session_id=SESSION_ID, + job_name="agent_job", + workflow_name="agent_wf", + goal="Agent", + first_step_id="a_step1", + agent_id=AGENT_ID, + ) + + result = state_manager.get_all_session_data(SESSION_ID) + assert None in result 
+ assert AGENT_ID in result + active, _ = result[AGENT_ID] + assert len(active) == 1 + assert active[0].job_name == "agent_job" + + async def test_includes_completed_workflows(self, state_manager: StateManager) -> None: + await state_manager.create_session( + session_id=SESSION_ID, + job_name="job1", + workflow_name="wf1", + goal="Goal", + first_step_id="step1", + ) + await state_manager.complete_workflow(SESSION_ID) + + result = state_manager.get_all_session_data(SESSION_ID) + active, completed = result[None] + assert len(active) == 0 + assert len(completed) == 1 + assert completed[0].status == "completed" + + +class TestSubWorkflowInstanceIds: + """Tests for sub_workflow_instance_ids tracking on parent steps.""" + + @pytest.fixture + def project_root(self, tmp_path: Path) -> Path: + deepwork_dir = tmp_path / ".deepwork" + deepwork_dir.mkdir() + (deepwork_dir / "tmp").mkdir() + return tmp_path + + @pytest.fixture + def state_manager(self, project_root: Path) -> StateManager: + return StateManager(project_root=project_root, platform="test") + + async def test_nested_workflow_records_instance_on_parent_step_progress( + self, state_manager: StateManager + ) -> None: + """Starting a nested workflow records the child's instance ID on parent's step_progress.""" + await state_manager.create_session( + session_id=SESSION_ID, + job_name="parent_job", + workflow_name="parent_wf", + goal="Parent", + first_step_id="step1", + ) + await state_manager.start_step(SESSION_ID, "step1") + + child = await state_manager.create_session( + session_id=SESSION_ID, + job_name="child_job", + workflow_name="child_wf", + goal="Child", + first_step_id="child_step1", + ) + + state_file = state_manager._state_file(SESSION_ID) + data = json.loads(state_file.read_text()) + parent_data = data["workflow_stack"][0] + assert ( + child.workflow_instance_id + in parent_data["step_progress"]["step1"]["sub_workflow_instance_ids"] + ) + + async def test_nested_workflow_records_instance_on_parent_step_history( + self, state_manager: StateManager + ) -> None: + """Starting a nested workflow records the child's instance ID on parent's step_history.""" + await state_manager.create_session( + session_id=SESSION_ID, + job_name="parent_job", + workflow_name="parent_wf", + goal="Parent", + first_step_id="step1", + ) + await state_manager.start_step(SESSION_ID, "step1") + + child = await state_manager.create_session( + session_id=SESSION_ID, + job_name="child_job", + workflow_name="child_wf", + goal="Child", + first_step_id="child_step1", + ) + + state_file = state_manager._state_file(SESSION_ID) + data = json.loads(state_file.read_text()) + parent_data = data["workflow_stack"][0] + assert ( + child.workflow_instance_id + in parent_data["step_history"][-1]["sub_workflow_instance_ids"] + ) diff --git a/tests/unit/jobs/mcp/test_status.py b/tests/unit/jobs/mcp/test_status.py new file mode 100644 index 00000000..a70e16b8 --- /dev/null +++ b/tests/unit/jobs/mcp/test_status.py @@ -0,0 +1,404 @@ +"""Tests for MCP status file writer.""" + +import json +from pathlib import Path + +import pytest +import yaml + +from deepwork.jobs.mcp.state import StateManager +from deepwork.jobs.mcp.status import StatusWriter, _derive_display_name +from deepwork.jobs.parser import JobDefinition, Workflow, WorkflowStepEntry + +SESSION_ID = "test-session-001" +AGENT_ID = "agent-abc" + + +@pytest.fixture +def project_root(tmp_path: Path) -> Path: + deepwork_dir = tmp_path / ".deepwork" + deepwork_dir.mkdir() + (deepwork_dir / "tmp").mkdir() + return tmp_path + + 
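# Illustrative sketch (not part of this patch): how an external consumer — a
# dashboard or monitoring script — might read the v1 status files that StatusWriter
# produces and these tests verify. The directory layout and the f"{session_id}.yml"
# naming come from StatusWriter itself; yaml.safe_load mirrors the assertions below.
# The helper name, variables, and printed fields are an assumed example consumer,
# not part of the stable contract.
def read_status_example(project_root: Path, session_id: str) -> None:
    status_dir = project_root / ".deepwork" / "tmp" / "status" / "v1"

    # Job catalog: every available job, workflow, and step.
    manifest = yaml.safe_load((status_dir / "job_manifest.yml").read_text())
    for job in manifest["jobs"]:
        print(job["display_name"], [w["name"] for w in job["workflows"]])

    # Per-session execution status, if the session has any workflow state.
    session_file = status_dir / "sessions" / f"{session_id}.yml"
    if session_file.exists():
        status = yaml.safe_load(session_file.read_text())
        print("active:", status["active_workflow"])
        for wf in status["workflows"]:
            print(wf["job_name"], wf["status"], [s["step_name"] for s in wf["steps"]])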
+@pytest.fixture +def status_writer(project_root: Path) -> StatusWriter: + return StatusWriter(project_root) + + +@pytest.fixture +def state_manager(project_root: Path) -> StateManager: + return StateManager(project_root=project_root, platform="test") + + +def _make_job( + name: str = "test_job", + summary: str = "A test job", + workflows: list[Workflow] | None = None, +) -> JobDefinition: + """Create a minimal JobDefinition for testing.""" + if workflows is None: + workflows = [ + Workflow( + name="main", + summary="Main workflow", + step_entries=[ + WorkflowStepEntry(step_ids=["step1"]), + WorkflowStepEntry(step_ids=["step2"]), + ], + ) + ] + return JobDefinition( + name=name, + version="1.0.0", + summary=summary, + common_job_info_provided_to_all_steps_at_runtime="", + steps=[], + job_dir=Path("/tmp/fake"), + workflows=workflows, + ) + + +class TestDeriveDisplayName: + def test_underscore(self) -> None: + assert _derive_display_name("competitive_research") == "Competitive Research" + + def test_hyphen(self) -> None: + assert _derive_display_name("ad-campaign") == "Ad Campaign" + + def test_mixed(self) -> None: + assert _derive_display_name("my_job-name") == "My Job Name" + + def test_single_word(self) -> None: + assert _derive_display_name("report") == "Report" + + def test_already_title(self) -> None: + assert _derive_display_name("Report") == "Report" + + def test_empty(self) -> None: + assert _derive_display_name("") == "" + + +class TestWriteManifest: + def test_creates_manifest_file(self, status_writer: StatusWriter) -> None: + jobs = [_make_job()] + status_writer.write_manifest(jobs) + assert status_writer.manifest_path.exists() + + def test_manifest_structure(self, status_writer: StatusWriter) -> None: + jobs = [_make_job()] + status_writer.write_manifest(jobs) + + data = yaml.safe_load(status_writer.manifest_path.read_text()) + assert "jobs" in data + assert len(data["jobs"]) == 1 + + job = data["jobs"][0] + assert job["name"] == "test_job" + assert job["display_name"] == "Test Job" + assert job["summary"] == "A test job" + assert len(job["workflows"]) == 1 + + wf = job["workflows"][0] + assert wf["name"] == "main" + assert wf["display_name"] == "Main" + assert wf["summary"] == "Main workflow" + assert len(wf["steps"]) == 2 + assert wf["steps"][0]["name"] == "step1" + assert wf["steps"][0]["display_name"] == "Step1" + + def test_manifest_sorted_by_name(self, status_writer: StatusWriter) -> None: + jobs = [ + _make_job(name="zebra_job", summary="Z job"), + _make_job(name="alpha_job", summary="A job"), + ] + status_writer.write_manifest(jobs) + + data = yaml.safe_load(status_writer.manifest_path.read_text()) + assert data["jobs"][0]["name"] == "alpha_job" + assert data["jobs"][1]["name"] == "zebra_job" + + def test_manifest_multiple_workflows_sorted(self, status_writer: StatusWriter) -> None: + wf_b = Workflow( + name="beta_wf", + summary="Beta", + step_entries=[WorkflowStepEntry(step_ids=["s1"])], + ) + wf_a = Workflow( + name="alpha_wf", + summary="Alpha", + step_entries=[WorkflowStepEntry(step_ids=["s2"])], + ) + jobs = [_make_job(workflows=[wf_b, wf_a])] + status_writer.write_manifest(jobs) + + data = yaml.safe_load(status_writer.manifest_path.read_text()) + wf_names = [w["name"] for w in data["jobs"][0]["workflows"]] + assert wf_names == ["alpha_wf", "beta_wf"] + + def test_empty_jobs_list(self, status_writer: StatusWriter) -> None: + status_writer.write_manifest([]) + data = yaml.safe_load(status_writer.manifest_path.read_text()) + assert data["jobs"] == [] + + +class 
TestWriteSessionStatus: + def _job_loader(self, jobs: list[JobDefinition] | None = None): + """Return a callable that returns the provided jobs.""" + if jobs is None: + jobs = [_make_job()] + + def loader(): + return jobs, [] + + return loader + + async def test_writes_session_file( + self, + status_writer: StatusWriter, + state_manager: StateManager, + ) -> None: + await state_manager.create_session( + session_id=SESSION_ID, + job_name="test_job", + workflow_name="main", + goal="Test goal", + first_step_id="step1", + ) + await state_manager.start_step(SESSION_ID, "step1") + + status_writer.write_session_status(SESSION_ID, state_manager, self._job_loader()) + + session_file = status_writer.sessions_dir / f"{SESSION_ID}.yml" + assert session_file.exists() + + data = yaml.safe_load(session_file.read_text()) + assert data["session_id"] == SESSION_ID + assert data["active_workflow"] is not None + assert len(data["workflows"]) == 1 + + async def test_session_status_structure( + self, + status_writer: StatusWriter, + state_manager: StateManager, + ) -> None: + session = await state_manager.create_session( + session_id=SESSION_ID, + job_name="test_job", + workflow_name="main", + goal="Test goal", + first_step_id="step1", + ) + await state_manager.start_step(SESSION_ID, "step1") + + status_writer.write_session_status(SESSION_ID, state_manager, self._job_loader()) + + data = yaml.safe_load( + (status_writer.sessions_dir / f"{SESSION_ID}.yml").read_text() + ) + + wf = data["workflows"][0] + assert wf["workflow_instance_id"] == session.workflow_instance_id + assert wf["job_name"] == "test_job" + assert wf["status"] == "active" + assert wf["agent_id"] is None + assert wf["workflow"]["name"] == "main" + assert wf["workflow"]["display_name"] == "Main" + assert len(wf["steps"]) == 1 + assert wf["steps"][0]["step_name"] == "step1" + assert wf["steps"][0]["started_at"] is not None + + async def test_completed_workflow_preserved( + self, + status_writer: StatusWriter, + state_manager: StateManager, + ) -> None: + await state_manager.create_session( + session_id=SESSION_ID, + job_name="test_job", + workflow_name="main", + goal="Test goal", + first_step_id="step1", + ) + await state_manager.start_step(SESSION_ID, "step1") + await state_manager.complete_step(SESSION_ID, "step1", {"out": "out.md"}) + await state_manager.complete_workflow(SESSION_ID) + + status_writer.write_session_status(SESSION_ID, state_manager, self._job_loader()) + + data = yaml.safe_load( + (status_writer.sessions_dir / f"{SESSION_ID}.yml").read_text() + ) + assert data["active_workflow"] is None + assert len(data["workflows"]) == 1 + assert data["workflows"][0]["status"] == "completed" + + async def test_aborted_workflow_preserved( + self, + status_writer: StatusWriter, + state_manager: StateManager, + ) -> None: + await state_manager.create_session( + session_id=SESSION_ID, + job_name="test_job", + workflow_name="main", + goal="Test goal", + first_step_id="step1", + ) + await state_manager.start_step(SESSION_ID, "step1") + await state_manager.abort_workflow(SESSION_ID, "Cancelled") + + status_writer.write_session_status(SESSION_ID, state_manager, self._job_loader()) + + data = yaml.safe_load( + (status_writer.sessions_dir / f"{SESSION_ID}.yml").read_text() + ) + assert data["workflows"][0]["status"] == "aborted" + + async def test_nested_workflows( + self, + status_writer: StatusWriter, + state_manager: StateManager, + ) -> None: + await state_manager.create_session( + session_id=SESSION_ID, + job_name="test_job", + workflow_name="main", 
+ goal="Goal 1", + first_step_id="step1", + ) + await state_manager.start_step(SESSION_ID, "step1") + + await state_manager.create_session( + session_id=SESSION_ID, + job_name="test_job", + workflow_name="main", + goal="Goal 2", + first_step_id="step1", + ) + await state_manager.start_step(SESSION_ID, "step1") + + status_writer.write_session_status(SESSION_ID, state_manager, self._job_loader()) + + data = yaml.safe_load( + (status_writer.sessions_dir / f"{SESSION_ID}.yml").read_text() + ) + assert len(data["workflows"]) == 2 + # Active workflow should be the top of stack (last one pushed) + assert data["active_workflow"] == data["workflows"][1]["workflow_instance_id"] + + async def test_multi_agent_workflows( + self, + status_writer: StatusWriter, + state_manager: StateManager, + ) -> None: + await state_manager.create_session( + session_id=SESSION_ID, + job_name="test_job", + workflow_name="main", + goal="Main goal", + first_step_id="step1", + ) + await state_manager.start_step(SESSION_ID, "step1") + + await state_manager.create_session( + session_id=SESSION_ID, + job_name="test_job", + workflow_name="main", + goal="Agent goal", + first_step_id="step1", + agent_id=AGENT_ID, + ) + await state_manager.start_step(SESSION_ID, "step1", agent_id=AGENT_ID) + + status_writer.write_session_status(SESSION_ID, state_manager, self._job_loader()) + + data = yaml.safe_load( + (status_writer.sessions_dir / f"{SESSION_ID}.yml").read_text() + ) + assert len(data["workflows"]) == 2 + agent_ids = [w["agent_id"] for w in data["workflows"]] + assert None in agent_ids + assert AGENT_ID in agent_ids + + async def test_step_history_ordering( + self, + status_writer: StatusWriter, + state_manager: StateManager, + ) -> None: + """Step history shows duplicates when go_to_step is used.""" + await state_manager.create_session( + session_id=SESSION_ID, + job_name="test_job", + workflow_name="main", + goal="Test", + first_step_id="step1", + ) + await state_manager.start_step(SESSION_ID, "step1") + await state_manager.complete_step(SESSION_ID, "step1", {"out": "out.md"}) + + # Go back to step1 + await state_manager.go_to_step( + session_id=SESSION_ID, + step_id="step1", + entry_index=0, + invalidate_step_ids=["step1"], + ) + await state_manager.start_step(SESSION_ID, "step1") + + status_writer.write_session_status(SESSION_ID, state_manager, self._job_loader()) + + data = yaml.safe_load( + (status_writer.sessions_dir / f"{SESSION_ID}.yml").read_text() + ) + steps = data["workflows"][0]["steps"] + # step1 appears twice in history (original + re-execution) + step_names = [s["step_name"] for s in steps] + assert step_names == ["step1", "step1"] + + def test_no_session_data_is_noop( + self, + status_writer: StatusWriter, + state_manager: StateManager, + ) -> None: + """write_session_status with non-existent session is a no-op.""" + status_writer.write_session_status("nonexistent", state_manager, self._job_loader()) + assert not status_writer.sessions_dir.exists() or not list( + status_writer.sessions_dir.iterdir() + ) + + +class TestSubWorkflowInstanceTracking: + """Tests for sub_workflow_instance_ids in status output.""" + + async def test_nested_workflow_records_sub_instance_id( + self, + status_writer: StatusWriter, + state_manager: StateManager, + project_root: Path, + ) -> None: + """When a nested workflow starts, its instance ID is recorded on the parent step.""" + await state_manager.create_session( + session_id=SESSION_ID, + job_name="test_job", + workflow_name="main", + goal="Parent goal", + first_step_id="step1", + 
) + await state_manager.start_step(SESSION_ID, "step1") + + child = await state_manager.create_session( + session_id=SESSION_ID, + job_name="test_job", + workflow_name="main", + goal="Child goal", + first_step_id="step1", + ) + + # Read parent from disk to verify sub_workflow_instance_ids was set + state_file = state_manager._state_file(SESSION_ID) + data = json.loads(state_file.read_text()) + parent_data = data["workflow_stack"][0] + parent_step_progress = parent_data["step_progress"]["step1"] + assert child.workflow_instance_id in parent_step_progress["sub_workflow_instance_ids"] diff --git a/tests/unit/jobs/mcp/test_tools.py b/tests/unit/jobs/mcp/test_tools.py index eefcc04e..e32f41e1 100644 --- a/tests/unit/jobs/mcp/test_tools.py +++ b/tests/unit/jobs/mcp/test_tools.py @@ -13,6 +13,7 @@ StepStatus, ) from deepwork.jobs.mcp.state import StateError, StateManager +from deepwork.jobs.mcp.status import StatusWriter from deepwork.jobs.mcp.tools import ToolError, WorkflowTools SESSION_ID = "test-session" @@ -2368,3 +2369,117 @@ async def test_go_to_step_concurrent_entry(self, tmp_path: Path) -> None: assert "finalize" in response.invalidated_steps # setup should NOT be invalidated assert "setup" not in response.invalidated_steps + + +class TestStatusWriterIntegration: + """Tests that StatusWriter is called from WorkflowTools.""" + + @pytest.fixture + def tools_with_status( + self, project_root: Path, state_manager: StateManager + ) -> WorkflowTools: + status_writer = StatusWriter(project_root) + return WorkflowTools( + project_root=project_root, + state_manager=state_manager, + status_writer=status_writer, + ) + + async def test_get_workflows_writes_manifest( + self, tools_with_status: WorkflowTools + ) -> None: + tools_with_status.get_workflows() + assert tools_with_status.status_writer is not None + assert tools_with_status.status_writer.manifest_path.exists() + + async def test_start_workflow_writes_session_status( + self, tools_with_status: WorkflowTools, project_root: Path + ) -> None: + (project_root / "output1.md").write_text("test") + await tools_with_status.start_workflow( + StartWorkflowInput( + goal="Test", + job_name="test_job", + workflow_name="main", + session_id=SESSION_ID, + ) + ) + assert tools_with_status.status_writer is not None + session_file = tools_with_status.status_writer.sessions_dir / f"{SESSION_ID}.yml" + assert session_file.exists() + + async def test_finished_step_writes_session_status( + self, tools_with_status: WorkflowTools, project_root: Path + ) -> None: + (project_root / "output1.md").write_text("test") + (project_root / "output2.md").write_text("test") + await tools_with_status.start_workflow( + StartWorkflowInput( + goal="Test", + job_name="test_job", + workflow_name="main", + session_id=SESSION_ID, + ) + ) + response = await tools_with_status.finished_step( + FinishedStepInput( + outputs={"output1.md": "output1.md"}, + session_id=SESSION_ID, + quality_review_override_reason="skip", + ) + ) + assert response.status == StepStatus.NEXT_STEP + assert tools_with_status.status_writer is not None + session_file = tools_with_status.status_writer.sessions_dir / f"{SESSION_ID}.yml" + assert session_file.exists() + + async def test_abort_workflow_writes_session_status( + self, tools_with_status: WorkflowTools, project_root: Path + ) -> None: + await tools_with_status.start_workflow( + StartWorkflowInput( + goal="Test", + job_name="test_job", + workflow_name="main", + session_id=SESSION_ID, + ) + ) + await tools_with_status.abort_workflow( + AbortWorkflowInput( + 
explanation="Done", + session_id=SESSION_ID, + ) + ) + assert tools_with_status.status_writer is not None + session_file = tools_with_status.status_writer.sessions_dir / f"{SESSION_ID}.yml" + assert session_file.exists() + + async def test_status_writer_failure_does_not_break_tool( + self, project_root: Path, state_manager: StateManager + ) -> None: + """StatusWriter errors are swallowed — tools should still work.""" + from unittest.mock import MagicMock + + broken_writer = MagicMock(spec=StatusWriter) + broken_writer.write_session_status.side_effect = RuntimeError("disk full") + broken_writer.write_manifest.side_effect = RuntimeError("disk full") + + tools = WorkflowTools( + project_root=project_root, + state_manager=state_manager, + status_writer=broken_writer, + ) + + # get_workflows should still work + response = tools.get_workflows() + assert len(response.jobs) >= 1 + + # start_workflow should still work + await tools.start_workflow( + StartWorkflowInput( + goal="Test", + job_name="test_job", + workflow_name="main", + session_id=SESSION_ID, + ) + ) From 6598a994a0f4511cf354411a0d446a1cebc3eea4 Mon Sep 17 00:00:00 2001 From: Noah Horton Date: Mon, 9 Mar 2026 16:22:10 -0400 Subject: [PATCH 2/5] Address review findings: traceability comments, missing tests, RFC 2119 fix - Add traceability comments (JOBS-REQ-010.x) to all new tests - Add missing test for go_to_step write trigger (JOBS-REQ-010.6.3) - Add test for status directory v1 path structure (JOBS-REQ-010.1, 010.13) - Fix RFC 2119 keywords in JOBS-REQ-010.13 requirements 1 and 2 - Add type annotations to test_status.py helper methods (mypy fix) Co-Authored-By: Claude Opus 4.6 --- .../jobs/JOBS-REQ-010-status-reporting.md | 4 +- tests/unit/jobs/mcp/test_state.py | 40 ++++++++-- tests/unit/jobs/mcp/test_status.py | 79 ++++++++++++++----- tests/unit/jobs/mcp/test_tools.py | 48 +++++++++-- 4 files changed, 137 insertions(+), 34 deletions(-) diff --git a/specs/deepwork/jobs/JOBS-REQ-010-status-reporting.md b/specs/deepwork/jobs/JOBS-REQ-010-status-reporting.md index 75f05094..70b2f8a7 100644 --- a/specs/deepwork/jobs/JOBS-REQ-010-status-reporting.md +++ b/specs/deepwork/jobs/JOBS-REQ-010-status-reporting.md @@ -91,8 +91,8 @@ DeepWork provides a file-based external interface for reporting the current stat ### JOBS-REQ-010.13: External Interface Stability -1. The file format of `job_manifest.yml` and `sessions/.yml` is a stable external contract. -2. Field additions are permitted (backward-compatible). +1. The file format of `job_manifest.yml` and `sessions/.yml` MUST be treated as a stable external contract. +2. Field additions MAY be made (backward-compatible). 3. Field removals, renames, or semantic changes MUST NOT be made without incrementing the version path (e.g., `v2/`). ## Test Coverage diff --git a/tests/unit/jobs/mcp/test_state.py b/tests/unit/jobs/mcp/test_state.py index c7472060..fe6da123 100644 --- a/tests/unit/jobs/mcp/test_state.py +++ b/tests/unit/jobs/mcp/test_state.py @@ -814,6 +814,8 @@ def project_root(self, tmp_path: Path) -> Path: def state_manager(self, project_root: Path) -> StateManager: return StateManager(project_root=project_root, platform="test") + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.7.1, JOBS-REQ-010.7.2). 
+ # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES async def test_workflow_instance_id_generated(self, state_manager: StateManager) -> None: """Each session gets a unique workflow_instance_id.""" session = await state_manager.create_session( @@ -826,6 +828,8 @@ async def test_workflow_instance_id_generated(self, state_manager: StateManager) assert session.workflow_instance_id assert len(session.workflow_instance_id) == 32 # uuid4().hex + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.7.4). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES async def test_workflow_instance_ids_unique(self, state_manager: StateManager) -> None: """Two sessions get different instance IDs.""" s1 = await state_manager.create_session( @@ -844,6 +848,8 @@ async def test_workflow_instance_ids_unique(self, state_manager: StateManager) - ) assert s1.workflow_instance_id != s2.workflow_instance_id + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.7.3). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES async def test_workflow_instance_id_persists( self, state_manager: StateManager, project_root: Path ) -> None: @@ -873,6 +879,8 @@ def project_root(self, tmp_path: Path) -> Path: def state_manager(self, project_root: Path) -> StateManager: return StateManager(project_root=project_root, platform="test") + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.8.1, JOBS-REQ-010.8.2). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES async def test_start_step_appends_history(self, state_manager: StateManager) -> None: """start_step appends to step_history.""" await state_manager.create_session( @@ -889,6 +897,8 @@ async def test_start_step_appends_history(self, state_manager: StateManager) -> assert session.step_history[0].step_id == "step1" assert session.step_history[0].started_at is not None + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.8.3). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES async def test_complete_step_sets_finished_at(self, state_manager: StateManager) -> None: """complete_step updates the last history entry's finished_at.""" await state_manager.create_session( @@ -904,9 +914,9 @@ async def test_complete_step_sets_finished_at(self, state_manager: StateManager) session = state_manager.resolve_session(SESSION_ID) assert session.step_history[0].finished_at is not None - async def test_go_to_step_creates_duplicate_history( - self, state_manager: StateManager - ) -> None: + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.8.4, JOBS-REQ-010.8.5). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES + async def test_go_to_step_creates_duplicate_history(self, state_manager: StateManager) -> None: """go_to_step + start_step creates a second entry for the same step.""" await state_manager.create_session( session_id=SESSION_ID, @@ -931,6 +941,8 @@ async def test_go_to_step_creates_duplicate_history( assert len(session.step_history) == 2 assert all(h.step_id == "step1" for h in session.step_history) + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.8.1). 
+ # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES async def test_multi_step_history_ordering(self, state_manager: StateManager) -> None: """Steps appear in history in execution order.""" await state_manager.create_session( @@ -964,6 +976,8 @@ def project_root(self, tmp_path: Path) -> Path: def state_manager(self, project_root: Path) -> StateManager: return StateManager(project_root=project_root, platform="test") + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.10.1, JOBS-REQ-010.10.2). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES async def test_complete_workflow_preserves_in_completed( self, state_manager: StateManager ) -> None: @@ -986,9 +1000,9 @@ async def test_complete_workflow_preserves_in_completed( assert data["completed_workflows"][0]["workflow_instance_id"] == instance_id assert data["completed_workflows"][0]["status"] == "completed" - async def test_abort_workflow_preserves_in_completed( - self, state_manager: StateManager - ) -> None: + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.10.1, JOBS-REQ-010.10.3). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES + async def test_abort_workflow_preserves_in_completed(self, state_manager: StateManager) -> None: """Aborted workflow is moved to completed_workflows list.""" session = await state_manager.create_session( session_id=SESSION_ID, @@ -1007,6 +1021,8 @@ async def test_abort_workflow_preserves_in_completed( assert data["completed_workflows"][0]["workflow_instance_id"] == instance_id assert data["completed_workflows"][0]["status"] == "aborted" + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.10.5). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES async def test_multiple_completed_workflows(self, state_manager: StateManager) -> None: """Multiple completed workflows accumulate.""" await state_manager.create_session( @@ -1046,10 +1062,14 @@ def project_root(self, tmp_path: Path) -> Path: def state_manager(self, project_root: Path) -> StateManager: return StateManager(project_root=project_root, platform="test") + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.11.3). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES async def test_returns_empty_for_missing_session(self, state_manager: StateManager) -> None: result = state_manager.get_all_session_data("nonexistent") assert result == {} + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.11.1, JOBS-REQ-010.11.2). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES async def test_returns_main_stack(self, state_manager: StateManager) -> None: await state_manager.create_session( session_id=SESSION_ID, @@ -1065,6 +1085,8 @@ async def test_returns_main_stack(self, state_manager: StateManager) -> None: assert len(active_stack) == 1 assert len(completed) == 0 + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.11.1, JOBS-REQ-010.11.2). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES async def test_returns_agent_stacks(self, state_manager: StateManager) -> None: await state_manager.create_session( session_id=SESSION_ID, @@ -1089,6 +1111,8 @@ async def test_returns_agent_stacks(self, state_manager: StateManager) -> None: assert len(active) == 1 assert active[0].job_name == "agent_job" + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.11.2). 
+ # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES async def test_includes_completed_workflows(self, state_manager: StateManager) -> None: await state_manager.create_session( session_id=SESSION_ID, @@ -1120,6 +1144,8 @@ def project_root(self, tmp_path: Path) -> Path: def state_manager(self, project_root: Path) -> StateManager: return StateManager(project_root=project_root, platform="test") + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.9.1, JOBS-REQ-010.9.3). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES async def test_nested_workflow_records_instance_on_parent_step_progress( self, state_manager: StateManager ) -> None: @@ -1149,6 +1175,8 @@ async def test_nested_workflow_records_instance_on_parent_step_progress( in parent_data["step_progress"]["step1"]["sub_workflow_instance_ids"] ) + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.9.2, JOBS-REQ-010.9.3). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES async def test_nested_workflow_records_instance_on_parent_step_history( self, state_manager: StateManager ) -> None: diff --git a/tests/unit/jobs/mcp/test_status.py b/tests/unit/jobs/mcp/test_status.py index a70e16b8..f84e83c9 100644 --- a/tests/unit/jobs/mcp/test_status.py +++ b/tests/unit/jobs/mcp/test_status.py @@ -1,6 +1,9 @@ """Tests for MCP status file writer.""" +from __future__ import annotations + import json +from collections.abc import Callable from pathlib import Path import pytest @@ -61,31 +64,57 @@ def _make_job( class TestDeriveDisplayName: + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.4.1, JOBS-REQ-010.4.2). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES def test_underscore(self) -> None: assert _derive_display_name("competitive_research") == "Competitive Research" + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.4.1). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES def test_hyphen(self) -> None: assert _derive_display_name("ad-campaign") == "Ad Campaign" + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.4.1). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES def test_mixed(self) -> None: assert _derive_display_name("my_job-name") == "My Job Name" + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.4.1). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES def test_single_word(self) -> None: assert _derive_display_name("report") == "Report" + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.4.1). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES def test_already_title(self) -> None: assert _derive_display_name("Report") == "Report" + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.4.2). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES def test_empty(self) -> None: assert _derive_display_name("") == "" +class TestStatusDirectoryStructure: + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.1.1, JOBS-REQ-010.1.4, JOBS-REQ-010.13.1). 
+ # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES + def test_status_dir_uses_v1_path(self, status_writer: StatusWriter, project_root: Path) -> None: + """Status directory uses versioned v1 path.""" + assert status_writer.status_dir == project_root / ".deepwork" / "tmp" / "status" / "v1" + assert status_writer.manifest_path == status_writer.status_dir / "job_manifest.yml" + assert status_writer.sessions_dir == status_writer.status_dir / "sessions" + + class TestWriteManifest: + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.1.2). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES def test_creates_manifest_file(self, status_writer: StatusWriter) -> None: jobs = [_make_job()] status_writer.write_manifest(jobs) assert status_writer.manifest_path.exists() + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.2.1, JOBS-REQ-010.2.2, JOBS-REQ-010.2.3, JOBS-REQ-010.2.4). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES def test_manifest_structure(self, status_writer: StatusWriter) -> None: jobs = [_make_job()] status_writer.write_manifest(jobs) @@ -108,6 +137,8 @@ def test_manifest_structure(self, status_writer: StatusWriter) -> None: assert wf["steps"][0]["name"] == "step1" assert wf["steps"][0]["display_name"] == "Step1" + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.2.5). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES def test_manifest_sorted_by_name(self, status_writer: StatusWriter) -> None: jobs = [ _make_job(name="zebra_job", summary="Z job"), @@ -119,6 +150,8 @@ def test_manifest_sorted_by_name(self, status_writer: StatusWriter) -> None: assert data["jobs"][0]["name"] == "alpha_job" assert data["jobs"][1]["name"] == "zebra_job" + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.2.6). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES def test_manifest_multiple_workflows_sorted(self, status_writer: StatusWriter) -> None: wf_b = Workflow( name="beta_wf", @@ -144,16 +177,20 @@ def test_empty_jobs_list(self, status_writer: StatusWriter) -> None: class TestWriteSessionStatus: - def _job_loader(self, jobs: list[JobDefinition] | None = None): + def _job_loader( + self, jobs: list[JobDefinition] | None = None + ) -> Callable[[], tuple[list[JobDefinition], list[str]]]: """Return a callable that returns the provided jobs.""" if jobs is None: jobs = [_make_job()] - def loader(): + def loader() -> tuple[list[JobDefinition], list[str]]: return jobs, [] return loader + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.1.3, JOBS-REQ-010.5.1). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES async def test_writes_session_file( self, status_writer: StatusWriter, @@ -178,6 +215,8 @@ async def test_writes_session_file( assert data["active_workflow"] is not None assert len(data["workflows"]) == 1 + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.5.1, JOBS-REQ-010.5.2, JOBS-REQ-010.5.4, JOBS-REQ-010.5.5). 
+ # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES async def test_session_status_structure( self, status_writer: StatusWriter, @@ -194,9 +233,7 @@ async def test_session_status_structure( status_writer.write_session_status(SESSION_ID, state_manager, self._job_loader()) - data = yaml.safe_load( - (status_writer.sessions_dir / f"{SESSION_ID}.yml").read_text() - ) + data = yaml.safe_load((status_writer.sessions_dir / f"{SESSION_ID}.yml").read_text()) wf = data["workflows"][0] assert wf["workflow_instance_id"] == session.workflow_instance_id @@ -209,6 +246,8 @@ async def test_session_status_structure( assert wf["steps"][0]["step_name"] == "step1" assert wf["steps"][0]["started_at"] is not None + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.5.2, JOBS-REQ-010.5.4). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES async def test_completed_workflow_preserved( self, status_writer: StatusWriter, @@ -227,13 +266,13 @@ async def test_completed_workflow_preserved( status_writer.write_session_status(SESSION_ID, state_manager, self._job_loader()) - data = yaml.safe_load( - (status_writer.sessions_dir / f"{SESSION_ID}.yml").read_text() - ) + data = yaml.safe_load((status_writer.sessions_dir / f"{SESSION_ID}.yml").read_text()) assert data["active_workflow"] is None assert len(data["workflows"]) == 1 assert data["workflows"][0]["status"] == "completed" + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.5.4). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES async def test_aborted_workflow_preserved( self, status_writer: StatusWriter, @@ -251,11 +290,11 @@ async def test_aborted_workflow_preserved( status_writer.write_session_status(SESSION_ID, state_manager, self._job_loader()) - data = yaml.safe_load( - (status_writer.sessions_dir / f"{SESSION_ID}.yml").read_text() - ) + data = yaml.safe_load((status_writer.sessions_dir / f"{SESSION_ID}.yml").read_text()) assert data["workflows"][0]["status"] == "aborted" + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.5.2, JOBS-REQ-010.5.4). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES async def test_nested_workflows( self, status_writer: StatusWriter, @@ -281,13 +320,13 @@ async def test_nested_workflows( status_writer.write_session_status(SESSION_ID, state_manager, self._job_loader()) - data = yaml.safe_load( - (status_writer.sessions_dir / f"{SESSION_ID}.yml").read_text() - ) + data = yaml.safe_load((status_writer.sessions_dir / f"{SESSION_ID}.yml").read_text()) assert len(data["workflows"]) == 2 # Active workflow should be the top of stack (last one pushed) assert data["active_workflow"] == data["workflows"][1]["workflow_instance_id"] + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.5.4, JOBS-REQ-010.5.5). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES async def test_multi_agent_workflows( self, status_writer: StatusWriter, @@ -314,14 +353,14 @@ async def test_multi_agent_workflows( status_writer.write_session_status(SESSION_ID, state_manager, self._job_loader()) - data = yaml.safe_load( - (status_writer.sessions_dir / f"{SESSION_ID}.yml").read_text() - ) + data = yaml.safe_load((status_writer.sessions_dir / f"{SESSION_ID}.yml").read_text()) assert len(data["workflows"]) == 2 agent_ids = [w["agent_id"] for w in data["workflows"]] assert None in agent_ids assert AGENT_ID in agent_ids + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.8.4). 
+ # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES async def test_step_history_ordering( self, status_writer: StatusWriter, @@ -349,9 +388,7 @@ async def test_step_history_ordering( status_writer.write_session_status(SESSION_ID, state_manager, self._job_loader()) - data = yaml.safe_load( - (status_writer.sessions_dir / f"{SESSION_ID}.yml").read_text() - ) + data = yaml.safe_load((status_writer.sessions_dir / f"{SESSION_ID}.yml").read_text()) steps = data["workflows"][0]["steps"] # step1 appears twice in history (original + re-execution) step_names = [s["step_name"] for s in steps] @@ -372,6 +409,8 @@ def test_no_session_data_is_noop( class TestSubWorkflowInstanceTracking: """Tests for sub_workflow_instance_ids in status output.""" + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.9.3). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES async def test_nested_workflow_records_sub_instance_id( self, status_writer: StatusWriter, diff --git a/tests/unit/jobs/mcp/test_tools.py b/tests/unit/jobs/mcp/test_tools.py index e32f41e1..dda22888 100644 --- a/tests/unit/jobs/mcp/test_tools.py +++ b/tests/unit/jobs/mcp/test_tools.py @@ -2375,9 +2375,7 @@ class TestStatusWriterIntegration: """Tests that StatusWriter is called from WorkflowTools.""" @pytest.fixture - def tools_with_status( - self, project_root: Path, state_manager: StateManager - ) -> WorkflowTools: + def tools_with_status(self, project_root: Path, state_manager: StateManager) -> WorkflowTools: status_writer = StatusWriter(project_root) return WorkflowTools( project_root=project_root, @@ -2385,13 +2383,15 @@ def tools_with_status( status_writer=status_writer, ) - async def test_get_workflows_writes_manifest( - self, tools_with_status: WorkflowTools - ) -> None: + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.3.2). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES + async def test_get_workflows_writes_manifest(self, tools_with_status: WorkflowTools) -> None: tools_with_status.get_workflows() assert tools_with_status.status_writer is not None assert tools_with_status.status_writer.manifest_path.exists() + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.6.1). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES async def test_start_workflow_writes_session_status( self, tools_with_status: WorkflowTools, project_root: Path ) -> None: @@ -2408,6 +2408,8 @@ async def test_start_workflow_writes_session_status( session_file = tools_with_status.status_writer.sessions_dir / f"{SESSION_ID}.yml" assert session_file.exists() + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.6.2). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES async def test_finished_step_writes_session_status( self, tools_with_status: WorkflowTools, project_root: Path ) -> None: @@ -2433,6 +2435,8 @@ async def test_finished_step_writes_session_status( session_file = tools_with_status.status_writer.sessions_dir / f"{SESSION_ID}.yml" assert session_file.exists() + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.6.4). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES async def test_abort_workflow_writes_session_status( self, tools_with_status: WorkflowTools, project_root: Path ) -> None: @@ -2454,6 +2458,38 @@ async def test_abort_workflow_writes_session_status( session_file = tools_with_status.status_writer.sessions_dir / f"{SESSION_ID}.yml" assert session_file.exists() + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.6.3). 
+ # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES + async def test_go_to_step_writes_session_status( + self, tools_with_status: WorkflowTools, project_root: Path + ) -> None: + (project_root / "output1.md").write_text("test") + (project_root / "output2.md").write_text("test") + await tools_with_status.start_workflow( + StartWorkflowInput( + goal="Test", + job_name="test_job", + workflow_name="main", + session_id=SESSION_ID, + ) + ) + await tools_with_status.finished_step( + FinishedStepInput( + outputs={"output1.md": "output1.md"}, + session_id=SESSION_ID, + quality_review_override_reason="skip", + ) + ) + # Now at step2, go back to step1 + await tools_with_status.go_to_step( + GoToStepInput(step_id="step1", session_id=SESSION_ID) + ) + assert tools_with_status.status_writer is not None + session_file = tools_with_status.status_writer.sessions_dir / f"{SESSION_ID}.yml" + assert session_file.exists() + + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.12.1, JOBS-REQ-010.12.2). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES async def test_status_writer_failure_does_not_break_tool( self, project_root: Path, state_manager: StateManager ) -> None: From 149a8278d5aa2edf729a4c3ee4f0dc4b6592e8e2 Mon Sep 17 00:00:00 2001 From: Noah Horton Date: Mon, 9 Mar 2026 16:24:37 -0400 Subject: [PATCH 3/5] Fix review findings: add StatusWriter docs and linter fixes - Add StatusWriter component section to doc/architecture.md - Auto-linter fixes for type annotations in state.py and status.py Co-Authored-By: Claude Opus 4.6 --- doc/architecture.md | 31 +++++++++++++++++++++++-- src/deepwork/jobs/mcp/state.py | 14 ++++-------- src/deepwork/jobs/mcp/status.py | 40 +++++++++++++++++++-------------- 3 files changed, 56 insertions(+), 29 deletions(-) diff --git a/doc/architecture.md b/doc/architecture.md index 44066966..d7c6eae6 100644 --- a/doc/architecture.md +++ b/doc/architecture.md @@ -57,7 +57,8 @@ deepwork/ # DeepWork tool repository │ │ ├── state.py # Workflow session state management │ │ ├── schemas.py # Pydantic models for I/O │ │ ├── quality_gate.py # Quality gate with review agent -│ │ └── claude_cli.py # Claude CLI subprocess wrapper +│ │ ├── claude_cli.py # Claude CLI subprocess wrapper +│ │ └── status.py # Status file writer for external consumers │ ├── hooks/ # Hook system and cross-platform wrappers │ │ ├── wrapper.py # Cross-platform input/output normalization │ │ ├── claude_hook.sh # Shell wrapper for Claude Code @@ -1080,6 +1081,7 @@ class StateManager: def get_all_outputs(session_id, agent_id=None) -> dict def get_stack(session_id, agent_id=None) -> list[StackEntry] def get_stack_depth(session_id, agent_id=None) -> int + def get_all_session_data(session_id) -> dict[agent_id, (active_stack, completed_workflows)] ``` Session state includes: @@ -1115,6 +1117,31 @@ The quality gate supports two modes: - **External runner** (`evaluate_reviews`): Invokes Claude Code via subprocess to evaluate each review, returns list of failed `ReviewResult` objects - **Self-review** (`build_review_instructions_file`): Generates a review instructions file for the agent to spawn a subagent for self-review +### Status Writer (`jobs/mcp/status.py`) + +Writes file-based status projections for external consumers (UIs, dashboards, monitoring). Status files are written to `.deepwork/tmp/status/v1/` and are a **stable external interface** — the file format must not change without versioning. 
+ +```python +class StatusWriter: + def __init__(self, project_root: Path) + + def write_manifest(self, jobs: list[JobDefinition]) -> None + """Write job_manifest.yml with all available jobs, workflows, and steps.""" + + def write_session_status(self, session_id: str, state_manager: StateManager, job_loader: Callable) -> None + """Write sessions/.yml from current state.""" +``` + +**Output files:** +- `job_manifest.yml` — catalog of all jobs/workflows/steps, sorted alphabetically +- `sessions/.yml` — per-session workflow execution status including active workflow, step history, and completed/aborted workflows + +**Write triggers:** +- Manifest: MCP server startup, `get_workflows` +- Session status: `start_workflow`, `finished_step`, `go_to_step`, `abort_workflow` + +Status writes are fire-and-forget: failures are logged as warnings and never fail the MCP tool call. + ### Schemas (`jobs/mcp/schemas.py`) Pydantic models for all tool inputs and outputs: @@ -1122,7 +1149,7 @@ Pydantic models for all tool inputs and outputs: - `GetWorkflowsResponse`, `StartWorkflowResponse`, `FinishedStepResponse`, `AbortWorkflowResponse`, `GoToStepResponse` - `ActiveStepInfo`, `ExpectedOutput`, `ReviewInfo`, `ReviewResult`, `StackEntry` - `JobInfo`, `WorkflowInfo`, `JobLoadErrorInfo` -- `WorkflowSession`, `StepProgress` +- `WorkflowSession`, `StepProgress`, `StepHistoryEntry` - `QualityGateResult`, `QualityCriteriaResult` ## MCP Server Registration diff --git a/src/deepwork/jobs/mcp/state.py b/src/deepwork/jobs/mcp/state.py index 9685cee2..8152ccc0 100644 --- a/src/deepwork/jobs/mcp/state.py +++ b/src/deepwork/jobs/mcp/state.py @@ -322,9 +322,7 @@ async def start_step(self, session_id: str, step_id: str, agent_id: str | None = session.step_progress[step_id].started_at = now # Append to step history - session.step_history.append( - StepHistoryEntry(step_id=step_id, started_at=now) - ) + session.step_history.append(StepHistoryEntry(step_id=step_id, started_at=now)) session.current_step_id = step_id await self._write_stack(session_id, stack, agent_id) @@ -671,19 +669,15 @@ def get_all_session_data( except (json.JSONDecodeError, OSError): continue - stack = [ - WorkflowSession.from_dict(entry) - for entry in data.get("workflow_stack", []) - ] + stack = [WorkflowSession.from_dict(entry) for entry in data.get("workflow_stack", [])] completed = [ - WorkflowSession.from_dict(entry) - for entry in data.get("completed_workflows", []) + WorkflowSession.from_dict(entry) for entry in data.get("completed_workflows", []) ] if state_file.name == "state.json": agent_id = None elif state_file.name.startswith("agent_") and state_file.name.endswith(".json"): - agent_id = state_file.name[len("agent_"):-len(".json")] + agent_id = state_file.name[len("agent_") : -len(".json")] else: continue diff --git a/src/deepwork/jobs/mcp/status.py b/src/deepwork/jobs/mcp/status.py index b338a5e6..6cd6baf8 100644 --- a/src/deepwork/jobs/mcp/status.py +++ b/src/deepwork/jobs/mcp/status.py @@ -72,23 +72,29 @@ def write_manifest(self, jobs: list[JobDefinition]) -> None: for wf in sorted_workflows: steps_list: list[dict[str, str]] = [] for step_id in wf.steps: - steps_list.append({ - "name": step_id, - "display_name": _derive_display_name(step_id), - }) - wf_list.append({ - "name": wf.name, - "display_name": _derive_display_name(wf.name), - "summary": wf.summary, - "steps": steps_list, - }) - - manifest_jobs.append({ - "name": job.name, - "display_name": _derive_display_name(job.name), - "summary": job.summary, - "workflows": wf_list, - }) + 
steps_list.append( + { + "name": step_id, + "display_name": _derive_display_name(step_id), + } + ) + wf_list.append( + { + "name": wf.name, + "display_name": _derive_display_name(wf.name), + "summary": wf.summary, + "steps": steps_list, + } + ) + + manifest_jobs.append( + { + "name": job.name, + "display_name": _derive_display_name(job.name), + "summary": job.summary, + "workflows": wf_list, + } + ) save_yaml(self.manifest_path, {"jobs": manifest_jobs}) From 5816625321bef1e3bc62033089017a99b481fc38 Mon Sep 17 00:00:00 2001 From: Noah Horton Date: Mon, 9 Mar 2026 16:26:27 -0400 Subject: [PATCH 4/5] Fix ruff format and add status file display to e2e CI - Fix formatting in test_tools.py (ruff format) - Add "Display status files" step to e2e workflow to cat status files after run for observability - Include status directory in artifact uploads Co-Authored-By: Claude Opus 4.6 --- .github/workflows/claude-code-test.yml | 24 ++++++++++++++++++++++++ tests/unit/jobs/mcp/test_tools.py | 4 +--- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/.github/workflows/claude-code-test.yml b/.github/workflows/claude-code-test.yml index 26f9be1b..5b37b87e 100644 --- a/.github/workflows/claude-code-test.yml +++ b/.github/workflows/claude-code-test.yml @@ -436,6 +436,29 @@ jobs: echo "Workflow tested: /deepwork fruits full - Executed full fruits workflow (identify + classify)" echo "" + - name: Display status files + if: steps.check-key.outputs.has_key == 'true' && always() + working-directory: test_project + run: | + echo "=== Status Files ===" + STATUS_DIR=".deepwork/tmp/status/v1" + if [ -d "$STATUS_DIR" ]; then + echo "--- job_manifest.yml ---" + cat "$STATUS_DIR/job_manifest.yml" 2>/dev/null || echo "(not found)" + echo "" + if [ -d "$STATUS_DIR/sessions" ]; then + for f in "$STATUS_DIR/sessions"/*.yml; do + echo "--- $(basename "$f") ---" + cat "$f" + echo "" + done + else + echo "(no session status files)" + fi + else + echo "(status directory not found)" + fi + - name: Upload test artifacts if: steps.check-key.outputs.has_key == 'true' && always() uses: actions/upload-artifact@v4 @@ -443,6 +466,7 @@ jobs: name: claude-code-e2e-outputs path: | test_project/.deepwork/jobs/fruits/ + test_project/.deepwork/tmp/status/ test_project/.claude/skills/deepwork/ test_project/fruits/identified_fruits.md test_project/fruits/classified_fruits.md diff --git a/tests/unit/jobs/mcp/test_tools.py b/tests/unit/jobs/mcp/test_tools.py index dda22888..5346f8c2 100644 --- a/tests/unit/jobs/mcp/test_tools.py +++ b/tests/unit/jobs/mcp/test_tools.py @@ -2481,9 +2481,7 @@ async def test_go_to_step_writes_session_status( ) ) # Now at step2, go back to step1 - await tools_with_status.go_to_step( - GoToStepInput(step_id="step1", session_id=SESSION_ID) - ) + await tools_with_status.go_to_step(GoToStepInput(step_id="step1", session_id=SESSION_ID)) assert tools_with_status.status_writer is not None session_file = tools_with_status.status_writer.sessions_dir / f"{SESSION_ID}.yml" assert session_file.exists() From 4fb7c3477c5e1aaec9f740d208c5de964297d067 Mon Sep 17 00:00:00 2001 From: Noah Horton Date: Mon, 9 Mar 2026 16:29:39 -0400 Subject: [PATCH 5/5] Add missing requirement traceability tests - JOBS-REQ-010.3.1: test startup writes manifest via create_server - JOBS-REQ-010.5.3: test last_updated_at is ISO 8601 UTC - JOBS-REQ-010.9.4: test cross-agent sub-workflow records on main stack - JOBS-REQ-010.10.4: test _write_stack preserves completed_workflows - Update test coverage table in spec Co-Authored-By: Claude Opus 4.6 
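One of the bullets above concerns JOBS-REQ-010.5.3 (last_updated_at as an ISO 8601 UTC timestamp). For reference, a value satisfying that check could be produced and verified as sketched below; the actual helper inside status.py is not shown in this patch, so this is illustrative only.

```python
from datetime import datetime, timedelta, timezone

# Sketch only: one way to produce a last_updated_at value that satisfies
# JOBS-REQ-010.5.3 (ISO 8601, UTC). The real helper in status.py is not
# shown in this patch.
ts = datetime.now(timezone.utc).isoformat()

# The new test parses it back and checks the offset is exactly UTC.
parsed = datetime.fromisoformat(ts)
assert parsed.tzinfo is not None
assert parsed.utcoffset() == timedelta(0)
```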
--- .../jobs/JOBS-REQ-010-status-reporting.md | 12 ++-- tests/unit/jobs/mcp/test_state.py | 70 +++++++++++++++++++ tests/unit/jobs/mcp/test_status.py | 29 ++++++++ tests/unit/jobs/mcp/test_tools.py | 13 ++++ 4 files changed, 119 insertions(+), 5 deletions(-) diff --git a/specs/deepwork/jobs/JOBS-REQ-010-status-reporting.md b/specs/deepwork/jobs/JOBS-REQ-010-status-reporting.md index 70b2f8a7..7aad0f0b 100644 --- a/specs/deepwork/jobs/JOBS-REQ-010-status-reporting.md +++ b/specs/deepwork/jobs/JOBS-REQ-010-status-reporting.md @@ -101,14 +101,16 @@ DeepWork provides a file-based external interface for reporting the current stat |-------------|-----------|-----------| | JOBS-REQ-010.1 | test_status.py | TestWriteManifest::test_creates_manifest_file | | JOBS-REQ-010.2 | test_status.py | TestWriteManifest::test_manifest_structure | -| JOBS-REQ-010.3 | test_tools.py | TestStatusWriterIntegration::test_get_workflows_writes_manifest | +| JOBS-REQ-010.3.1 | test_tools.py | TestStatusWriterIntegration::test_startup_writes_manifest | +| JOBS-REQ-010.3.2 | test_tools.py | TestStatusWriterIntegration::test_get_workflows_writes_manifest | | JOBS-REQ-010.4 | test_status.py | TestDeriveDisplayName::* | -| JOBS-REQ-010.5 | test_status.py | TestWriteSessionStatus::test_session_status_structure | +| JOBS-REQ-010.5 | test_status.py | TestWriteSessionStatus::test_session_status_structure, test_last_updated_at_is_iso8601_utc | | JOBS-REQ-010.6 | test_tools.py | TestStatusWriterIntegration::test_start_workflow_writes_session_status, test_finished_step_writes_session_status, test_abort_workflow_writes_session_status | | JOBS-REQ-010.7 | test_state.py | TestWorkflowInstanceId::* | | JOBS-REQ-010.8 | test_state.py | TestStepHistory::* | -| JOBS-REQ-010.9 | test_state.py | TestSubWorkflowInstanceIds::* | -| JOBS-REQ-010.10 | test_state.py | TestCompletedWorkflows::* | +| JOBS-REQ-010.9 | test_state.py | TestSubWorkflowInstanceIds::* (incl. test_cross_agent_sub_workflow_records_on_main_stack) | +| JOBS-REQ-010.10 | test_state.py | TestCompletedWorkflows::* (incl. test_write_stack_preserves_completed_workflows) | | JOBS-REQ-010.11 | test_state.py | TestGetAllSessionData::* | -| JOBS-REQ-010.12 | test_tools.py | TestStatusWriterIntegration::test_status_writer_failure_does_not_break_tool | +| JOBS-REQ-010.12.1, .12.2 | test_tools.py | TestStatusWriterIntegration::test_status_writer_failure_does_not_break_tool | +| JOBS-REQ-010.12.3 | (Code review — synchronous write is acceptable since it's fire-and-forget) | | JOBS-REQ-010.13 | (Manual review — structural contract) | diff --git a/tests/unit/jobs/mcp/test_state.py b/tests/unit/jobs/mcp/test_state.py index fe6da123..97b31b3d 100644 --- a/tests/unit/jobs/mcp/test_state.py +++ b/tests/unit/jobs/mcp/test_state.py @@ -1047,6 +1047,37 @@ async def test_multiple_completed_workflows(self, state_manager: StateManager) - data = json.loads(state_file.read_text()) assert len(data["completed_workflows"]) == 2 + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.10.4). 
+ # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES + async def test_write_stack_preserves_completed_workflows( + self, state_manager: StateManager + ) -> None: + """_write_stack preserves existing completed_workflows when not explicitly provided.""" + # Complete a workflow so completed_workflows exists + await state_manager.create_session( + session_id=SESSION_ID, + job_name="job1", + workflow_name="wf1", + goal="Goal", + first_step_id="step1", + ) + await state_manager.complete_workflow(SESSION_ID) + + # Start a new workflow — _write_stack is called without completed_workflows param + await state_manager.create_session( + session_id=SESSION_ID, + job_name="job2", + workflow_name="wf2", + goal="Goal 2", + first_step_id="step1", + ) + + # Verify completed_workflows was preserved + state_file = state_manager._state_file(SESSION_ID) + data = json.loads(state_file.read_text()) + assert len(data["completed_workflows"]) == 1 + assert len(data["workflow_stack"]) == 1 + class TestGetAllSessionData: """Tests for get_all_session_data.""" @@ -1205,3 +1236,42 @@ async def test_nested_workflow_records_instance_on_parent_step_history( child.workflow_instance_id in parent_data["step_history"][-1]["sub_workflow_instance_ids"] ) + + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.9.4). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES + async def test_cross_agent_sub_workflow_records_on_main_stack( + self, state_manager: StateManager + ) -> None: + """Cross-agent sub-workflow records instance ID on main stack parent's step.""" + # Create parent on main stack + await state_manager.create_session( + session_id=SESSION_ID, + job_name="parent_job", + workflow_name="parent_wf", + goal="Parent", + first_step_id="step1", + ) + await state_manager.start_step(SESSION_ID, "step1") + + # Create child on agent stack — this should also update main stack parent + child = await state_manager.create_session( + session_id=SESSION_ID, + job_name="child_job", + workflow_name="child_wf", + goal="Child", + first_step_id="child_step1", + agent_id=AGENT_ID, + ) + + # Verify main stack parent has the child's instance ID + main_state_file = state_manager._state_file(SESSION_ID, agent_id=None) + main_data = json.loads(main_state_file.read_text()) + parent_data = main_data["workflow_stack"][0] + assert ( + child.workflow_instance_id + in parent_data["step_progress"]["step1"]["sub_workflow_instance_ids"] + ) + assert ( + child.workflow_instance_id + in parent_data["step_history"][-1]["sub_workflow_instance_ids"] + ) diff --git a/tests/unit/jobs/mcp/test_status.py b/tests/unit/jobs/mcp/test_status.py index f84e83c9..92acb431 100644 --- a/tests/unit/jobs/mcp/test_status.py +++ b/tests/unit/jobs/mcp/test_status.py @@ -394,6 +394,35 @@ async def test_step_history_ordering( step_names = [s["step_name"] for s in steps] assert step_names == ["step1", "step1"] + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.5.3). 
+ # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES + async def test_last_updated_at_is_iso8601_utc( + self, + status_writer: StatusWriter, + state_manager: StateManager, + ) -> None: + """last_updated_at must be an ISO 8601 timestamp in UTC.""" + from datetime import datetime, timedelta + + await state_manager.create_session( + session_id=SESSION_ID, + job_name="test_job", + workflow_name="main", + goal="Test goal", + first_step_id="step1", + ) + await state_manager.start_step(SESSION_ID, "step1") + + status_writer.write_session_status(SESSION_ID, state_manager, self._job_loader()) + + data = yaml.safe_load((status_writer.sessions_dir / f"{SESSION_ID}.yml").read_text()) + ts = data["last_updated_at"] + # Must parse as ISO 8601 + parsed = datetime.fromisoformat(ts) + # Must be UTC (offset +00:00) + assert parsed.tzinfo is not None + assert parsed.utcoffset() == timedelta(0) + def test_no_session_data_is_noop( self, status_writer: StatusWriter, diff --git a/tests/unit/jobs/mcp/test_tools.py b/tests/unit/jobs/mcp/test_tools.py index 5346f8c2..391db923 100644 --- a/tests/unit/jobs/mcp/test_tools.py +++ b/tests/unit/jobs/mcp/test_tools.py @@ -2517,3 +2517,16 @@ async def test_status_writer_failure_does_not_break_tool( session_id=SESSION_ID, ) ) + + # THIS TEST VALIDATES A HARD REQUIREMENT (JOBS-REQ-010.3.1). + # YOU MUST NOT MODIFY THIS TEST UNLESS THE REQUIREMENT CHANGES + def test_startup_writes_manifest(self, project_root: Path, state_manager: StateManager) -> None: + """StatusWriter.write_manifest is called during create_server startup.""" + from unittest.mock import MagicMock, patch + + mock_status_writer = MagicMock(spec=StatusWriter) + with patch("deepwork.jobs.mcp.server.StatusWriter", return_value=mock_status_writer): + from deepwork.jobs.mcp.server import create_server + + create_server(project_root=project_root, external_runner=None) + mock_status_writer.write_manifest.assert_called_once()
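
Several of the tests above (test_status_writer_failure_does_not_break_tool, test_startup_writes_manifest) pin down the fire-and-forget contract: status writes are attempted at server startup and after each tool call, and any failure is logged rather than propagated. A minimal sketch of that pattern follows, assuming a small wrapper helper; the name _write_status_safely and the commented call sites are illustrative assumptions, not the actual tools.py or server.py code.

```python
import logging
from collections.abc import Callable

logger = logging.getLogger(__name__)


def _write_status_safely(write: Callable[[], None]) -> None:
    """Fire-and-forget status write: log failures, never raise (JOBS-REQ-010.12)."""
    try:
        write()
    except Exception:
        # Deliberately broad: a broken status file must never fail the MCP tool call.
        logger.warning("Status write failed; continuing without status update", exc_info=True)


# Illustrative call sites (assumed shape, not the real implementation):
# _write_status_safely(lambda: status_writer.write_manifest(jobs))  # startup / get_workflows
# _write_status_safely(
#     lambda: status_writer.write_session_status(session_id, state_manager, job_loader)
# )  # start_workflow / finished_step / go_to_step / abort_workflow
```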