Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 145 additions & 0 deletions examples/staged-review.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
# Staged Review Workflow
#
# This example demonstrates staged agent re-invocation, where the same
# agent appears at multiple stages in a workflow with different prompts.
# The VP plans the project, the IC implements it, and then the same VP
# reviews the implementation — without needing a duplicate agent definition.
#
# Key features demonstrated:
# - stages: dict on AgentDef for multi-invocation
# - Stage-qualified route targets (vp:review)
# - Stage-specific context access (stages.vp.default.output)
# - Conditional loop-back from review to IC
#
# Usage:
# conductor run examples/staged-review.yaml --input project="Build a REST API"

workflow:
name: staged-review
description: Demonstrates staged agent re-invocation (VP→IC→VP:review)
version: "1.0.0"
entry_point: vp

runtime:
provider: copilot
default_model: gpt-4.1

input:
project:
type: string
required: true
description: The project to plan, implement, and review

context:
mode: accumulate

limits:
max_iterations: 10
timeout_seconds: 300

agents:
# VP: appears twice — once to plan, once to review
- name: vp
description: VP of Engineering — plans and reviews
model: gpt-4.1
tools: []
system_prompt: |
You are the VP of Engineering. You set technical direction
and review your team's execution quality.

prompt: |
Create a technical plan for: {{ workflow.input.project }}

Define:
1. Architecture approach
2. Key components to build
3. Success criteria for the implementation
input:
- workflow.input.project
output:
plan:
type: string
description: The technical plan
components:
type: array
description: Key components to build
success_criteria:
type: array
description: Criteria for successful implementation
routes:
- to: ic

stages:
review:
description: Reviews IC implementation against the original plan
prompt: |
Review the IC's implementation against your original plan.

YOUR ORIGINAL PLAN:
{{ stages.vp.default.output.plan }}

SUCCESS CRITERIA:
{{ stages.vp.default.output.success_criteria | json }}

IC IMPLEMENTATION:
{{ ic.output | json }}

Evaluate:
1. Does the implementation match the plan?
2. Are success criteria met?
3. What improvements are needed?
input:
- vp:default.output
- ic.output
output:
approved:
type: boolean
description: Whether the implementation meets the plan
score:
type: number
description: Quality score from 1-10
feedback:
type: string
description: Detailed review feedback
routes:
- to: ic
when: "{{ not output.approved }}"
- to: "$end"

# IC: implements the VP's plan
- name: ic
description: Individual Contributor — implements the plan
model: gpt-4.1
tools: []
prompt: |
Implement the VP's technical plan.

PLAN: {{ vp.output.plan }}
COMPONENTS: {{ vp.output.components | json }}

{% if stages.vp.review is defined %}
VP FEEDBACK FROM PREVIOUS REVIEW:
{{ stages.vp.review.output.feedback }}
{% endif %}

Produce a detailed implementation for each component.
input:
- vp.output
- vp:review.output?
output:
implementation:
type: object
description: Implementation details for each component
status:
type: string
description: Overall implementation status
routes:
- to: vp:review

output:
project: "{{ workflow.input.project }}"
plan: "{{ stages.vp.default.output.plan }}"
implementation: "{{ ic.output.implementation | json }}"
approved: "{{ stages.vp.review.output.approved }}"
score: "{{ stages.vp.review.output.score }}"
feedback: "{{ stages.vp.review.output.feedback }}"
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added screenshots-for-pr/staged-node-ic.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added screenshots-for-pr/staged-node-vp-default.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added screenshots-for-pr/staged-node-vp-review.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added screenshots-for-pr/staged-workflow-mobile.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 4 additions & 0 deletions src/conductor/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
and environment variable resolution.
"""

from conductor.config.expander import expand_stages
from conductor.config.loader import (
ConfigLoader,
load_config,
Expand All @@ -20,6 +21,7 @@
OutputField,
RouteDef,
RuntimeConfig,
StageDef,
WorkflowConfig,
WorkflowDef,
)
Expand All @@ -30,6 +32,7 @@
"ConfigLoader",
"load_config",
"load_config_string",
"expand_stages",
"resolve_env_vars",
# Schema models
"AgentDef",
Expand All @@ -40,6 +43,7 @@
"LimitsConfig",
"OutputField",
"RouteDef",
"StageDef",
"RuntimeConfig",
"WorkflowConfig",
"WorkflowDef",
Expand Down
137 changes: 137 additions & 0 deletions src/conductor/config/expander.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
"""Stage expansion for staged agents.

This module expands agents with ``stages`` definitions into synthetic
``AgentDef`` instances at config load time, so the workflow engine sees
only regular agents and requires minimal changes.
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from conductor.config.schema import WorkflowConfig

