diff --git a/examples/staged-review.yaml b/examples/staged-review.yaml new file mode 100644 index 0000000..4591229 --- /dev/null +++ b/examples/staged-review.yaml @@ -0,0 +1,145 @@ +# Staged Review Workflow +# +# This example demonstrates staged agent re-invocation, where the same +# agent appears at multiple stages in a workflow with different prompts. +# The VP plans the project, the IC implements it, and then the same VP +# reviews the implementation — without needing a duplicate agent definition. +# +# Key features demonstrated: +# - stages: dict on AgentDef for multi-invocation +# - Stage-qualified route targets (vp:review) +# - Stage-specific context access (stages.vp.default.output) +# - Conditional loop-back from review to IC +# +# Usage: +# conductor run examples/staged-review.yaml --input project="Build a REST API" + +workflow: + name: staged-review + description: Demonstrates staged agent re-invocation (VP→IC→VP:review) + version: "1.0.0" + entry_point: vp + + runtime: + provider: copilot + default_model: gpt-4.1 + + input: + project: + type: string + required: true + description: The project to plan, implement, and review + + context: + mode: accumulate + + limits: + max_iterations: 10 + timeout_seconds: 300 + +agents: + # VP: appears twice — once to plan, once to review + - name: vp + description: VP of Engineering — plans and reviews + model: gpt-4.1 + tools: [] + system_prompt: | + You are the VP of Engineering. You set technical direction + and review your team's execution quality. + + prompt: | + Create a technical plan for: {{ workflow.input.project }} + + Define: + 1. Architecture approach + 2. Key components to build + 3. Success criteria for the implementation + input: + - workflow.input.project + output: + plan: + type: string + description: The technical plan + components: + type: array + description: Key components to build + success_criteria: + type: array + description: Criteria for successful implementation + routes: + - to: ic + + stages: + review: + description: Reviews IC implementation against the original plan + prompt: | + Review the IC's implementation against your original plan. + + YOUR ORIGINAL PLAN: + {{ stages.vp.default.output.plan }} + + SUCCESS CRITERIA: + {{ stages.vp.default.output.success_criteria | json }} + + IC IMPLEMENTATION: + {{ ic.output | json }} + + Evaluate: + 1. Does the implementation match the plan? + 2. Are success criteria met? + 3. What improvements are needed? + input: + - vp:default.output + - ic.output + output: + approved: + type: boolean + description: Whether the implementation meets the plan + score: + type: number + description: Quality score from 1-10 + feedback: + type: string + description: Detailed review feedback + routes: + - to: ic + when: "{{ not output.approved }}" + - to: "$end" + + # IC: implements the VP's plan + - name: ic + description: Individual Contributor — implements the plan + model: gpt-4.1 + tools: [] + prompt: | + Implement the VP's technical plan. + + PLAN: {{ vp.output.plan }} + COMPONENTS: {{ vp.output.components | json }} + + {% if stages.vp.review is defined %} + VP FEEDBACK FROM PREVIOUS REVIEW: + {{ stages.vp.review.output.feedback }} + {% endif %} + + Produce a detailed implementation for each component. + input: + - vp.output + - vp:review.output? + output: + implementation: + type: object + description: Implementation details for each component + status: + type: string + description: Overall implementation status + routes: + - to: vp:review + +output: + project: "{{ workflow.input.project }}" + plan: "{{ stages.vp.default.output.plan }}" + implementation: "{{ ic.output.implementation | json }}" + approved: "{{ stages.vp.review.output.approved }}" + score: "{{ stages.vp.review.output.score }}" + feedback: "{{ stages.vp.review.output.feedback }}" diff --git a/screenshots-for-pr/staged-in-progress-ic-detail.png b/screenshots-for-pr/staged-in-progress-ic-detail.png new file mode 100644 index 0000000..be1b826 Binary files /dev/null and b/screenshots-for-pr/staged-in-progress-ic-detail.png differ diff --git a/screenshots-for-pr/staged-node-ic.png b/screenshots-for-pr/staged-node-ic.png new file mode 100644 index 0000000..8e09586 Binary files /dev/null and b/screenshots-for-pr/staged-node-ic.png differ diff --git a/screenshots-for-pr/staged-node-vp-default.png b/screenshots-for-pr/staged-node-vp-default.png new file mode 100644 index 0000000..1b6df11 Binary files /dev/null and b/screenshots-for-pr/staged-node-vp-default.png differ diff --git a/screenshots-for-pr/staged-node-vp-review.png b/screenshots-for-pr/staged-node-vp-review.png new file mode 100644 index 0000000..20922fd Binary files /dev/null and b/screenshots-for-pr/staged-node-vp-review.png differ diff --git a/screenshots-for-pr/staged-workflow-completed.png b/screenshots-for-pr/staged-workflow-completed.png new file mode 100644 index 0000000..3884461 Binary files /dev/null and b/screenshots-for-pr/staged-workflow-completed.png differ diff --git a/screenshots-for-pr/staged-workflow-dark-mode.png b/screenshots-for-pr/staged-workflow-dark-mode.png new file mode 100644 index 0000000..684134b Binary files /dev/null and b/screenshots-for-pr/staged-workflow-dark-mode.png differ diff --git a/screenshots-for-pr/staged-workflow-in-progress.png b/screenshots-for-pr/staged-workflow-in-progress.png new file mode 100644 index 0000000..e4f06ed Binary files /dev/null and b/screenshots-for-pr/staged-workflow-in-progress.png differ diff --git a/screenshots-for-pr/staged-workflow-mobile.png b/screenshots-for-pr/staged-workflow-mobile.png new file mode 100644 index 0000000..0584386 Binary files /dev/null and b/screenshots-for-pr/staged-workflow-mobile.png differ diff --git a/screenshots-for-pr/staged-workflow-node-detail.png b/screenshots-for-pr/staged-workflow-node-detail.png new file mode 100644 index 0000000..d242935 Binary files /dev/null and b/screenshots-for-pr/staged-workflow-node-detail.png differ diff --git a/src/conductor/config/__init__.py b/src/conductor/config/__init__.py index 46bf1a1..e3b895f 100644 --- a/src/conductor/config/__init__.py +++ b/src/conductor/config/__init__.py @@ -4,6 +4,7 @@ and environment variable resolution. """ +from conductor.config.expander import expand_stages from conductor.config.loader import ( ConfigLoader, load_config, @@ -20,6 +21,7 @@ OutputField, RouteDef, RuntimeConfig, + StageDef, WorkflowConfig, WorkflowDef, ) @@ -30,6 +32,7 @@ "ConfigLoader", "load_config", "load_config_string", + "expand_stages", "resolve_env_vars", # Schema models "AgentDef", @@ -40,6 +43,7 @@ "LimitsConfig", "OutputField", "RouteDef", + "StageDef", "RuntimeConfig", "WorkflowConfig", "WorkflowDef", diff --git a/src/conductor/config/expander.py b/src/conductor/config/expander.py new file mode 100644 index 0000000..e4b8b75 --- /dev/null +++ b/src/conductor/config/expander.py @@ -0,0 +1,137 @@ +"""Stage expansion for staged agents. + +This module expands agents with ``stages`` definitions into synthetic +``AgentDef`` instances at config load time, so the workflow engine sees +only regular agents and requires minimal changes. +""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from conductor.config.schema import WorkflowConfig + +logger = logging.getLogger(__name__) + + +def expand_stages(config: WorkflowConfig) -> WorkflowConfig: + """Expand staged agents into synthetic AgentDef instances. + + For each agent with a non-None ``stages`` dict: + 1. Creates a ``agent:default`` synthetic agent from the base definition. + 2. Creates ``agent:stage`` synthetic agents for each stage, applying + per-stage overrides (prompt, input, output, routes, description). + 3. Rewrites bare route targets pointing to staged agents to use + ``agent:default``. + 4. Rewrites ``entry_point`` if it references a staged agent. + + Called after Pydantic validation, before cross-reference validation. + + Args: + config: The validated WorkflowConfig with potential staged agents. + + Returns: + The same config, mutated in place, with synthetic agents added + and route targets rewritten. + """ + from conductor.config.schema import AgentDef + + staged_agents = [a for a in config.agents if a.stages] + if not staged_agents: + return config + + staged_agent_names = {a.name for a in staged_agents} + existing_names = {a.name for a in config.agents} + synthetic_agents: list[AgentDef] = [] + + for agent in staged_agents: + assert agent.stages is not None # guaranteed by filter above + # Validate no name collisions with existing agents + for stage_name in agent.stages: + synthetic_name = f"{agent.name}:{stage_name}" + if synthetic_name in existing_names: + from conductor.exceptions import ConfigurationError + + raise ConfigurationError( + f"Name collision: agent '{synthetic_name}' already exists, " + f"conflicts with stage '{stage_name}' of agent '{agent.name}'" + ) + default_name = f"{agent.name}:default" + if default_name in existing_names: + from conductor.exceptions import ConfigurationError + + raise ConfigurationError( + f"Name collision: agent '{default_name}' already exists, " + f"conflicts with default stage of agent '{agent.name}'" + ) + + # Create the default synthetic agent from the base definition + default_agent = agent.model_copy(deep=True) + default_agent.name = default_name + default_agent.stages = None + synthetic_agents.append(default_agent) + + # Create one synthetic agent per stage + for stage_name, stage_def in agent.stages.items(): + stage_agent = agent.model_copy(deep=True) + stage_agent.name = f"{agent.name}:{stage_name}" + stage_agent.stages = None + + # Override fields from StageDef + if stage_def.prompt is not None: + stage_agent.prompt = stage_def.prompt + if stage_def.input is not None: + stage_agent.input = stage_def.input + if stage_def.output is not None: + stage_agent.output = stage_def.output + if stage_def.routes is not None: + stage_agent.routes = stage_def.routes + if stage_def.description is not None: + stage_agent.description = stage_def.description + + synthetic_agents.append(stage_agent) + + # Add synthetic agents to config + config.agents.extend(synthetic_agents) + + # Rewrite bare route targets for staged agents + _rewrite_routes(config, staged_agent_names) + + # Rewrite entry_point + if config.workflow.entry_point in staged_agent_names: + config.workflow.entry_point = f"{config.workflow.entry_point}:default" + + return config + + +def _rewrite_routes(config: WorkflowConfig, staged_agent_names: set[str]) -> None: + """Rewrite bare route targets pointing to staged agents. + + Any route ``to: "agent_name"`` where ``agent_name`` has stages is + rewritten to ``to: "agent_name:default"``. + + Args: + config: The WorkflowConfig to modify. + staged_agent_names: Set of agent names that have stages. + """ + # Rewrite agent routes (including synthetic agents already added) + for agent in config.agents: + if agent.stages is not None: + continue # Skip original staged agents + for route in agent.routes: + if route.to in staged_agent_names: + route.to = f"{route.to}:default" + + # Rewrite parallel group routes + for pg in config.parallel: + for route in pg.routes: + if route.to in staged_agent_names: + route.to = f"{route.to}:default" + + # Rewrite for-each group routes + for fe in config.for_each: + for route in fe.routes: + if route.to in staged_agent_names: + route.to = f"{route.to}:default" diff --git a/src/conductor/config/loader.py b/src/conductor/config/loader.py index c304f4f..22e047c 100644 --- a/src/conductor/config/loader.py +++ b/src/conductor/config/loader.py @@ -16,6 +16,7 @@ from ruamel.yaml.constructor import RoundTripConstructor from ruamel.yaml.error import YAMLError +from conductor.config.expander import expand_stages from conductor.config.schema import WorkflowConfig from conductor.exceptions import ConfigurationError @@ -324,7 +325,8 @@ def _validate(self, data: dict[str, Any], source: str) -> WorkflowConfig: ConfigurationError: If the data fails schema validation. """ try: - return WorkflowConfig.model_validate(data) + config = WorkflowConfig.model_validate(data) + return expand_stages(config) except Exception as e: # Format Pydantic validation errors nicely error_msg = str(e) diff --git a/src/conductor/config/schema.py b/src/conductor/config/schema.py index 1f1ad13..5a89673 100644 --- a/src/conductor/config/schema.py +++ b/src/conductor/config/schema.py @@ -268,6 +268,29 @@ class GateOption(BaseModel): """Optional: field name to prompt for text input.""" +class StageDef(BaseModel): + """Per-stage overrides for a staged agent. + + Each field, when set, overrides the corresponding field on the base AgentDef. + When None/omitted, the base agent's value is inherited. + """ + + prompt: str | None = None + """Override prompt template for this stage.""" + + input: list[str] | None = None + """Override input dependencies for this stage.""" + + output: dict[str, OutputField] | None = None + """Override output schema for this stage.""" + + routes: list[RouteDef] | None = None + """Override routes for this stage (replaces base routes entirely).""" + + description: str | None = None + """Override description for this stage.""" + + class ContextConfig(BaseModel): """Configuration for context accumulation behavior.""" @@ -407,6 +430,15 @@ class AgentDef(BaseModel): routes: list[RouteDef] = Field(default_factory=list) """Routing rules evaluated in order after execution.""" + stages: dict[str, StageDef] | None = None + """Named stages for multi-invocation workflows. + + Allows the same agent to appear at multiple workflow stages with + different prompts, inputs, and outputs while sharing model, tools, + and system prompt. Stages are expanded into synthetic agents at + config load time. + """ + options: list[GateOption] | None = None """Options for human_gate type agents.""" @@ -485,6 +517,21 @@ def validate_agent_type(self) -> AgentDef: raise ValueError("script agents cannot have 'max_session_seconds'") if self.max_agent_iterations is not None: raise ValueError("script agents cannot have 'max_agent_iterations'") + + # Validate stages + if self.stages is not None: + if self.type == "script": + raise ValueError("script agents cannot have 'stages'") + if self.type == "human_gate": + raise ValueError("human_gate agents cannot have 'stages'") + for stage_name in self.stages: + if stage_name == "default": + raise ValueError( + "Stage name 'default' is reserved. " + "The base agent definition is the implicit default stage." + ) + if not stage_name.isidentifier(): + raise ValueError(f"Stage name '{stage_name}' is not a valid Python identifier") return self @@ -674,8 +721,17 @@ def validate_references(self) -> WorkflowConfig: parallel_names = {p.name for p in self.parallel} for_each_names = {f.name for f in self.for_each} + # Build stage-qualified names for agents with stages + stage_qualified_names: set[str] = set() + for agent in self.agents: + if agent.stages: + stage_qualified_names.add(f"{agent.name}:default") + for stage_name in agent.stages: + stage_qualified_names.add(f"{agent.name}:{stage_name}") + + all_names = agent_names | parallel_names | for_each_names | stage_qualified_names + # Validate entry_point exists - all_names = agent_names | parallel_names | for_each_names if self.workflow.entry_point not in all_names: raise ValueError( f"entry_point '{self.workflow.entry_point}' not found in " @@ -690,6 +746,16 @@ def validate_references(self) -> WorkflowConfig: f"Agent '{agent.name}' routes to unknown agent, " f"parallel group, or for-each group '{route.to}'" ) + # Validate routes inside stages + if agent.stages: + for stage_name, stage_def in agent.stages.items(): + if stage_def.routes: + for route in stage_def.routes: + if route.to != "$end" and route.to not in all_names: + raise ValueError( + f"Agent '{agent.name}' stage '{stage_name}' " + f"routes to unknown target '{route.to}'" + ) # Validate parallel group agent references exist for parallel_group in self.parallel: diff --git a/src/conductor/config/validator.py b/src/conductor/config/validator.py index 8bd0c63..0d1ed5b 100644 --- a/src/conductor/config/validator.py +++ b/src/conductor/config/validator.py @@ -21,7 +21,7 @@ # Note: this is intentionally broader than the agent_ref_pattern in _validate_output_references # (which only matches {{ }} blocks with .output singular). This pattern also matches {% %} blocks # and .outputs plural for path coverage analysis. -_OUTPUT_REF_PATTERN = re.compile(r"(?:\{\{|\{%)[^}%]*?(\w+)\.outputs?\b") +_OUTPUT_REF_PATTERN = re.compile(r"(?:\{\{|\{%)[^}%]*?(?:stages\.\w+\.)?(\w+)\.outputs?\b") # DFS path cap: larger workflows may get partial coverage analysis _MAX_ENUMERATED_PATHS = 100 @@ -33,7 +33,7 @@ # All with optional ? suffix INPUT_REF_PATTERN = re.compile( r"^(?:" - r"(?P[a-zA-Z_][a-zA-Z0-9_]*)\.output(?:\.(?P[a-zA-Z_][a-zA-Z0-9_]*))?|" + r"(?P[a-zA-Z_][a-zA-Z0-9_]*)(?::(?P[a-zA-Z_][a-zA-Z0-9_]*))?\.output(?:\.(?P[a-zA-Z_][a-zA-Z0-9_]*))?|" r"(?P[a-zA-Z_][a-zA-Z0-9_]*)\.outputs\.(?P[a-zA-Z_][a-zA-Z0-9_]*)(?:\.(?P[a-zA-Z_][a-zA-Z0-9_]*))?|" r"workflow\.input\.(?P[a-zA-Z_][a-zA-Z0-9_]*)" r")(?P\?)?$" @@ -111,13 +111,18 @@ def validate_workflow_config(config: WorkflowConfig) -> list[str]: parallel_errors = _validate_parallel_groups(config) errors.extend(parallel_errors) - # Validate for_each groups: reject script steps as inline agents + # Validate for_each groups: reject script steps and staged agents as inline agents for for_each_group in config.for_each: if for_each_group.agent.type == "script": errors.append( f"For-each group '{for_each_group.name}' uses a script step as its " "inline agent. Script steps cannot be used in for_each groups." ) + if for_each_group.agent.stages: + errors.append( + f"For-each group '{for_each_group.name}' inline agent has stages. " + "Stages are not supported on for-each inline agents." + ) # Validate workflow output references output_errors = _validate_output_references( @@ -204,16 +209,22 @@ def _validate_input_references( # Check if referencing another agent's output ref_agent = match.group("agent") - if ref_agent and ref_agent not in agent_names: - is_optional = match.group("optional") == "?" - if is_optional: - warnings.append( - f"Agent '{agent_name}' has optional reference to unknown agent '{ref_agent}'" - ) - else: - errors.append( - f"Agent '{agent_name}' references unknown agent '{ref_agent}' in input" - ) + ref_stage = match.group("stage") + if ref_agent: + # Build the full agent name (with optional stage qualifier) + full_agent_name = f"{ref_agent}:{ref_stage}" if ref_stage else ref_agent + if full_agent_name not in agent_names: + is_optional = match.group("optional") == "?" + if is_optional: + warnings.append( + f"Agent '{agent_name}' has optional reference to " + f"unknown agent '{full_agent_name}'" + ) + else: + errors.append( + f"Agent '{agent_name}' references unknown agent " + f"'{full_agent_name}' in input" + ) # Check if referencing parallel group output ref_parallel = match.group("parallel") @@ -301,12 +312,12 @@ def _validate_output_references( errors: list[str] = [] # Pattern to find potential agent references in templates - agent_ref_pattern = re.compile(r"\{\{\s*(\w+)\.output") + agent_ref_pattern = re.compile(r"\{\{\s*(?:stages\.)?(\w+)\.(?:\w+\.)?output") for field, template in output.items(): matches = agent_ref_pattern.findall(template) for ref in matches: - if ref not in valid_names and ref not in ("workflow", "context"): + if ref not in valid_names and ref not in ("workflow", "context", "stages"): errors.append(f"Workflow output '{field}' references unknown agent '{ref}'") return errors @@ -379,6 +390,13 @@ def _validate_parallel_groups(config: WorkflowConfig) -> list[str]: "Script steps cannot be used in parallel groups." ) + # Validate no staged agents in parallel groups + if agent.stages: + errors.append( + f"Agent '{agent_name}' in parallel group '{pg.name}' has stages. " + "Agents with stages cannot be used in parallel groups." + ) + # PE-6.2: Validate parallel group route targets for_each_names = {fe.name for fe in config.for_each} all_names = agent_names | parallel_names | for_each_names diff --git a/src/conductor/engine/context.py b/src/conductor/engine/context.py index 82ef4ce..9193de1 100644 --- a/src/conductor/engine/context.py +++ b/src/conductor/engine/context.py @@ -129,14 +129,45 @@ def store(self, agent_name: str, output: dict[str, Any]) -> None: This method updates the agent_outputs dictionary, appends the agent to execution_history, and increments the iteration counter. + For stage-qualified agents (name contains ``:``, e.g. ``vp:review``), + the output is stored under both the full qualified name and the base + agent name. This preserves backward-compatible ``{{ agent.output }}`` + access which always returns the latest output from any stage. + Args: agent_name: The name of the agent whose output is being stored. output: The structured output from the agent. """ self.agent_outputs[agent_name] = output + # Dual-key storage: stage-qualified agents also update the base name + # so {{ agent.output }} always returns the latest output + if ":" in agent_name: + base_name = agent_name.split(":")[0] + self.agent_outputs[base_name] = output self.execution_history.append(agent_name) self.current_iteration += 1 + def _build_stages_dict(self) -> dict[str, dict[str, dict[str, Any]]]: + """Build the nested ``stages`` dict for Jinja2 template access. + + Since colons cannot appear in Jinja2 identifiers, stage-qualified + outputs (e.g. ``vp:review``) are made accessible via a nested dict:: + + stages.vp.review.output.field + + Returns: + Dict mapping base_name → stage_name → {"output": output_dict}. + Empty dict if no stage-qualified outputs exist. + """ + stages: dict[str, dict[str, dict[str, Any]]] = {} + for key, output in self.agent_outputs.items(): + if ":" in key: + base_name, stage_name = key.split(":", 1) + if base_name not in stages: + stages[base_name] = {} + stages[base_name][stage_name] = {"output": output} + return stages + def build_for_agent( self, agent_name: str, @@ -216,6 +247,9 @@ def build_for_agent( else: ctx[last_agent] = {"output": last_output} + # Inject stages dict for Jinja2 access to stage-qualified outputs + ctx["stages"] = self._build_stages_dict() + return ctx def _add_explicit_input(self, ctx: dict[str, Any], input_ref: str) -> None: diff --git a/tests/test_config/test_expander.py b/tests/test_config/test_expander.py new file mode 100644 index 0000000..888a0f5 --- /dev/null +++ b/tests/test_config/test_expander.py @@ -0,0 +1,682 @@ +"""Tests for stage expansion feature. + +Tests cover StageDef schema validation, AgentDef stages field validation, +the expand_stages() function, and edge cases in stage expansion. +""" + +from __future__ import annotations + +import pytest +from pydantic import ValidationError + +from conductor.config.expander import expand_stages +from conductor.config.schema import ( + AgentDef, + OutputField, + RouteDef, + StageDef, + WorkflowConfig, + WorkflowDef, +) + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_config( + agents: list[AgentDef], + entry_point: str = "vp", +) -> WorkflowConfig: + """Build a minimal WorkflowConfig for testing.""" + return WorkflowConfig( + workflow=WorkflowDef( + name="test", + entry_point=entry_point, + ), + agents=agents, + ) + + +# --------------------------------------------------------------------------- +# 1. StageDef schema validation +# --------------------------------------------------------------------------- + + +class TestStageDef: + """Tests for StageDef Pydantic model validation.""" + + def test_stage_def_all_none(self) -> None: + """StageDef with no arguments should have all fields as None.""" + stage = StageDef() + assert stage.prompt is None + assert stage.input is None + assert stage.output is None + assert stage.routes is None + assert stage.description is None + + def test_stage_def_with_prompt(self) -> None: + """StageDef with only a prompt should set prompt and leave others None.""" + stage = StageDef(prompt="Review") + assert stage.prompt == "Review" + assert stage.input is None + assert stage.output is None + assert stage.routes is None + assert stage.description is None + + def test_stage_def_with_all_fields(self) -> None: + """StageDef with all five fields should retain every value.""" + stage = StageDef( + prompt="Do work", + input=["a.output"], + output={"result": OutputField(type="string")}, + routes=[RouteDef(to="$end")], + description="A test stage", + ) + assert stage.prompt == "Do work" + assert stage.input == ["a.output"] + assert "result" in stage.output + assert len(stage.routes) == 1 + assert stage.description == "A test stage" + + def test_stage_def_with_routes(self) -> None: + """StageDef with routes should accept a list of RouteDef.""" + stage = StageDef(routes=[RouteDef(to="$end")]) + assert stage.routes is not None + assert stage.routes[0].to == "$end" + + def test_stage_def_with_output(self) -> None: + """StageDef with output should accept a dict of OutputField.""" + stage = StageDef(output={"field": OutputField(type="string")}) + assert stage.output is not None + assert stage.output["field"].type == "string" + + +# --------------------------------------------------------------------------- +# 2. AgentDef stages field validation +# --------------------------------------------------------------------------- + + +class TestAgentDefStages: + """Tests for AgentDef stages field and its validators.""" + + def test_stages_default_none(self) -> None: + """AgentDef without stages should default to None.""" + agent = AgentDef(name="a", model="gpt-4", prompt="Hello") + assert agent.stages is None + + def test_stages_with_valid_stages(self) -> None: + """AgentDef with a valid stages dict should be accepted.""" + agent = AgentDef( + name="a", + model="gpt-4", + prompt="Hello", + stages={"review": StageDef(prompt="Review")}, + ) + assert agent.stages is not None + assert "review" in agent.stages + + def test_stages_reserved_default_name_rejected(self) -> None: + """Stage name 'default' is reserved and must be rejected.""" + with pytest.raises(ValidationError, match="default"): + AgentDef( + name="a", + model="gpt-4", + prompt="Hello", + stages={"default": StageDef()}, + ) + + def test_stages_invalid_identifier_rejected(self) -> None: + """Stage names that are not valid Python identifiers must be rejected.""" + with pytest.raises(ValidationError, match="identifier"): + AgentDef( + name="a", + model="gpt-4", + prompt="Hello", + stages={"not-valid": StageDef()}, + ) + + def test_stages_on_script_rejected(self) -> None: + """Script agents cannot have stages.""" + with pytest.raises(ValidationError): + AgentDef( + name="a", + type="script", + command="echo hi", + stages={"review": StageDef()}, + ) + + def test_stages_on_human_gate_rejected(self) -> None: + """Human gate agents cannot have stages.""" + with pytest.raises(ValidationError): + AgentDef( + name="a", + type="human_gate", + prompt="Choose", + options=["yes", "no"], + stages={"review": StageDef()}, + ) + + def test_empty_stages_dict(self) -> None: + """An empty stages dict should be accepted (valid but no-op).""" + agent = AgentDef( + name="a", + model="gpt-4", + prompt="Hello", + stages={}, + ) + assert agent.stages == {} + + +# --------------------------------------------------------------------------- +# 3. Stage expansion logic +# --------------------------------------------------------------------------- + + +class TestExpandStages: + """Tests for the expand_stages() function.""" + + def test_no_stages_returns_unchanged(self) -> None: + """Config with no staged agents should be returned unchanged.""" + config = _make_config( + agents=[ + AgentDef(name="vp", model="gpt-4", prompt="Go"), + ], + ) + original_count = len(config.agents) + result = expand_stages(config) + assert len(result.agents) == original_count + + def test_expands_single_stage(self) -> None: + """Agent with one stage should produce a default and one stage synthetic.""" + config = _make_config( + agents=[ + AgentDef( + name="vp", + model="gpt-4", + system_prompt="You are a VP", + prompt="Set direction", + stages={"review": StageDef(prompt="Review output")}, + ), + ], + ) + result = expand_stages(config) + + names = [a.name for a in result.agents] + assert "vp" in names + assert "vp:default" in names + assert "vp:review" in names + + default = next(a for a in result.agents if a.name == "vp:default") + assert default.prompt == "Set direction" + assert default.stages is None + + review = next(a for a in result.agents if a.name == "vp:review") + assert review.prompt == "Review output" + assert review.stages is None + assert review.model == "gpt-4" + assert review.system_prompt == "You are a VP" + + def test_expands_multiple_stages(self) -> None: + """Agent with two stages should produce three synthetic agents.""" + config = _make_config( + agents=[ + AgentDef( + name="vp", + model="gpt-4", + prompt="Go", + stages={ + "review": StageDef(prompt="Review"), + "summary": StageDef(prompt="Summarize"), + }, + ), + ], + ) + result = expand_stages(config) + + synthetic_names = [a.name for a in result.agents if ":" in a.name] + assert len(synthetic_names) == 3 + assert "vp:default" in synthetic_names + assert "vp:review" in synthetic_names + assert "vp:summary" in synthetic_names + + def test_stage_overrides_prompt(self) -> None: + """Stage prompt should override the base agent prompt.""" + config = _make_config( + agents=[ + AgentDef( + name="vp", + model="gpt-4", + prompt="Base prompt", + stages={"review": StageDef(prompt="Stage prompt")}, + ), + ], + ) + result = expand_stages(config) + + review = next(a for a in result.agents if a.name == "vp:review") + assert review.prompt == "Stage prompt" + + def test_stage_overrides_input(self) -> None: + """Stage input should override the base agent input list.""" + config = _make_config( + agents=[ + AgentDef( + name="vp", + model="gpt-4", + prompt="Go", + input=["workflow.input.project"], + stages={"review": StageDef(input=["ic.output"])}, + ), + ], + ) + result = expand_stages(config) + + review = next(a for a in result.agents if a.name == "vp:review") + assert review.input == ["ic.output"] + + def test_stage_overrides_output(self) -> None: + """Stage output should override the base agent output schema.""" + config = _make_config( + agents=[ + AgentDef( + name="vp", + model="gpt-4", + prompt="Go", + output={"plan": OutputField(type="string")}, + stages={ + "review": StageDef( + output={"verdict": OutputField(type="boolean")}, + ), + }, + ), + ], + ) + result = expand_stages(config) + + review = next(a for a in result.agents if a.name == "vp:review") + assert "verdict" in review.output + assert "plan" not in review.output + + def test_stage_overrides_routes(self) -> None: + """Stage routes should replace the base agent routes entirely.""" + config = _make_config( + agents=[ + AgentDef( + name="vp", + model="gpt-4", + prompt="Go", + routes=[RouteDef(to="$end")], + stages={ + "review": StageDef( + routes=[RouteDef(to="$end", when="verdict == true")], + ), + }, + ), + ], + ) + result = expand_stages(config) + + review = next(a for a in result.agents if a.name == "vp:review") + assert len(review.routes) == 1 + assert review.routes[0].when == "verdict == true" + + def test_stage_overrides_description(self) -> None: + """Stage description should override the base agent description.""" + config = _make_config( + agents=[ + AgentDef( + name="vp", + model="gpt-4", + prompt="Go", + description="Base description", + stages={ + "review": StageDef(description="Review description"), + }, + ), + ], + ) + result = expand_stages(config) + + review = next(a for a in result.agents if a.name == "vp:review") + assert review.description == "Review description" + + def test_stage_inherits_unoverridden_fields(self) -> None: + """Stage with only prompt should inherit model, system_prompt, tools from base.""" + config = _make_config( + agents=[ + AgentDef( + name="vp", + model="gpt-4", + system_prompt="You are a VP", + prompt="Go", + tools=["read_file"], + stages={"review": StageDef(prompt="Review")}, + ), + ], + ) + result = expand_stages(config) + + review = next(a for a in result.agents if a.name == "vp:review") + assert review.model == "gpt-4" + assert review.system_prompt == "You are a VP" + assert review.tools == ["read_file"] + + def test_entry_point_rewritten(self) -> None: + """Entry point referencing a staged agent should be rewritten to default.""" + config = _make_config( + agents=[ + AgentDef( + name="vp", + model="gpt-4", + prompt="Go", + stages={"review": StageDef(prompt="Review")}, + ), + ], + entry_point="vp", + ) + result = expand_stages(config) + assert result.workflow.entry_point == "vp:default" + + def test_entry_point_not_rewritten_for_unstaged(self) -> None: + """Entry point referencing an unstaged agent should remain unchanged.""" + config = _make_config( + agents=[ + AgentDef(name="ic", model="gpt-4", prompt="Implement"), + ], + entry_point="ic", + ) + result = expand_stages(config) + assert result.workflow.entry_point == "ic" + + def test_bare_route_targets_rewritten(self) -> None: + """Bare routes targeting a staged agent should be rewritten to default.""" + config = WorkflowConfig( + workflow=WorkflowDef(name="test", entry_point="ic"), + agents=[ + AgentDef( + name="vp", + model="gpt-4", + prompt="Go", + stages={"review": StageDef(prompt="Review")}, + ), + AgentDef( + name="ic", + model="gpt-4", + prompt="Implement", + routes=[RouteDef(to="vp")], + ), + ], + ) + result = expand_stages(config) + + ic = next(a for a in result.agents if a.name == "ic") + assert ic.routes[0].to == "vp:default" + + def test_stage_qualified_route_not_rewritten(self) -> None: + """Already-qualified routes like 'vp:review' should not be rewritten.""" + config = WorkflowConfig( + workflow=WorkflowDef(name="test", entry_point="ic"), + agents=[ + AgentDef( + name="vp", + model="gpt-4", + prompt="Go", + stages={"review": StageDef(prompt="Review")}, + ), + AgentDef( + name="ic", + model="gpt-4", + prompt="Implement", + routes=[RouteDef(to="vp:review")], + ), + ], + ) + result = expand_stages(config) + + ic = next(a for a in result.agents if a.name == "ic") + assert ic.routes[0].to == "vp:review" + + def test_original_agent_preserved(self) -> None: + """The original staged agent should remain in config.agents.""" + config = _make_config( + agents=[ + AgentDef( + name="vp", + model="gpt-4", + prompt="Go", + stages={"review": StageDef(prompt="Review")}, + ), + ], + ) + result = expand_stages(config) + + original = next(a for a in result.agents if a.name == "vp") + assert original.stages is not None + assert "review" in original.stages + + def test_synthetic_agents_have_no_stages(self) -> None: + """All synthetic agents produced by expansion should have stages=None.""" + config = _make_config( + agents=[ + AgentDef( + name="vp", + model="gpt-4", + prompt="Go", + stages={ + "review": StageDef(prompt="Review"), + "summary": StageDef(prompt="Summarize"), + }, + ), + ], + ) + result = expand_stages(config) + + for agent in result.agents: + if ":" in agent.name: + assert agent.stages is None, f"{agent.name} should have stages=None" + + +# --------------------------------------------------------------------------- +# 4. Edge cases +# --------------------------------------------------------------------------- + + +class TestExpandStagesEdgeCases: + """Edge-case tests for stage expansion.""" + + def test_empty_stages_dict_no_expansion(self) -> None: + """Agent with an empty stages dict should produce no synthetic agents.""" + config = _make_config( + agents=[ + AgentDef( + name="vp", + model="gpt-4", + prompt="Go", + stages={}, + ), + ], + ) + original_count = len(config.agents) + result = expand_stages(config) + + synthetic = [a for a in result.agents if ":" in a.name] + assert len(synthetic) == 0 + assert len(result.agents) == original_count + + def test_multiple_staged_agents(self) -> None: + """Two agents each with stages should both be fully expanded.""" + config = WorkflowConfig( + workflow=WorkflowDef(name="test", entry_point="vp"), + agents=[ + AgentDef( + name="vp", + model="gpt-4", + prompt="Direct", + stages={"review": StageDef(prompt="VP review")}, + ), + AgentDef( + name="ic", + model="gpt-4", + prompt="Implement", + routes=[RouteDef(to="vp:review")], + stages={"test": StageDef(prompt="Run tests")}, + ), + ], + ) + result = expand_stages(config) + + names = [a.name for a in result.agents] + # Original agents + assert "vp" in names + assert "ic" in names + # VP synthetics + assert "vp:default" in names + assert "vp:review" in names + # IC synthetics + assert "ic:default" in names + assert "ic:test" in names + + +# --------------------------------------------------------------------------- +# 5. Name collision validation +# --------------------------------------------------------------------------- + + +class TestNameCollisionValidation: + """Tests for name collision detection in expand_stages().""" + + def test_stage_name_collides_with_existing_agent(self) -> None: + """Expansion should raise if a synthetic name matches an existing agent.""" + from conductor.exceptions import ConfigurationError + + config = WorkflowConfig( + workflow=WorkflowDef(name="test", entry_point="vp"), + agents=[ + AgentDef( + name="vp", + model="gpt-4", + prompt="Go", + stages={"review": StageDef(prompt="Review")}, + ), + AgentDef( + name="vp:review", + model="gpt-4", + prompt="Colliding agent", + ), + ], + ) + with pytest.raises(ConfigurationError, match="Name collision"): + expand_stages(config) + + def test_default_name_collides_with_existing_agent(self) -> None: + """Expansion should raise if agent:default matches an existing agent.""" + from conductor.exceptions import ConfigurationError + + config = WorkflowConfig( + workflow=WorkflowDef(name="test", entry_point="vp"), + agents=[ + AgentDef( + name="vp", + model="gpt-4", + prompt="Go", + stages={"review": StageDef(prompt="Review")}, + ), + AgentDef( + name="vp:default", + model="gpt-4", + prompt="Colliding default agent", + ), + ], + ) + with pytest.raises(ConfigurationError, match="Name collision"): + expand_stages(config) + + def test_no_collision_when_names_are_unique(self) -> None: + """Expansion should succeed when no name collisions exist.""" + config = _make_config( + agents=[ + AgentDef( + name="vp", + model="gpt-4", + prompt="Go", + stages={"review": StageDef(prompt="Review")}, + ), + AgentDef( + name="ic", + model="gpt-4", + prompt="Implement", + ), + ], + ) + result = expand_stages(config) + names = [a.name for a in result.agents] + assert "vp:default" in names + assert "vp:review" in names + + +# --------------------------------------------------------------------------- +# 6. For-each inline agent stages validation +# --------------------------------------------------------------------------- + + +class TestForEachStagesValidation: + """Tests for rejecting stages on for-each inline agents.""" + + def test_for_each_inline_agent_with_stages_rejected(self) -> None: + """Validator should reject stages on for-each inline agents.""" + from conductor.config.schema import ForEachDef + from conductor.config.validator import validate_workflow_config + from conductor.exceptions import ConfigurationError + + config = WorkflowConfig( + workflow=WorkflowDef(name="test", entry_point="processor"), + agents=[ + AgentDef(name="starter", model="gpt-4", prompt="Start"), + ], + for_each=[ + ForEachDef( + name="processor", + type="for_each", + source="starter.output.items", + **{"as": "item"}, + agent=AgentDef( + name="worker", + model="gpt-4", + prompt="Process {{ item }}", + stages={"review": StageDef(prompt="Review {{ item }}")}, + ), + routes=[RouteDef(to="$end")], + ), + ], + ) + with pytest.raises(ConfigurationError, match="Stages are not supported"): + validate_workflow_config(config) + + def test_for_each_inline_agent_without_stages_ok(self) -> None: + """For-each inline agent without stages should pass validation.""" + from conductor.config.schema import ForEachDef + from conductor.config.validator import validate_workflow_config + + config = WorkflowConfig( + workflow=WorkflowDef(name="test", entry_point="processor"), + agents=[ + AgentDef(name="starter", model="gpt-4", prompt="Start"), + ], + for_each=[ + ForEachDef( + name="processor", + type="for_each", + source="starter.output.items", + **{"as": "item"}, + agent=AgentDef( + name="worker", + model="gpt-4", + prompt="Process {{ item }}", + ), + routes=[RouteDef(to="$end")], + ), + ], + ) + # Should not raise + validate_workflow_config(config) diff --git a/tests/test_engine/test_staged_workflow.py b/tests/test_engine/test_staged_workflow.py new file mode 100644 index 0000000..455a321 --- /dev/null +++ b/tests/test_engine/test_staged_workflow.py @@ -0,0 +1,346 @@ +"""Tests for staged re-invocation feature in the workflow engine. + +Verifies end-to-end staged workflow execution, context dual-key storage, +the ``stages`` dict injection for Jinja2 templates, and backward compatibility +with classic loop-back patterns. +""" + +from __future__ import annotations + +import pytest + +from conductor.config.expander import expand_stages +from conductor.config.schema import ( + AgentDef, + ContextConfig, + LimitsConfig, + OutputField, + RouteDef, + RuntimeConfig, + StageDef, + WorkflowConfig, + WorkflowDef, +) +from conductor.engine.context import WorkflowContext +from conductor.engine.workflow import WorkflowEngine +from conductor.providers.copilot import CopilotProvider + + +class TestStagedWorkflowExecution: + """Integration tests for staged agent execution through the workflow engine.""" + + @pytest.mark.asyncio + async def test_vp_ic_vp_review_workflow(self) -> None: + """Test VP → IC → VP:review end-to-end staged workflow. + + Constructs a three-step workflow where the VP agent plans, IC implements, + and VP is re-invoked via its ``review`` stage to approve. Verifies that + both VP outputs are independently accessible and the final output is + correctly rendered from stage-qualified template references. + """ + config = WorkflowConfig( + workflow=WorkflowDef( + name="staged-test", + entry_point="vp", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="vp", + model="gpt-4", + prompt="Plan: {{ workflow.input.project }}", + output={"plan": OutputField(type="string")}, + routes=[RouteDef(to="ic")], + stages={ + "review": StageDef( + prompt="Review: {{ ic.output.impl }}", + input=["ic.output"], + output={ + "approved": OutputField(type="boolean"), + "feedback": OutputField(type="string"), + }, + routes=[RouteDef(to="$end")], + ), + }, + ), + AgentDef( + name="ic", + model="gpt-4", + prompt="Implement: {{ vp.output.plan }}", + output={"impl": OutputField(type="string")}, + routes=[RouteDef(to="vp:review")], + ), + ], + output={ + "plan": "{{ stages.vp.default.output.plan }}", + "approved": "{{ stages.vp.review.output.approved }}", + }, + ) + config = expand_stages(config) + + responses = { + "vp:default": {"plan": "Build microservice"}, + "ic": {"impl": "Built auth module"}, + "vp:review": {"approved": True, "feedback": "Looks good"}, + } + + def mock_handler(agent, prompt, context): + return responses[agent.name] + + provider = CopilotProvider(mock_handler=mock_handler) + engine = WorkflowEngine(config, provider) + + result = await engine.run({"project": "auth-service"}) + + assert result["plan"] == "Build microservice" + assert result["approved"] == "True" + + summary = engine.get_execution_summary() + assert summary["agents_executed"] == ["vp:default", "ic", "vp:review"] + + @pytest.mark.asyncio + async def test_staged_workflow_with_loop_back(self) -> None: + """Test VP → IC → VP:review → IC (loop) → VP:review → $end. + + The VP:review stage can loop back to IC for revisions when it does not + approve. On the first review the mock rejects; on the second it + approves. Verifies the loop-back pattern executes correctly and the + final result reflects approval. + """ + config = WorkflowConfig( + workflow=WorkflowDef( + name="staged-loop-test", + entry_point="vp", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=20), + ), + agents=[ + AgentDef( + name="vp", + model="gpt-4", + prompt="Plan: {{ workflow.input.project }}", + output={"plan": OutputField(type="string")}, + routes=[RouteDef(to="ic")], + stages={ + "review": StageDef( + prompt="Review: {{ ic.output.impl }}", + input=["ic.output"], + output={ + "approved": OutputField(type="boolean"), + "feedback": OutputField(type="string"), + }, + routes=[ + RouteDef(to="$end", when="{{ output.approved }}"), + RouteDef(to="ic"), + ], + ), + }, + ), + AgentDef( + name="ic", + model="gpt-4", + # Use stage-qualified reference because the vp base key + # gets overwritten by the review stage via dual-key storage. + prompt="Implement: {{ stages.vp.default.output.plan }}", + output={"impl": OutputField(type="string")}, + routes=[RouteDef(to="vp:review")], + ), + ], + output={ + "approved": "{{ stages.vp.review.output.approved }}", + "feedback": "{{ stages.vp.review.output.feedback }}", + }, + ) + config = expand_stages(config) + + review_count = 0 + + def mock_handler(agent, prompt, context): + nonlocal review_count + if agent.name == "vp:default": + return {"plan": "Build microservice"} + if agent.name == "ic": + revision = review_count + 1 + return {"impl": f"Implementation v{revision}"} + if agent.name == "vp:review": + review_count += 1 + if review_count == 1: + return {"approved": False, "feedback": "Needs revision"} + return {"approved": True, "feedback": "Approved"} + raise ValueError(f"Unexpected agent: {agent.name}") + + provider = CopilotProvider(mock_handler=mock_handler) + engine = WorkflowEngine(config, provider) + + result = await engine.run({"project": "auth-service"}) + + assert result["approved"] == "True" + assert result["feedback"] == "Approved" + + summary = engine.get_execution_summary() + assert summary["agents_executed"] == [ + "vp:default", + "ic", + "vp:review", + "ic", + "vp:review", + ] + + @pytest.mark.asyncio + async def test_backward_compat_loop_back_without_stages(self) -> None: + """Test classic drafter → reviewer → drafter loop without stages. + + Ensures that the traditional loop-back pattern (no stages) continues to + work correctly after the staged-agent feature was introduced. The + reviewer rejects on the first pass and approves on the second. + """ + config = WorkflowConfig( + workflow=WorkflowDef( + name="loop-test", + entry_point="drafter", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="drafter", + model="gpt-4", + prompt="Draft", + output={"draft": OutputField(type="string")}, + routes=[RouteDef(to="reviewer")], + ), + AgentDef( + name="reviewer", + model="gpt-4", + prompt="Review: {{ drafter.output.draft }}", + output={"approved": OutputField(type="boolean")}, + routes=[ + RouteDef(to="$end", when="{{ output.approved }}"), + RouteDef(to="drafter"), + ], + ), + ], + output={"final": "{{ drafter.output.draft }}"}, + ) + + draft_count = 0 + + def mock_handler(agent, prompt, context): + nonlocal draft_count + if agent.name == "drafter": + draft_count += 1 + return {"draft": f"Draft v{draft_count}"} + if agent.name == "reviewer": + approved = draft_count >= 2 + return {"approved": approved} + raise ValueError(f"Unexpected agent: {agent.name}") + + provider = CopilotProvider(mock_handler=mock_handler) + engine = WorkflowEngine(config, provider) + + result = await engine.run({}) + + assert result["final"] == "Draft v2" + + summary = engine.get_execution_summary() + assert summary["agents_executed"] == [ + "drafter", + "reviewer", + "drafter", + "reviewer", + ] + + +class TestStagedContextStorage: + """Tests for WorkflowContext dual-key storage and stages dict injection.""" + + def test_dual_key_storage(self) -> None: + """Test that stage-qualified names produce dual-key entries. + + Storing output under ``vp:default`` should create entries for both + ``vp:default`` and the base name ``vp``. A subsequent store under + ``vp:review`` should overwrite the ``vp`` base key while preserving + the original ``vp:default`` entry. + """ + ctx = WorkflowContext() + ctx.store("vp:default", {"plan": "Build it"}) + + # Both keys exist + assert "vp:default" in ctx.agent_outputs + assert "vp" in ctx.agent_outputs + assert ctx.agent_outputs["vp:default"]["plan"] == "Build it" + assert ctx.agent_outputs["vp"]["plan"] == "Build it" + + # Store review stage — base name overwrites + ctx.store("vp:review", {"approved": True}) + assert ctx.agent_outputs["vp:review"]["approved"] is True + assert ctx.agent_outputs["vp"]["approved"] is True # overwritten + + # Stage-specific keys preserved + assert ctx.agent_outputs["vp:default"]["plan"] == "Build it" + + def test_stages_dict_in_context(self) -> None: + """Test that ``_build_stages_dict()`` produces correct nested structure. + + After storing outputs for ``vp:default``, ``ic``, and ``vp:review``, + the stages dict should contain nested entries for the ``vp`` base name + only (``ic`` is not stage-qualified). + """ + ctx = WorkflowContext() + ctx.store("vp:default", {"plan": "Build it"}) + ctx.store("ic", {"impl": "Done"}) + ctx.store("vp:review", {"approved": True}) + + agent_ctx = ctx.build_for_agent("output", [], mode="accumulate") + + assert "stages" in agent_ctx + assert "vp" in agent_ctx["stages"] + assert "default" in agent_ctx["stages"]["vp"] + assert "review" in agent_ctx["stages"]["vp"] + assert agent_ctx["stages"]["vp"]["default"]["output"]["plan"] == "Build it" + assert agent_ctx["stages"]["vp"]["review"]["output"]["approved"] is True + + # Non-staged agents do not appear in stages dict + assert "ic" not in agent_ctx["stages"] + + def test_stages_dict_empty_when_no_stages(self) -> None: + """Test that stages dict is empty when no colon-qualified outputs exist.""" + ctx = WorkflowContext() + ctx.store("agent1", {"data": "value"}) + + agent_ctx = ctx.build_for_agent("agent2", [], mode="accumulate") + + assert agent_ctx["stages"] == {} + + def test_non_staged_agent_store_unchanged(self) -> None: + """Test that storing a regular (non-staged) agent creates exactly one key.""" + ctx = WorkflowContext() + ctx.store("agent1", {"x": 1}) + + assert "agent1" in ctx.agent_outputs + # No extra base-name key created for a non-colon-qualified name + assert len(ctx.agent_outputs) == 1 + + def test_explicit_mode_with_stage_qualified_input(self) -> None: + """Test explicit context mode with stage-qualified input references. + + When an agent declares ``vp:default.output`` as an explicit input, the + context should include the ``vp:default`` entry and the ``stages`` dict + should still be injected. + """ + ctx = WorkflowContext() + ctx.store("vp:default", {"plan": "Build it"}) + ctx.store("vp:review", {"approved": True}) + + agent_ctx = ctx.build_for_agent("ic", ["vp:default.output"], mode="explicit") + + # The stages dict is always injected + assert "stages" in agent_ctx + + # The vp:default output is accessible via the colon-qualified key + assert "vp:default" in agent_ctx + assert agent_ctx["vp:default"]["output"]["plan"] == "Build it" diff --git a/tests/test_web/take_screenshots.py b/tests/test_web/take_screenshots.py new file mode 100644 index 0000000..d9c58e6 --- /dev/null +++ b/tests/test_web/take_screenshots.py @@ -0,0 +1,454 @@ +"""Playwright screenshot test for the Conductor web dashboard. + +Starts a mock FastAPI server pre-loaded with staged-workflow events, +then uses Playwright to screenshot the dashboard in various states. + +Usage: + uv run python tests/test_web/take_screenshots.py +""" + +from __future__ import annotations + +import asyncio +import time +from pathlib import Path + +import uvicorn +from fastapi import FastAPI +from fastapi.responses import FileResponse, JSONResponse +from fastapi.staticfiles import StaticFiles + +# Resolve paths +STATIC_DIR = Path(__file__).resolve().parents[2] / "src" / "conductor" / "web" / "static" +SCREENSHOTS_DIR = Path(__file__).resolve().parents[2] / "screenshots-for-pr" + + +def _build_staged_workflow_events() -> list[dict]: + """Build a realistic set of events for a VP→IC→VP:review staged workflow.""" + t = time.time() - 60 # Start 60s ago + + return [ + { + "type": "workflow_started", + "timestamp": t, + "data": { + "workflow_name": "staged-review", + "entry_point": "vp:default", + "agents": [ + { + "name": "vp", + "type": "agent", + "model": "gpt-4.1", + "routes": [{"to": "ic"}], + "stages": {"review": {}}, + }, + { + "name": "vp:default", + "type": "agent", + "model": "gpt-4.1", + "routes": [{"to": "ic"}], + }, + { + "name": "vp:review", + "type": "agent", + "model": "gpt-4.1", + "routes": [ + {"to": "ic", "when": "verdict == 'revise'"}, + {"to": "$end", "when": "verdict == 'approve'"}, + ], + }, + { + "name": "ic", + "type": "agent", + "model": "gpt-4.1", + "routes": [{"to": "vp:review"}], + }, + ], + "parallel": [], + "for_each": [], + }, + }, + # VP:default starts + { + "type": "agent_started", + "timestamp": t + 1, + "data": {"agent": "vp:default"}, + }, + { + "type": "agent_turn_start", + "timestamp": t + 1.5, + "data": {"agent": "vp:default", "turn": "awaiting_model"}, + }, + { + "type": "agent_message", + "timestamp": t + 5, + "data": { + "agent": "vp:default", + "content": "Based on the project requirements, I'm setting the technical " + "direction for our team. We'll use a microservices architecture with " + "Python FastAPI for the backend and React for the frontend. The key " + "priorities are: 1) API design first, 2) Comprehensive test coverage, " + "3) CI/CD pipeline setup.", + }, + }, + { + "type": "agent_completed", + "timestamp": t + 8, + "data": { + "agent": "vp:default", + "output": { + "direction": "microservices with FastAPI + React", + "priorities": ["API design", "test coverage", "CI/CD"], + }, + "model": "gpt-4.1", + "tokens": 245, + "route": "ic", + }, + }, + # IC starts + { + "type": "agent_started", + "timestamp": t + 10, + "data": {"agent": "ic"}, + }, + { + "type": "agent_turn_start", + "timestamp": t + 10.5, + "data": {"agent": "ic", "turn": "awaiting_model"}, + }, + { + "type": "agent_tool_start", + "timestamp": t + 12, + "data": { + "agent": "ic", + "tool": "create_file", + "input": {"path": "src/api/main.py"}, + }, + }, + { + "type": "agent_tool_complete", + "timestamp": t + 13, + "data": { + "agent": "ic", + "tool": "create_file", + "output": "File created successfully", + }, + }, + { + "type": "agent_message", + "timestamp": t + 18, + "data": { + "agent": "ic", + "content": "I've implemented the initial API structure following the VP's " + "direction. Created the FastAPI application with three main endpoints: " + "/users, /projects, and /tasks. Added comprehensive Pydantic models " + "for request/response validation and pytest fixtures for testing.", + }, + }, + { + "type": "agent_completed", + "timestamp": t + 20, + "data": { + "agent": "ic", + "output": { + "files_created": [ + "src/api/main.py", + "src/api/models.py", + "tests/test_api.py", + ], + "implementation_summary": "FastAPI app with 3 endpoints, Pydantic models, tests", + }, + "model": "gpt-4.1", + "tokens": 512, + "route": "vp:review", + }, + }, + # VP:review starts (same agent, review stage) + { + "type": "agent_started", + "timestamp": t + 22, + "data": {"agent": "vp:review"}, + }, + { + "type": "agent_turn_start", + "timestamp": t + 22.5, + "data": {"agent": "vp:review", "turn": "awaiting_model"}, + }, + { + "type": "agent_message", + "timestamp": t + 28, + "data": { + "agent": "vp:review", + "content": "Reviewing the IC's implementation against my original direction. " + "The API structure looks solid — good use of Pydantic models and the " + "endpoint design follows REST conventions. Test coverage is present. " + "Verdict: APPROVE. The implementation meets the technical requirements.", + }, + }, + { + "type": "agent_completed", + "timestamp": t + 30, + "data": { + "agent": "vp:review", + "output": { + "verdict": "approve", + "review_notes": "Solid implementation, meets requirements", + }, + "model": "gpt-4.1", + "tokens": 189, + "route": "$end", + }, + }, + # Workflow completes + { + "type": "workflow_completed", + "timestamp": t + 32, + "data": { + "output": { + "final_verdict": "approve", + "review_notes": "Solid implementation, meets requirements", + }, + "total_tokens": 946, + }, + }, + ] + + +def _build_in_progress_events() -> list[dict]: + """Build events showing the workflow mid-execution (IC running).""" + full = _build_staged_workflow_events() + # Return events up to IC running (before IC completes) + return full[:9] # Up through agent_tool_complete for IC + + +def create_mock_app(events: list[dict]) -> FastAPI: + """Create a FastAPI app that serves the dashboard with pre-loaded events.""" + app = FastAPI() + + @app.get("/") + async def index(): + return FileResponse(STATIC_DIR / "index.html", media_type="text/html") + + @app.get("/favicon.svg") + async def favicon(): + return FileResponse(STATIC_DIR / "favicon.svg", media_type="image/svg+xml") + + @app.get("/api/state") + async def get_state(): + return JSONResponse(content=events) + + @app.get("/api/logs") + async def get_logs(): + return JSONResponse(content=events) + + app.mount("/assets", StaticFiles(directory=str(STATIC_DIR / "assets")), name="assets") + + return app + + +async def take_screenshots() -> None: + """Start mock servers and take screenshots with Playwright.""" + from playwright.async_api import async_playwright + + SCREENSHOTS_DIR.mkdir(parents=True, exist_ok=True) + + screenshots_taken: list[str] = [] + assertions_passed: list[str] = [] + warnings: list[str] = [] + + # --- Server 1: Completed staged workflow --- + completed_app = create_mock_app(_build_staged_workflow_events()) + config1 = uvicorn.Config(app=completed_app, host="127.0.0.1", port=8901, log_level="warning") + server1 = uvicorn.Server(config1) + task1 = asyncio.create_task(server1.serve()) + while not server1.started: + await asyncio.sleep(0.05) + + async with async_playwright() as p: + browser = await p.chromium.launch() + + # ---- Test 1: Dashboard loads and renders graph (desktop) ---- + page = await browser.new_page(viewport={"width": 1400, "height": 900}) + await page.goto("http://127.0.0.1:8901", wait_until="networkidle") + await page.wait_for_timeout(2500) + + # Assert: dashboard title/header is visible + header = page.locator("header").first + if await header.is_visible(): + assertions_passed.append("Header is visible") + else: + warnings.append("Header not found") + + # Assert: workflow name displayed + page_text = await page.text_content("body") + if "staged-review" in (page_text or "").lower(): + assertions_passed.append("Workflow name 'staged-review' displayed") + + # Assert: stage-qualified agent nodes are rendered + for node_name in ["vp:default", "vp:review", "ic"]: + node = page.locator(f'text="{node_name}"').first + if await node.is_visible(timeout=3000): + assertions_passed.append(f"Node '{node_name}' rendered in graph") + else: + warnings.append(f"Node '{node_name}' not visible in graph") + + # Screenshot: full completed workflow overview + path = str(SCREENSHOTS_DIR / "staged-workflow-completed.png") + await page.screenshot(path=path, full_page=False) + screenshots_taken.append(path) + print(f" ✅ {path}") + + # ---- Test 2: Click each stage node and verify detail panel ---- + for node_name in ["vp:default", "vp:review", "ic"]: + try: + node = page.locator(f'text="{node_name}"').first + if await node.is_visible(timeout=2000): + await node.click() + await page.wait_for_timeout(800) + + safe_name = node_name.replace(":", "-") + path = str(SCREENSHOTS_DIR / f"staged-node-{safe_name}.png") + await page.screenshot(path=path, full_page=False) + screenshots_taken.append(path) + print(f" ✅ {path}") + assertions_passed.append(f"Detail panel opened for '{node_name}'") + except Exception as e: + warnings.append(f"Could not interact with node '{node_name}': {e}") + + # ---- Test 3: Verify completed status indicators ---- + body_text = await page.text_content("body") or "" + if "completed" in body_text.lower() or "✓" in body_text or "complete" in body_text.lower(): + assertions_passed.append("Completed status indicator found") + else: + warnings.append("No completed status indicator found") + + # Assert: token count displayed + if "946" in body_text or "tok" in body_text.lower(): + assertions_passed.append("Token count displayed") + + # ---- Test 4: API state endpoint returns correct data ---- + response = await page.evaluate( + """async () => { + const resp = await fetch('/api/state'); + const data = await resp.json(); + return { count: data.length, firstType: data[0]?.type, lastType: data[data.length-1]?.type }; + }""" + ) + if response["count"] > 0: + assertions_passed.append(f"API /api/state returns {response['count']} events") + if response["firstType"] == "workflow_started": + assertions_passed.append("First event is workflow_started") + if response["lastType"] == "workflow_completed": + assertions_passed.append("Last event is workflow_completed") + + # ---- Test 5: Verify stage-qualified agents in API response ---- + agents_data = await page.evaluate( + """async () => { + const resp = await fetch('/api/state'); + const data = await resp.json(); + const started = data[0]?.data?.agents || []; + return started.map(a => a.name); + }""" + ) + for expected in ["vp", "vp:default", "vp:review", "ic"]: + if expected in agents_data: + assertions_passed.append(f"Agent '{expected}' in API state") + else: + warnings.append(f"Agent '{expected}' missing from API state") + + await page.close() + + # ---- Test 6: Mobile viewport ---- + mobile_page = await browser.new_page(viewport={"width": 375, "height": 812}) + await mobile_page.goto("http://127.0.0.1:8901", wait_until="networkidle") + await mobile_page.wait_for_timeout(2000) + + path = str(SCREENSHOTS_DIR / "staged-workflow-mobile.png") + await mobile_page.screenshot(path=path, full_page=False) + screenshots_taken.append(path) + print(f" ✅ {path}") + assertions_passed.append("Mobile viewport renders without crash") + + await mobile_page.close() + + # ---- Test 7: Dark mode (prefers-color-scheme) ---- + dark_page = await browser.new_page( + viewport={"width": 1400, "height": 900}, + color_scheme="dark", + ) + await dark_page.goto("http://127.0.0.1:8901", wait_until="networkidle") + await dark_page.wait_for_timeout(2000) + + path = str(SCREENSHOTS_DIR / "staged-workflow-dark-mode.png") + await dark_page.screenshot(path=path, full_page=False) + screenshots_taken.append(path) + print(f" ✅ {path}") + assertions_passed.append("Dark mode renders without crash") + + await dark_page.close() + await browser.close() + + server1.should_exit = True + await task1 + + # --- Server 2: In-progress staged workflow --- + progress_app = create_mock_app(_build_in_progress_events()) + config2 = uvicorn.Config(app=progress_app, host="127.0.0.1", port=8902, log_level="warning") + server2 = uvicorn.Server(config2) + task2 = asyncio.create_task(server2.serve()) + while not server2.started: + await asyncio.sleep(0.05) + + async with async_playwright() as p: + browser = await p.chromium.launch() + page = await browser.new_page(viewport={"width": 1400, "height": 900}) + + await page.goto("http://127.0.0.1:8902", wait_until="networkidle") + await page.wait_for_timeout(2000) + + # Screenshot: in-progress state + path = str(SCREENSHOTS_DIR / "staged-workflow-in-progress.png") + await page.screenshot(path=path, full_page=False) + screenshots_taken.append(path) + print(f" ✅ {path}") + + # Assert: running state visible + body_text = await page.text_content("body") or "" + if "running" in body_text.lower() or "progress" in body_text.lower(): + assertions_passed.append("Running status indicator shown in-progress view") + + # Click on IC node (currently running) + try: + ic_node = page.locator('text="ic"').first + if await ic_node.is_visible(timeout=2000): + await ic_node.click() + await page.wait_for_timeout(800) + path = str(SCREENSHOTS_DIR / "staged-in-progress-ic-detail.png") + await page.screenshot(path=path, full_page=False) + screenshots_taken.append(path) + print(f" ✅ {path}") + assertions_passed.append("IC node detail panel opened in-progress view") + except Exception as e: + warnings.append(f"Could not click IC node in-progress: {e}") + + await browser.close() + + server2.should_exit = True + await task2 + + # --- Summary --- + print(f"\n{'='*60}") + print(f"📸 Screenshots: {len(screenshots_taken)} captured") + print(f"✅ Assertions: {len(assertions_passed)} passed") + if warnings: + print(f"⚠️ Warnings: {len(warnings)}") + for w in warnings: + print(f" - {w}") + print(f"{'='*60}") + for a in assertions_passed: + print(f" ✅ {a}") + print(f"\nAll screenshots saved to {SCREENSHOTS_DIR}/") + + +if __name__ == "__main__": + asyncio.run(take_screenshots())