diff --git a/.claude/skills/conductor/references/authoring.md b/.claude/skills/conductor/references/authoring.md index 0fa7f9a..533b50b 100644 --- a/.claude/skills/conductor/references/authoring.md +++ b/.claude/skills/conductor/references/authoring.md @@ -48,7 +48,7 @@ workflow: ```yaml agents: - name: my_agent # Required: unique identifier - type: agent # agent (default), human_gate, or script + type: agent # agent (default), human_gate, script, or workflow description: What it does model: gpt-5.2 # Override workflow default provider: claude # Optional: per-agent provider override diff --git a/.claude/skills/conductor/references/yaml-schema.md b/.claude/skills/conductor/references/yaml-schema.md index b60b5d8..2b402c4 100644 --- a/.claude/skills/conductor/references/yaml-schema.md +++ b/.claude/skills/conductor/references/yaml-schema.md @@ -90,7 +90,7 @@ agents: name: string # Unique agent identifier # Optional fields - type: string # "agent" (default), "human_gate", or "script" + type: string # "agent" (default), "human_gate", "script", or "workflow" description: string # What this agent does model: string # Override default_model provider: string # Per-agent provider override ("copilot" or "claude") diff --git a/docs/workflow-syntax.md b/docs/workflow-syntax.md index c65d405..701892d 100644 --- a/docs/workflow-syntax.md +++ b/docs/workflow-syntax.md @@ -50,7 +50,7 @@ Agents are defined in the `agents` list. Each agent represents a unit of work. agents: - name: string # Required: Unique agent identifier description: string # Optional: Purpose description - type: agent # agent | human_gate | script (default: agent) + type: agent # agent | human_gate | script | workflow (default: agent) model: string # Optional: Model identifier (e.g., 'claude-sonnet-4.5') prompt: | # Required for type=agent: Agent instructions @@ -160,6 +160,42 @@ routes: **Environment variable note** — values in `env` are passed as-is to the subprocess (they are not rendered as Jinja2 templates). Use `${VAR}` syntax in the workflow YAML loader if you need environment variable substitution in env values. +### Sub-Workflow Steps + +Sub-workflow steps reference external workflow YAML files, enabling composable and reusable workflow building blocks. The sub-workflow runs as a black box — its internal agents are not visible to the parent. + +```yaml +agents: + - name: deep_research + type: workflow + workflow: ./research-pipeline.yaml # Required: path to sub-workflow YAML + input: # Optional: explicit input declarations + - workflow.input.topic + output: # Optional: output schema for validation + findings: + type: string + routes: + - to: synthesizer +``` + +**Key semantics:** + +- The `workflow` path is resolved relative to the parent workflow file +- Sub-workflow inherits the parent's provider configuration +- Sub-workflow output is stored in context and accessible via `{{ agent_name.output.field }}` +- Recursive composition is supported (sub-workflows can reference other sub-workflows) with a depth limit of 10 +- Circular references (a workflow referencing itself) are detected and rejected + +**Access sub-workflow output in downstream agents:** + +```yaml +prompt: | + The research findings were: + {{ deep_research.output.findings }} +``` + +**Restrictions** — workflow steps cannot have `prompt`, `model`, `provider`, `tools`, `system_prompt`, `command`, or `options`. Workflow steps also cannot be used inside `parallel` groups or `for_each` groups. + ## Parallel Groups Parallel groups execute multiple agents concurrently for improved performance. diff --git a/src/conductor/config/schema.py b/src/conductor/config/schema.py index 1f1ad13..c2ffa90 100644 --- a/src/conductor/config/schema.py +++ b/src/conductor/config/schema.py @@ -357,7 +357,7 @@ class AgentDef(BaseModel): description: str | None = None """Human-readable description of agent's purpose.""" - type: Literal["agent", "human_gate", "script"] | None = None + type: Literal["agent", "human_gate", "script", "workflow"] | None = None """Agent type. Defaults to 'agent' if not specified.""" provider: Literal["copilot", "claude"] | None = None @@ -425,6 +425,17 @@ class AgentDef(BaseModel): timeout: int | None = None """Per-script timeout in seconds.""" + workflow: str | None = None + """Path to sub-workflow YAML file (required for type='workflow'). + + The path is resolved relative to the parent workflow file. + Sub-workflows run as black boxes — their internal agents are not + visible to the parent workflow. + + Example: + workflow: ./research-pipeline.yaml + """ + max_session_seconds: float | None = Field(None, ge=1.0) """Maximum wall-clock duration for this agent's session in seconds. @@ -485,6 +496,27 @@ def validate_agent_type(self) -> AgentDef: raise ValueError("script agents cannot have 'max_session_seconds'") if self.max_agent_iterations is not None: raise ValueError("script agents cannot have 'max_agent_iterations'") + elif self.type == "workflow": + if not self.workflow: + raise ValueError("workflow agents require 'workflow' path") + if self.prompt: + raise ValueError("workflow agents cannot have 'prompt'") + if self.provider: + raise ValueError("workflow agents cannot have 'provider'") + if self.model: + raise ValueError("workflow agents cannot have 'model'") + if self.tools is not None: + raise ValueError("workflow agents cannot have 'tools'") + if self.system_prompt: + raise ValueError("workflow agents cannot have 'system_prompt'") + if self.options: + raise ValueError("workflow agents cannot have 'options'") + if self.command: + raise ValueError("workflow agents cannot have 'command'") + if self.max_session_seconds: + raise ValueError("workflow agents cannot have 'max_session_seconds'") + if self.max_agent_iterations is not None: + raise ValueError("workflow agents cannot have 'max_agent_iterations'") return self diff --git a/src/conductor/config/validator.py b/src/conductor/config/validator.py index 8bd0c63..4581064 100644 --- a/src/conductor/config/validator.py +++ b/src/conductor/config/validator.py @@ -111,13 +111,18 @@ def validate_workflow_config(config: WorkflowConfig) -> list[str]: parallel_errors = _validate_parallel_groups(config) errors.extend(parallel_errors) - # Validate for_each groups: reject script steps as inline agents + # Validate for_each groups: reject script and workflow steps as inline agents for for_each_group in config.for_each: if for_each_group.agent.type == "script": errors.append( f"For-each group '{for_each_group.name}' uses a script step as its " "inline agent. Script steps cannot be used in for_each groups." ) + if for_each_group.agent.type == "workflow": + errors.append( + f"For-each group '{for_each_group.name}' uses a workflow step as its " + "inline agent. Workflow steps cannot be used in for_each groups." + ) # Validate workflow output references output_errors = _validate_output_references( @@ -379,6 +384,13 @@ def _validate_parallel_groups(config: WorkflowConfig) -> list[str]: "Script steps cannot be used in parallel groups." ) + # Validate no workflow steps in parallel groups + if agent.type == "workflow": + errors.append( + f"Agent '{agent_name}' in parallel group '{pg.name}' is a workflow step. " + "Workflow steps cannot be used in parallel groups." + ) + # PE-6.2: Validate parallel group route targets for_each_names = {fe.name for fe in config.for_each} all_names = agent_names | parallel_names | for_each_names diff --git a/src/conductor/engine/workflow.py b/src/conductor/engine/workflow.py index 2adb60d..5e857e9 100644 --- a/src/conductor/engine/workflow.py +++ b/src/conductor/engine/workflow.py @@ -45,6 +45,10 @@ logger = logging.getLogger(__name__) +# Maximum nesting depth for sub-workflow composition. +# Prevents runaway recursion when workflows reference each other. +MAX_SUBWORKFLOW_DEPTH = 10 + if TYPE_CHECKING: from conductor.config.schema import AgentDef, ForEachDef, ParallelGroup, WorkflowConfig @@ -265,6 +269,7 @@ def __init__( event_emitter: WorkflowEventEmitter | None = None, keyboard_listener: KeyboardListener | None = None, web_dashboard: WebDashboard | None = None, + _subworkflow_depth: int = 0, ) -> None: """Initialize the WorkflowEngine. @@ -291,6 +296,9 @@ def __init__( web_dashboard: Optional web dashboard for bidirectional gate input. When provided and connected, gate input is accepted from both CLI stdin and web UI, with first response winning. + _subworkflow_depth: Current nesting depth for sub-workflow composition. + Used internally to enforce MAX_SUBWORKFLOW_DEPTH. Callers should + not set this directly. Note: If both provider and registry are provided, registry takes precedence. @@ -343,6 +351,9 @@ def __init__( self._current_agent_name: str | None = None self._last_checkpoint_path: Path | None = None + # Sub-workflow depth tracking + self._subworkflow_depth = _subworkflow_depth + def _build_pricing_overrides(self) -> dict[str, ModelPricing] | None: """Build pricing overrides from workflow cost configuration. @@ -468,6 +479,99 @@ async def _execute_script(self, agent: AgentDef, context: dict[str, Any]) -> Scr operation_name=f"script '{agent.name}'", ) + async def _execute_subworkflow( + self, + agent: AgentDef, + context: dict[str, Any], + ) -> dict[str, Any]: + """Execute a sub-workflow as a black-box step. + + Loads the referenced workflow YAML, creates a child WorkflowEngine, + and runs it with the parent agent's context as input. The sub-workflow's + final output is returned as the agent's output. + + Args: + agent: Workflow agent definition with ``workflow`` path. + context: Workflow context for template rendering (used as sub-workflow input). + + Returns: + The sub-workflow's final output dict. + + Raises: + ExecutionError: If the sub-workflow file cannot be loaded, + depth limit is exceeded, or execution fails. + """ + from conductor.config.loader import load_config + + if self._subworkflow_depth >= MAX_SUBWORKFLOW_DEPTH: + raise ExecutionError( + f"Sub-workflow depth limit exceeded ({MAX_SUBWORKFLOW_DEPTH}). " + f"Agent '{agent.name}' cannot invoke sub-workflow '{agent.workflow}'.", + suggestion=( + "Check for circular sub-workflow references or reduce nesting depth." + ), + ) + + assert agent.workflow is not None # noqa: S101 + + # Resolve sub-workflow path relative to parent workflow file + if self.workflow_path is not None: + base_dir = Path(self.workflow_path).resolve().parent + else: + base_dir = Path.cwd() + + sub_path = (base_dir / agent.workflow).resolve() + + if not sub_path.exists(): + raise ExecutionError( + f"Sub-workflow file not found: {sub_path} " + f"(referenced by agent '{agent.name}')", + suggestion="Check that the 'workflow' path is correct and the file exists.", + ) + + # Detect circular references via file path + current_path = ( + Path(self.workflow_path).resolve() if self.workflow_path else None + ) + if current_path is not None and sub_path == current_path: + raise ExecutionError( + f"Circular sub-workflow reference: agent '{agent.name}' " + f"references its own workflow file '{agent.workflow}'.", + suggestion="A workflow cannot reference itself as a sub-workflow.", + ) + + try: + sub_config = load_config(sub_path) + except Exception as exc: + raise ExecutionError( + f"Failed to load sub-workflow '{sub_path}' " + f"(referenced by agent '{agent.name}'): {exc}", + suggestion="Check the sub-workflow YAML for syntax or validation errors.", + ) from exc + + # Build sub-workflow inputs from the parent context + # Extract workflow.input.* values from the parent context + workflow_ctx = context.get("workflow", {}) + sub_inputs: dict[str, Any] = dict(workflow_ctx.get("input", {})) if isinstance( + workflow_ctx, dict + ) else {} + + # Create child engine inheriting provider/registry but with deeper depth + child_engine = WorkflowEngine( + config=sub_config, + provider=self._single_provider, + registry=self._registry, + skip_gates=self.skip_gates, + workflow_path=sub_path, + interrupt_event=self._interrupt_event, + event_emitter=self._event_emitter, + keyboard_listener=self._keyboard_listener, + web_dashboard=self._web_dashboard, + _subworkflow_depth=self._subworkflow_depth + 1, + ) + + return await child_engine.run(sub_inputs) + def _get_context_window_for_agent(self, agent: AgentDef) -> int | None: """Return the context window size for an agent's model.""" from conductor.engine.pricing import get_pricing @@ -1380,6 +1484,92 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]: ) continue + # Handle sub-workflow steps + if agent.type == "workflow": + agent_context = self.context.build_for_agent( + agent.name, + agent.input, + mode=self.config.workflow.context.mode, + ) + _sub_start = _time.time() + + sub_execution_count = ( + self.limits.get_agent_execution_count(agent.name) + 1 + ) + + self._emit( + "subworkflow_started", + { + "agent_name": agent.name, + "iteration": sub_execution_count, + "workflow": agent.workflow, + }, + ) + + try: + sub_output = await self._execute_subworkflow( + agent, agent_context + ) + except Exception as exc: + _sub_elapsed = _time.time() - _sub_start + self._emit( + "subworkflow_failed", + { + "agent_name": agent.name, + "elapsed": _sub_elapsed, + "error_type": type(exc).__name__, + "message": str(exc), + }, + ) + raise + _sub_elapsed = _time.time() - _sub_start + + self._emit( + "subworkflow_completed", + { + "agent_name": agent.name, + "elapsed": _sub_elapsed, + "output": sub_output, + }, + ) + + # Store sub-workflow output in context + self.context.store(agent.name, sub_output) + self.limits.record_execution(agent.name) + self.limits.check_timeout() + + route_result = self._evaluate_routes(agent, sub_output) + + self._emit( + "route_taken", + { + "from_agent": agent.name, + "to_agent": route_result.target, + }, + ) + + if route_result.target == "$end": + result = self._build_final_output(route_result.output_transform) + self._emit( + "workflow_completed", + { + "elapsed": _time.time() - _workflow_start, + "output": result, + }, + ) + self._execute_hook("on_complete", result=result) + return result + + current_agent_name = route_result.target + + # Check for interrupt after sub-workflow step + interrupt_result = await self._check_interrupt(current_agent_name) + if interrupt_result is not None: + current_agent_name = await self._handle_interrupt_result( + interrupt_result, current_agent_name + ) + continue + # Build context for this agent agent_context = self.context.build_for_agent( agent.name, diff --git a/tests/test_config/test_workflow_type_schema.py b/tests/test_config/test_workflow_type_schema.py new file mode 100644 index 0000000..9850023 --- /dev/null +++ b/tests/test_config/test_workflow_type_schema.py @@ -0,0 +1,279 @@ +"""Tests for workflow type schema validation. + +Tests cover: +- Valid workflow agent definitions +- Workflow field validation (workflow path required, forbidden fields) +- Workflow agents in parallel groups and for_each groups +- Backward compatibility with other agent types +""" + +from __future__ import annotations + +import pytest +from pydantic import ValidationError + +from conductor.config.schema import ( + AgentDef, + ForEachDef, + GateOption, + LimitsConfig, + OutputField, + ParallelGroup, + RouteDef, + RuntimeConfig, + WorkflowConfig, + WorkflowDef, +) +from conductor.config.validator import validate_workflow_config +from conductor.exceptions import ConfigurationError + + +class TestWorkflowAgentDef: + """Tests for workflow type AgentDef validation.""" + + def test_valid_workflow_agent(self) -> None: + """Test creating a valid workflow agent.""" + agent = AgentDef(name="sub_wf", type="workflow", workflow="./sub.yaml") + assert agent.type == "workflow" + assert agent.workflow == "./sub.yaml" + + def test_valid_workflow_agent_with_routes(self) -> None: + """Test workflow agent with routes validates correctly.""" + agent = AgentDef( + name="sub_wf", + type="workflow", + workflow="./sub.yaml", + routes=[ + RouteDef(to="next_agent", when="{{ output.result == 'done' }}"), + RouteDef(to="$end"), + ], + ) + assert len(agent.routes) == 2 + + def test_valid_workflow_agent_with_input(self) -> None: + """Test workflow agent with input declarations.""" + agent = AgentDef( + name="sub_wf", + type="workflow", + workflow="./sub.yaml", + input=["workflow.input.topic"], + ) + assert agent.input == ["workflow.input.topic"] + + def test_valid_workflow_agent_with_output(self) -> None: + """Test workflow agent with output schema.""" + agent = AgentDef( + name="sub_wf", + type="workflow", + workflow="./sub.yaml", + output={"findings": OutputField(type="string")}, + ) + assert "findings" in agent.output + + def test_workflow_without_path_raises(self) -> None: + """Test that workflow agent without workflow path raises ValidationError.""" + with pytest.raises(ValidationError, match="workflow agents require 'workflow' path"): + AgentDef(name="bad", type="workflow") + + def test_workflow_with_empty_path_raises(self) -> None: + """Test that workflow agent with empty path raises ValidationError.""" + with pytest.raises(ValidationError, match="workflow agents require 'workflow' path"): + AgentDef(name="bad", type="workflow", workflow="") + + def test_workflow_with_prompt_raises(self) -> None: + """Test that workflow agent with prompt raises ValidationError.""" + with pytest.raises(ValidationError, match="workflow agents cannot have 'prompt'"): + AgentDef(name="bad", type="workflow", workflow="./s.yaml", prompt="hello") + + def test_workflow_with_provider_raises(self) -> None: + """Test that workflow agent with provider raises ValidationError.""" + with pytest.raises(ValidationError, match="workflow agents cannot have 'provider'"): + AgentDef(name="bad", type="workflow", workflow="./s.yaml", provider="copilot") + + def test_workflow_with_model_raises(self) -> None: + """Test that workflow agent with model raises ValidationError.""" + with pytest.raises(ValidationError, match="workflow agents cannot have 'model'"): + AgentDef(name="bad", type="workflow", workflow="./s.yaml", model="gpt-4") + + def test_workflow_with_tools_raises(self) -> None: + """Test that workflow agent with tools raises ValidationError.""" + with pytest.raises(ValidationError, match="workflow agents cannot have 'tools'"): + AgentDef(name="bad", type="workflow", workflow="./s.yaml", tools=["web_search"]) + + def test_workflow_with_system_prompt_raises(self) -> None: + """Test that workflow agent with system_prompt raises ValidationError.""" + with pytest.raises(ValidationError, match="workflow agents cannot have 'system_prompt'"): + AgentDef(name="bad", type="workflow", workflow="./s.yaml", system_prompt="You are...") + + def test_workflow_with_options_raises(self) -> None: + """Test that workflow agent with options raises ValidationError.""" + with pytest.raises(ValidationError, match="workflow agents cannot have 'options'"): + AgentDef( + name="bad", + type="workflow", + workflow="./s.yaml", + options=[GateOption(label="OK", value="ok", route="$end")], + ) + + def test_workflow_with_command_raises(self) -> None: + """Test that workflow agent with command raises ValidationError.""" + with pytest.raises(ValidationError, match="workflow agents cannot have 'command'"): + AgentDef(name="bad", type="workflow", workflow="./s.yaml", command="echo") + + def test_workflow_with_max_session_seconds_raises(self) -> None: + """Test that workflow agent with max_session_seconds raises ValidationError.""" + with pytest.raises( + ValidationError, match="workflow agents cannot have 'max_session_seconds'" + ): + AgentDef( + name="bad", type="workflow", workflow="./s.yaml", max_session_seconds=60.0 + ) + + def test_workflow_with_max_agent_iterations_raises(self) -> None: + """Test that workflow agent with max_agent_iterations raises ValidationError.""" + with pytest.raises( + ValidationError, match="workflow agents cannot have 'max_agent_iterations'" + ): + AgentDef( + name="bad", type="workflow", workflow="./s.yaml", max_agent_iterations=100 + ) + + +class TestWorkflowBackwardCompatibility: + """Test that existing agent types still work after adding workflow type.""" + + def test_regular_agent_still_works(self) -> None: + """Test that a regular agent definition is unaffected.""" + agent = AgentDef(name="test", prompt="hello") + assert agent.type is None + assert agent.workflow is None + + def test_script_agent_still_works(self) -> None: + """Test that script agent is unaffected.""" + agent = AgentDef(name="test", type="script", command="echo") + assert agent.type == "script" + + def test_human_gate_still_works(self) -> None: + """Test that human_gate type is unaffected.""" + agent = AgentDef( + name="gate", + type="human_gate", + prompt="Choose:", + options=[GateOption(label="Yes", value="yes", route="$end")], + ) + assert agent.type == "human_gate" + + +class TestWorkflowInParallelGroup: + """Tests for workflow agents in parallel groups.""" + + def test_workflow_in_parallel_group_raises(self) -> None: + """Test that workflow agent in parallel group raises ConfigurationError.""" + config = WorkflowConfig( + workflow=WorkflowDef( + name="test", + entry_point="parallel_group", + runtime=RuntimeConfig(provider="copilot"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef(name="agent_a", prompt="do something"), + AgentDef(name="sub_wf", type="workflow", workflow="./sub.yaml"), + ], + parallel=[ + ParallelGroup( + name="parallel_group", + agents=["agent_a", "sub_wf"], + ), + ], + ) + with pytest.raises(ConfigurationError, match="workflow step"): + validate_workflow_config(config) + + +class TestWorkflowInForEach: + """Tests for workflow agents in for_each groups.""" + + def test_workflow_in_for_each_raises(self) -> None: + """Test that workflow step in for_each inline agent raises ConfigurationError.""" + config = WorkflowConfig( + workflow=WorkflowDef( + name="test", + entry_point="loop", + runtime=RuntimeConfig(provider="copilot"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef(name="setup", prompt="init"), + ], + for_each=[ + ForEachDef( + name="loop", + type="for_each", + source="setup.output.items", + **{"as": "item"}, + agent=AgentDef( + name="runner", + type="workflow", + workflow="./sub.yaml", + ), + ), + ], + ) + with pytest.raises(ConfigurationError, match="Workflow steps cannot be used in for_each"): + validate_workflow_config(config) + + +class TestWorkflowWorkflowConfig: + """Tests for WorkflowConfig with workflow agents.""" + + def test_workflow_at_entry_point_validates(self) -> None: + """Test that a workflow agent can be the workflow entry_point.""" + config = WorkflowConfig( + workflow=WorkflowDef( + name="test", + entry_point="sub_wf", + runtime=RuntimeConfig(provider="copilot"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="sub_wf", + type="workflow", + workflow="./sub.yaml", + routes=[RouteDef(to="$end")], + ), + ], + ) + # Should not raise + warnings = validate_workflow_config(config) + assert isinstance(warnings, list) + + def test_workflow_with_routes_to_agents(self) -> None: + """Test that workflow agent can route to other agents.""" + config = WorkflowConfig( + workflow=WorkflowDef( + name="test", + entry_point="sub_wf", + runtime=RuntimeConfig(provider="copilot"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="sub_wf", + type="workflow", + workflow="./sub.yaml", + routes=[ + RouteDef(to="processor"), + ], + ), + AgentDef( + name="processor", + prompt="Process the output", + routes=[RouteDef(to="$end")], + ), + ], + ) + # Should not raise + warnings = validate_workflow_config(config) + assert isinstance(warnings, list) diff --git a/tests/test_engine/test_subworkflow.py b/tests/test_engine/test_subworkflow.py new file mode 100644 index 0000000..e8ab030 --- /dev/null +++ b/tests/test_engine/test_subworkflow.py @@ -0,0 +1,529 @@ +"""Integration tests for sub-workflow (type: workflow) steps in WorkflowEngine. + +Tests cover: +- Linear workflow with sub-workflow step +- Sub-workflow output accessible in subsequent agent context +- Sub-workflow with routes +- Sub-workflow depth limit enforcement +- Self-referencing circular detection +- Sub-workflow file not found error +- Mixed agent + sub-workflow workflows +- Dry-run plan includes workflow steps +""" + +from __future__ import annotations + +import textwrap +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + +from conductor.config.schema import ( + AgentDef, + ContextConfig, + LimitsConfig, + RouteDef, + RuntimeConfig, + WorkflowConfig, + WorkflowDef, +) +from conductor.engine.workflow import MAX_SUBWORKFLOW_DEPTH, WorkflowEngine +from conductor.exceptions import ExecutionError +from conductor.providers.copilot import CopilotProvider + + +@pytest.fixture +def tmp_workflow_dir(tmp_path: Path) -> Path: + """Create a temporary directory with sub-workflow files.""" + return tmp_path + + +def _write_yaml(path: Path, content: str) -> Path: + """Write YAML content to a file and return the path.""" + path.write_text(textwrap.dedent(content), encoding="utf-8") + return path + + +class TestSubWorkflowLinear: + """Tests for linear workflows with sub-workflow steps.""" + + @pytest.mark.asyncio + async def test_subworkflow_runs_to_end(self, tmp_workflow_dir: Path) -> None: + """Test linear workflow with a sub-workflow step that completes.""" + # Create the sub-workflow file + sub_path = _write_yaml( + tmp_workflow_dir / "sub.yaml", + """\ + workflow: + name: sub-workflow + entry_point: inner_agent + runtime: + provider: copilot + limits: + max_iterations: 5 + agents: + - name: inner_agent + prompt: "Do inner work on {{ workflow.input.topic }}" + routes: + - to: "$end" + output: + result: "{{ inner_agent.output.result }}" + """, + ) + + # Create the parent workflow config + parent_path = tmp_workflow_dir / "parent.yaml" + parent_path.write_text("dummy", encoding="utf-8") + + config = WorkflowConfig( + workflow=WorkflowDef( + name="parent-workflow", + entry_point="research", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="research", + type="workflow", + workflow="sub.yaml", + routes=[RouteDef(to="$end")], + ), + ], + output={ + "result": "{{ research.output.result }}", + }, + ) + + def mock_handler(agent, prompt, context): + return {"result": "inner work done"} + + provider = CopilotProvider(mock_handler=mock_handler) + engine = WorkflowEngine(config, provider, workflow_path=parent_path) + result = await engine.run({"topic": "Python"}) + + assert result["result"] == "inner work done" + + @pytest.mark.asyncio + async def test_subworkflow_output_in_context(self, tmp_workflow_dir: Path) -> None: + """Test sub-workflow output accessible in subsequent agent's context.""" + sub_path = _write_yaml( + tmp_workflow_dir / "sub.yaml", + """\ + workflow: + name: sub-wf + entry_point: finder + runtime: + provider: copilot + limits: + max_iterations: 5 + agents: + - name: finder + prompt: "Find data" + routes: + - to: "$end" + output: + findings: "{{ finder.output.findings }}" + """, + ) + + parent_path = tmp_workflow_dir / "parent.yaml" + parent_path.write_text("dummy", encoding="utf-8") + + config = WorkflowConfig( + workflow=WorkflowDef( + name="parent", + entry_point="research", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="research", + type="workflow", + workflow="sub.yaml", + routes=[RouteDef(to="synthesizer")], + ), + AgentDef( + name="synthesizer", + prompt="Synthesize: {{ research.output.findings }}", + routes=[RouteDef(to="$end")], + ), + ], + output={ + "synthesis": "{{ synthesizer.output.result }}", + }, + ) + + received_prompts = [] + + def mock_handler(agent, prompt, context): + received_prompts.append(prompt) + if agent.name == "finder": + return {"findings": "important data"} + return {"result": "synthesis complete"} + + provider = CopilotProvider(mock_handler=mock_handler) + engine = WorkflowEngine(config, provider, workflow_path=parent_path) + result = await engine.run({}) + + # The synthesizer should have received the sub-workflow's output in its prompt + assert any("important data" in p for p in received_prompts) + assert result["synthesis"] == "synthesis complete" + + +class TestSubWorkflowDepthLimit: + """Tests for sub-workflow depth limit enforcement.""" + + @pytest.mark.asyncio + async def test_depth_limit_exceeded(self, tmp_workflow_dir: Path) -> None: + """Test that exceeding max sub-workflow depth raises ExecutionError.""" + sub_path = _write_yaml( + tmp_workflow_dir / "sub.yaml", + """\ + workflow: + name: sub-wf + entry_point: inner + runtime: + provider: copilot + limits: + max_iterations: 5 + agents: + - name: inner + prompt: "Inner" + routes: + - to: "$end" + output: + result: "{{ inner.output.result }}" + """, + ) + + parent_path = tmp_workflow_dir / "parent.yaml" + parent_path.write_text("dummy", encoding="utf-8") + + config = WorkflowConfig( + workflow=WorkflowDef( + name="parent", + entry_point="sub_wf", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="sub_wf", + type="workflow", + workflow="sub.yaml", + routes=[RouteDef(to="$end")], + ), + ], + ) + + provider = CopilotProvider(mock_handler=lambda agent, prompt, context: {"result": "ok"}) + engine = WorkflowEngine( + config, + provider, + workflow_path=parent_path, + _subworkflow_depth=MAX_SUBWORKFLOW_DEPTH, + ) + + with pytest.raises(ExecutionError, match="depth limit exceeded"): + await engine.run({}) + + +class TestSubWorkflowErrors: + """Tests for sub-workflow error conditions.""" + + @pytest.mark.asyncio + async def test_subworkflow_file_not_found(self, tmp_workflow_dir: Path) -> None: + """Test that missing sub-workflow file raises ExecutionError.""" + parent_path = tmp_workflow_dir / "parent.yaml" + parent_path.write_text("dummy", encoding="utf-8") + + config = WorkflowConfig( + workflow=WorkflowDef( + name="parent", + entry_point="sub_wf", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="sub_wf", + type="workflow", + workflow="nonexistent.yaml", + routes=[RouteDef(to="$end")], + ), + ], + ) + + mock_provider = MagicMock() + engine = WorkflowEngine(config, mock_provider, workflow_path=parent_path) + + with pytest.raises(ExecutionError, match="Sub-workflow file not found"): + await engine.run({}) + + @pytest.mark.asyncio + async def test_self_referencing_workflow(self, tmp_workflow_dir: Path) -> None: + """Test that a workflow referencing itself raises ExecutionError.""" + parent_path = tmp_workflow_dir / "parent.yaml" + parent_path.write_text("dummy", encoding="utf-8") + + config = WorkflowConfig( + workflow=WorkflowDef( + name="parent", + entry_point="sub_wf", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="sub_wf", + type="workflow", + workflow="parent.yaml", + routes=[RouteDef(to="$end")], + ), + ], + ) + + mock_provider = MagicMock() + engine = WorkflowEngine(config, mock_provider, workflow_path=parent_path) + + with pytest.raises(ExecutionError, match="Circular sub-workflow reference"): + await engine.run({}) + + +class TestSubWorkflowRouting: + """Tests for routing from sub-workflow steps.""" + + @pytest.mark.asyncio + async def test_subworkflow_route_to_agent(self, tmp_workflow_dir: Path) -> None: + """Test routing from sub-workflow to another agent.""" + _write_yaml( + tmp_workflow_dir / "sub.yaml", + """\ + workflow: + name: sub-wf + entry_point: analyzer + runtime: + provider: copilot + limits: + max_iterations: 5 + agents: + - name: analyzer + prompt: "Analyze" + routes: + - to: "$end" + output: + analysis: "{{ analyzer.output.analysis }}" + """, + ) + + parent_path = tmp_workflow_dir / "parent.yaml" + parent_path.write_text("dummy", encoding="utf-8") + + config = WorkflowConfig( + workflow=WorkflowDef( + name="parent", + entry_point="sub_wf", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="sub_wf", + type="workflow", + workflow="sub.yaml", + routes=[ + RouteDef( + to="summarizer", + when="{{ output.analysis == 'needs summary' }}", + ), + RouteDef(to="$end"), + ], + ), + AgentDef( + name="summarizer", + prompt="Summarize", + routes=[RouteDef(to="$end")], + ), + ], + output={ + "result": "{{ sub_wf.output.analysis }}", + }, + ) + + def mock_handler(agent, prompt, context): + if agent.name == "analyzer": + return {"analysis": "needs summary"} + return {"result": "summarized"} + + provider = CopilotProvider(mock_handler=mock_handler) + engine = WorkflowEngine(config, provider, workflow_path=parent_path) + result = await engine.run({}) + + assert result["result"] == "needs summary" + + +class TestSubWorkflowMixed: + """Tests for mixed agent + sub-workflow workflows.""" + + @pytest.mark.asyncio + async def test_mixed_agent_and_subworkflow(self, tmp_workflow_dir: Path) -> None: + """Test workflow with both agent and sub-workflow steps.""" + _write_yaml( + tmp_workflow_dir / "sub.yaml", + """\ + workflow: + name: sub-wf + entry_point: inner + runtime: + provider: copilot + limits: + max_iterations: 5 + agents: + - name: inner + prompt: "Inner work" + routes: + - to: "$end" + output: + data: "{{ inner.output.data }}" + """, + ) + + parent_path = tmp_workflow_dir / "parent.yaml" + parent_path.write_text("dummy", encoding="utf-8") + + config = WorkflowConfig( + workflow=WorkflowDef( + name="mixed", + entry_point="setup", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="setup", + prompt="Setup the work", + routes=[RouteDef(to="sub_wf")], + ), + AgentDef( + name="sub_wf", + type="workflow", + workflow="sub.yaml", + routes=[RouteDef(to="$end")], + ), + ], + output={ + "data": "{{ sub_wf.output.data }}", + }, + ) + + def mock_handler(agent, prompt, context): + if agent.name == "setup": + return {"result": "setup done"} + return {"data": "inner data"} + + provider = CopilotProvider(mock_handler=mock_handler) + engine = WorkflowEngine(config, provider, workflow_path=parent_path) + result = await engine.run({}) + + assert result["data"] == "inner data" + + +class TestSubWorkflowDryRun: + """Tests for dry-run plan generation with sub-workflow steps.""" + + def test_dry_run_includes_workflow_type(self) -> None: + """Test that dry-run plan includes sub-workflow steps with correct agent_type.""" + config = WorkflowConfig( + workflow=WorkflowDef( + name="sub-wf-dryrun", + entry_point="sub_wf", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="sub_wf", + type="workflow", + workflow="./sub.yaml", + routes=[RouteDef(to="$end")], + ), + ], + ) + + mock_provider = MagicMock() + engine = WorkflowEngine(config, mock_provider) + plan = engine.build_execution_plan() + + assert len(plan.steps) == 1 + assert plan.steps[0].agent_name == "sub_wf" + assert plan.steps[0].agent_type == "workflow" + + +class TestSubWorkflowIterationCounting: + """Tests for sub-workflow iteration limit counting.""" + + @pytest.mark.asyncio + async def test_subworkflow_counts_toward_iteration_limit( + self, tmp_workflow_dir: Path + ) -> None: + """Test that sub-workflow step counts as one iteration in parent.""" + _write_yaml( + tmp_workflow_dir / "sub.yaml", + """\ + workflow: + name: sub-wf + entry_point: inner + runtime: + provider: copilot + limits: + max_iterations: 5 + agents: + - name: inner + prompt: "Inner" + routes: + - to: "$end" + output: + result: "{{ inner.output.result }}" + """, + ) + + parent_path = tmp_workflow_dir / "parent.yaml" + parent_path.write_text("dummy", encoding="utf-8") + + config = WorkflowConfig( + workflow=WorkflowDef( + name="parent", + entry_point="sub_wf", + runtime=RuntimeConfig(provider="copilot"), + context=ContextConfig(mode="accumulate"), + limits=LimitsConfig(max_iterations=10), + ), + agents=[ + AgentDef( + name="sub_wf", + type="workflow", + workflow="sub.yaml", + routes=[RouteDef(to="$end")], + ), + ], + ) + + def mock_handler(agent, prompt, context): + return {"result": "done"} + + provider = CopilotProvider(mock_handler=mock_handler) + engine = WorkflowEngine(config, provider, workflow_path=parent_path) + await engine.run({}) + + assert engine.limits.current_iteration == 1