logger = logging.getLogger(__name__)


def expand_stages(config: WorkflowConfig) -> WorkflowConfig:
"""Expand staged agents into synthetic AgentDef instances.

For each agent with a non-None ``stages`` dict:
1. Creates a ``agent:default`` synthetic agent from the base definition.
2. Creates ``agent:stage`` synthetic agents for each stage, applying
per-stage overrides (prompt, input, output, routes, description).
3. Rewrites bare route targets pointing to staged agents to use
``agent:default``.
4. Rewrites ``entry_point`` if it references a staged agent.

Called after Pydantic validation, before cross-reference validation.

Args:
config: The validated WorkflowConfig with potential staged agents.

Returns:
The same config, mutated in place, with synthetic agents added
and route targets rewritten.
"""
from conductor.config.schema import AgentDef

staged_agents = [a for a in config.agents if a.stages]
if not staged_agents:
return config

staged_agent_names = {a.name for a in staged_agents}
existing_names = {a.name for a in config.agents}
synthetic_agents: list[AgentDef] = []

for agent in staged_agents:
assert agent.stages is not None # guaranteed by filter above
# Validate no name collisions with existing agents
for stage_name in agent.stages:
synthetic_name = f"{agent.name}:{stage_name}"
if synthetic_name in existing_names:
from conductor.exceptions import ConfigurationError

raise ConfigurationError(
f"Name collision: agent '{synthetic_name}' already exists, "
f"conflicts with stage '{stage_name}' of agent '{agent.name}'"
)
default_name = f"{agent.name}:default"
if default_name in existing_names:
from conductor.exceptions import ConfigurationError

raise ConfigurationError(
f"Name collision: agent '{default_name}' already exists, "
f"conflicts with default stage of agent '{agent.name}'"
)

# Create the default synthetic agent from the base definition
default_agent = agent.model_copy(deep=True)
default_agent.name = default_name
default_agent.stages = None
synthetic_agents.append(default_agent)

# Create one synthetic agent per stage
for stage_name, stage_def in agent.stages.items():
stage_agent = agent.model_copy(deep=True)
stage_agent.name = f"{agent.name}:{stage_name}"
stage_agent.stages = None

# Override fields from StageDef
if stage_def.prompt is not None:
stage_agent.prompt = stage_def.prompt
if stage_def.input is not None:
stage_agent.input = stage_def.input
if stage_def.output is not None:
stage_agent.output = stage_def.output
if stage_def.routes is not None:
stage_agent.routes = stage_def.routes
if stage_def.description is not None:
stage_agent.description = stage_def.description

synthetic_agents.append(stage_agent)

# Add synthetic agents to config
config.agents.extend(synthetic_agents)

# Rewrite bare route targets for staged agents
_rewrite_routes(config, staged_agent_names)

# Rewrite entry_point
if config.workflow.entry_point in staged_agent_names:
config.workflow.entry_point = f"{config.workflow.entry_point}:default"

return config


def _rewrite_routes(config: WorkflowConfig, staged_agent_names: set[str]) -> None:
"""Rewrite bare route targets pointing to staged agents.

Any route ``to: "agent_name"`` where ``agent_name`` has stages is
rewritten to ``to: "agent_name:default"``.

Args:
config: The WorkflowConfig to modify.
staged_agent_names: Set of agent names that have stages.
"""
# Rewrite agent routes (including synthetic agents already added)
for agent in config.agents:
if agent.stages is not None:
continue # Skip original staged agents
for route in agent.routes:
if route.to in staged_agent_names:
route.to = f"{route.to}:default"

# Rewrite parallel group routes
for pg in config.parallel:
for route in pg.routes:
if route.to in staged_agent_names:
route.to = f"{route.to}:default"

# Rewrite for-each group routes
for fe in config.for_each:
for route in fe.routes:
if route.to in staged_agent_names:
route.to = f"{route.to}:default"
4 changes: 3 additions & 1 deletion src/conductor/config/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from ruamel.yaml.constructor import RoundTripConstructor
from ruamel.yaml.error import YAMLError

from conductor.config.expander import expand_stages
from conductor.config.schema import WorkflowConfig
from conductor.exceptions import ConfigurationError

Expand Down Expand Up @@ -324,7 +325,8 @@ def _validate(self, data: dict[str, Any], source: str) -> WorkflowConfig:
ConfigurationError: If the data fails schema validation.
"""
try:
return WorkflowConfig.model_validate(data)
config = WorkflowConfig.model_validate(data)
return expand_stages(config)
except Exception as e:
# Format Pydantic validation errors nicely
error_msg = str(e)
Expand Down
Loading
Loading