Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .claude/skills/conductor/references/authoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ workflow:
```yaml
agents:
- name: my_agent # Required: unique identifier
type: agent # agent (default), human_gate, or script
type: agent # agent (default), human_gate, script, or workflow
description: What it does
model: gpt-5.2 # Override workflow default
provider: claude # Optional: per-agent provider override
Expand Down
2 changes: 1 addition & 1 deletion .claude/skills/conductor/references/yaml-schema.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ agents:
name: string # Unique agent identifier

# Optional fields
type: string # "agent" (default), "human_gate", or "script"
type: string # "agent" (default), "human_gate", "script", or "workflow"
description: string # What this agent does
model: string # Override default_model
provider: string # Per-agent provider override ("copilot" or "claude")
Expand Down
38 changes: 37 additions & 1 deletion docs/workflow-syntax.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ Agents are defined in the `agents` list. Each agent represents a unit of work.
agents:
- name: string # Required: Unique agent identifier
description: string # Optional: Purpose description
type: agent # agent | human_gate | script (default: agent)
type: agent # agent | human_gate | script | workflow (default: agent)
model: string # Optional: Model identifier (e.g., 'claude-sonnet-4.5')

prompt: | # Required for type=agent: Agent instructions
Expand Down Expand Up @@ -160,6 +160,42 @@ routes:

**Environment variable note** — values in `env` are passed as-is to the subprocess (they are not rendered as Jinja2 templates). Use `${VAR}` syntax in the workflow YAML loader if you need environment variable substitution in env values.

### Sub-Workflow Steps

Sub-workflow steps reference external workflow YAML files, enabling composable and reusable workflow building blocks. The sub-workflow runs as a black box — its internal agents are not visible to the parent.

```yaml
agents:
- name: deep_research
type: workflow
workflow: ./research-pipeline.yaml # Required: path to sub-workflow YAML
input: # Optional: explicit input declarations
- workflow.input.topic
output: # Optional: output schema for validation
findings:
type: string
routes:
- to: synthesizer
```

**Key semantics:**

- The `workflow` path is resolved relative to the parent workflow file
- Sub-workflow inherits the parent's provider configuration
- Sub-workflow output is stored in context and accessible via `{{ agent_name.output.field }}`
- Recursive composition is supported (sub-workflows can reference other sub-workflows) with a depth limit of 10
- Circular references (a workflow referencing itself) are detected and rejected

**Access sub-workflow output in downstream agents:**

```yaml
prompt: |
The research findings were:
{{ deep_research.output.findings }}
```

**Restrictions** — workflow steps cannot have `prompt`, `model`, `provider`, `tools`, `system_prompt`, `command`, or `options`. Workflow steps also cannot be used inside `parallel` groups or `for_each` groups.

## Parallel Groups

Parallel groups execute multiple agents concurrently for improved performance.
Expand Down
34 changes: 33 additions & 1 deletion src/conductor/config/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ class AgentDef(BaseModel):
description: str | None = None
"""Human-readable description of agent's purpose."""

type: Literal["agent", "human_gate", "script"] | None = None
type: Literal["agent", "human_gate", "script", "workflow"] | None = None
"""Agent type. Defaults to 'agent' if not specified."""

provider: Literal["copilot", "claude"] | None = None
Expand Down Expand Up @@ -425,6 +425,17 @@ class AgentDef(BaseModel):
timeout: int | None = None
"""Per-script timeout in seconds."""

workflow: str | None = None
"""Path to sub-workflow YAML file (required for type='workflow').

The path is resolved relative to the parent workflow file.
Sub-workflows run as black boxes — their internal agents are not
visible to the parent workflow.

Example:
workflow: ./research-pipeline.yaml
"""

max_session_seconds: float | None = Field(None, ge=1.0)
"""Maximum wall-clock duration for this agent's session in seconds.

Expand Down Expand Up @@ -485,6 +496,27 @@ def validate_agent_type(self) -> AgentDef:
raise ValueError("script agents cannot have 'max_session_seconds'")
if self.max_agent_iterations is not None:
raise ValueError("script agents cannot have 'max_agent_iterations'")
elif self.type == "workflow":
if not self.workflow:
raise ValueError("workflow agents require 'workflow' path")
if self.prompt:
raise ValueError("workflow agents cannot have 'prompt'")
if self.provider:
raise ValueError("workflow agents cannot have 'provider'")
if self.model:
raise ValueError("workflow agents cannot have 'model'")
if self.tools is not None:
raise ValueError("workflow agents cannot have 'tools'")
if self.system_prompt:
raise ValueError("workflow agents cannot have 'system_prompt'")
if self.options:
raise ValueError("workflow agents cannot have 'options'")
if self.command:
raise ValueError("workflow agents cannot have 'command'")
if self.max_session_seconds:
raise ValueError("workflow agents cannot have 'max_session_seconds'")
if self.max_agent_iterations is not None:
raise ValueError("workflow agents cannot have 'max_agent_iterations'")
return self


Expand Down
14 changes: 13 additions & 1 deletion src/conductor/config/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,18 @@ def validate_workflow_config(config: WorkflowConfig) -> list[str]:
parallel_errors = _validate_parallel_groups(config)
errors.extend(parallel_errors)

# Validate for_each groups: reject script steps as inline agents
# Validate for_each groups: reject script and workflow steps as inline agents
for for_each_group in config.for_each:
if for_each_group.agent.type == "script":
errors.append(
f"For-each group '{for_each_group.name}' uses a script step as its "
"inline agent. Script steps cannot be used in for_each groups."
)
if for_each_group.agent.type == "workflow":
errors.append(
f"For-each group '{for_each_group.name}' uses a workflow step as its "
"inline agent. Workflow steps cannot be used in for_each groups."
)

# Validate workflow output references
output_errors = _validate_output_references(
Expand Down Expand Up @@ -379,6 +384,13 @@ def _validate_parallel_groups(config: WorkflowConfig) -> list[str]:
"Script steps cannot be used in parallel groups."
)

# Validate no workflow steps in parallel groups
if agent.type == "workflow":
errors.append(
f"Agent '{agent_name}' in parallel group '{pg.name}' is a workflow step. "
"Workflow steps cannot be used in parallel groups."
)

# PE-6.2: Validate parallel group route targets
for_each_names = {fe.name for fe in config.for_each}
all_names = agent_names | parallel_names | for_each_names
Expand Down
190 changes: 190 additions & 0 deletions src/conductor/engine/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@

logger = logging.getLogger(__name__)

# Maximum nesting depth for sub-workflow composition.
# Prevents runaway recursion when workflows reference each other.
MAX_SUBWORKFLOW_DEPTH = 10


if TYPE_CHECKING:
from conductor.config.schema import AgentDef, ForEachDef, ParallelGroup, WorkflowConfig
Expand Down Expand Up @@ -265,6 +269,7 @@ def __init__(
event_emitter: WorkflowEventEmitter | None = None,
keyboard_listener: KeyboardListener | None = None,
web_dashboard: WebDashboard | None = None,
_subworkflow_depth: int = 0,
) -> None:
"""Initialize the WorkflowEngine.

Expand All @@ -291,6 +296,9 @@ def __init__(
web_dashboard: Optional web dashboard for bidirectional gate input.
When provided and connected, gate input is accepted from
both CLI stdin and web UI, with first response winning.
_subworkflow_depth: Current nesting depth for sub-workflow composition.
Used internally to enforce MAX_SUBWORKFLOW_DEPTH. Callers should
not set this directly.

Note:
If both provider and registry are provided, registry takes precedence.
Expand Down Expand Up @@ -343,6 +351,9 @@ def __init__(
self._current_agent_name: str | None = None
self._last_checkpoint_path: Path | None = None

# Sub-workflow depth tracking
self._subworkflow_depth = _subworkflow_depth

def _build_pricing_overrides(self) -> dict[str, ModelPricing] | None:
"""Build pricing overrides from workflow cost configuration.

Expand Down Expand Up @@ -468,6 +479,99 @@ async def _execute_script(self, agent: AgentDef, context: dict[str, Any]) -> Scr
operation_name=f"script '{agent.name}'",
)

async def _execute_subworkflow(
self,
agent: AgentDef,
context: dict[str, Any],
) -> dict[str, Any]:
"""Execute a sub-workflow as a black-box step.

Loads the referenced workflow YAML, creates a child WorkflowEngine,
and runs it with the parent agent's context as input. The sub-workflow's
final output is returned as the agent's output.

Args:
agent: Workflow agent definition with ``workflow`` path.
context: Workflow context for template rendering (used as sub-workflow input).

Returns:
The sub-workflow's final output dict.

Raises:
ExecutionError: If the sub-workflow file cannot be loaded,
depth limit is exceeded, or execution fails.
"""
from conductor.config.loader import load_config

if self._subworkflow_depth >= MAX_SUBWORKFLOW_DEPTH:
raise ExecutionError(
f"Sub-workflow depth limit exceeded ({MAX_SUBWORKFLOW_DEPTH}). "
f"Agent '{agent.name}' cannot invoke sub-workflow '{agent.workflow}'.",
suggestion=(
"Check for circular sub-workflow references or reduce nesting depth."
),
)

assert agent.workflow is not None # noqa: S101

# Resolve sub-workflow path relative to parent workflow file
if self.workflow_path is not None:
base_dir = Path(self.workflow_path).resolve().parent
else:
base_dir = Path.cwd()

sub_path = (base_dir / agent.workflow).resolve()

if not sub_path.exists():
raise ExecutionError(
f"Sub-workflow file not found: {sub_path} "
f"(referenced by agent '{agent.name}')",
suggestion="Check that the 'workflow' path is correct and the file exists.",
)

# Detect circular references via file path
current_path = (
Path(self.workflow_path).resolve() if self.workflow_path else None
)
if current_path is not None and sub_path == current_path:
raise ExecutionError(
f"Circular sub-workflow reference: agent '{agent.name}' "
f"references its own workflow file '{agent.workflow}'.",
suggestion="A workflow cannot reference itself as a sub-workflow.",
)

try:
sub_config = load_config(sub_path)
except Exception as exc:
raise ExecutionError(
f"Failed to load sub-workflow '{sub_path}' "
f"(referenced by agent '{agent.name}'): {exc}",
suggestion="Check the sub-workflow YAML for syntax or validation errors.",
) from exc

# Build sub-workflow inputs from the parent context
# Extract workflow.input.* values from the parent context
workflow_ctx = context.get("workflow", {})
sub_inputs: dict[str, Any] = dict(workflow_ctx.get("input", {})) if isinstance(
workflow_ctx, dict
) else {}

# Create child engine inheriting provider/registry but with deeper depth
child_engine = WorkflowEngine(
config=sub_config,
provider=self._single_provider,
registry=self._registry,
skip_gates=self.skip_gates,
workflow_path=sub_path,
interrupt_event=self._interrupt_event,
event_emitter=self._event_emitter,
keyboard_listener=self._keyboard_listener,
web_dashboard=self._web_dashboard,
_subworkflow_depth=self._subworkflow_depth + 1,
)

return await child_engine.run(sub_inputs)

def _get_context_window_for_agent(self, agent: AgentDef) -> int | None:
"""Return the context window size for an agent's model."""
from conductor.engine.pricing import get_pricing
Expand Down Expand Up @@ -1380,6 +1484,92 @@ async def _execute_loop(self, current_agent_name: str) -> dict[str, Any]:
)
continue

# Handle sub-workflow steps
if agent.type == "workflow":
agent_context = self.context.build_for_agent(
agent.name,
agent.input,
mode=self.config.workflow.context.mode,
)
_sub_start = _time.time()

sub_execution_count = (
self.limits.get_agent_execution_count(agent.name) + 1
)

self._emit(
"subworkflow_started",
{
"agent_name": agent.name,
"iteration": sub_execution_count,
"workflow": agent.workflow,
},
)

try:
sub_output = await self._execute_subworkflow(
agent, agent_context
)
except Exception as exc:
_sub_elapsed = _time.time() - _sub_start
self._emit(
"subworkflow_failed",
{
"agent_name": agent.name,
"elapsed": _sub_elapsed,
"error_type": type(exc).__name__,
"message": str(exc),
},
)
raise
_sub_elapsed = _time.time() - _sub_start

self._emit(
"subworkflow_completed",
{
"agent_name": agent.name,
"elapsed": _sub_elapsed,
"output": sub_output,
},
)

# Store sub-workflow output in context
self.context.store(agent.name, sub_output)
self.limits.record_execution(agent.name)
self.limits.check_timeout()

route_result = self._evaluate_routes(agent, sub_output)

self._emit(
"route_taken",
{
"from_agent": agent.name,
"to_agent": route_result.target,
},
)

if route_result.target == "$end":
result = self._build_final_output(route_result.output_transform)
self._emit(
"workflow_completed",
{
"elapsed": _time.time() - _workflow_start,
"output": result,
},
)
self._execute_hook("on_complete", result=result)
return result

current_agent_name = route_result.target

# Check for interrupt after sub-workflow step
interrupt_result = await self._check_interrupt(current_agent_name)
if interrupt_result is not None:
current_agent_name = await self._handle_interrupt_result(
interrupt_result, current_agent_name
)
continue

# Build context for this agent
agent_context = self.context.build_for_agent(
agent.name,
Expand Down
Loading