Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion .claude/skills/conductor/references/yaml-schema.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,14 @@ agents:
max_agent_iterations: integer # Max tool-use roundtrips for this agent (1-500, optional)
max_session_seconds: float # Wall-clock timeout for this agent session (optional)

# Per-agent retry policy (optional, not allowed for script agents)
retry:
max_attempts: integer # Max attempts including first (1-10, default: 1 = no retry)
backoff: string # "exponential" (default) or "fixed"
delay_seconds: float # Base delay in seconds (0-300, default: 2.0)
retry_on: # Error categories to retry (default: all)
- string # "provider_error" (API 500s, rate limits) or "timeout"

# Script-only fields (type: script)
command: string # Command to execute (Jinja2 templated)
args: [string] # Command arguments (each Jinja2 templated)
Expand All @@ -138,7 +146,7 @@ agents:
timeout: integer # Per-script timeout in seconds
```

**Script agent restrictions:** Cannot have `prompt`, `provider`, `model`, `tools`, `output`, `system_prompt`, `options`. Output is always `{stdout, stderr, exit_code}`.
**Script agent restrictions:** Cannot have `prompt`, `provider`, `model`, `tools`, `output`, `system_prompt`, `options`, `retry`. Output is always `{stdout, stderr, exit_code}`.

## Script Agent Schema

Expand Down
59 changes: 59 additions & 0 deletions src/conductor/config/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,45 @@ class HooksConfig(BaseModel):
"""Expression evaluated when workflow fails."""


class RetryPolicy(BaseModel):
"""Per-agent retry policy for transient failure resilience.

Controls how an agent retries on transient failures such as API errors,
rate limits, and timeouts. Retry counter resets per agent execution.

Example YAML::

retry:
max_attempts: 3
backoff: exponential
delay_seconds: 2
retry_on:
- provider_error
- timeout
"""

max_attempts: int = Field(default=1, ge=1, le=10)
"""Maximum number of attempts (including the first). 1 = no retry."""

backoff: Literal["fixed", "exponential"] = "exponential"
"""Backoff strategy between retries."""

delay_seconds: float = Field(default=2.0, ge=0.0, le=300.0)
"""Base delay in seconds before the first retry."""

retry_on: list[Literal["provider_error", "timeout"]] = Field(
default_factory=lambda: ["provider_error", "timeout"]
)
"""Error categories that trigger a retry.

- ``provider_error``: API 500s, rate limits, transient provider failures.
- ``timeout``: Agent-level timeout exceeded.

Validation errors (output schema mismatches) are never retried because
they indicate prompt/schema issues, not transience.
"""


class AgentDef(BaseModel):
"""Definition for a single agent in the workflow."""

Expand Down Expand Up @@ -445,6 +484,24 @@ class AgentDef(BaseModel):
max_agent_iterations: 200 instead of using the default limit.
"""

retry: RetryPolicy | None = None
"""Per-agent retry policy for transient failures.

When set, the provider wraps agent execution in a retry loop with
the specified backoff strategy. Only applies to provider-backed agents
(not script or human_gate).

Example YAML::

retry:
max_attempts: 3
backoff: exponential
delay_seconds: 2
retry_on:
- provider_error
- timeout
"""

@field_validator("timeout")
@classmethod
def validate_timeout(cls, v: int | None) -> int | None:
Expand Down Expand Up @@ -485,6 +542,8 @@ def validate_agent_type(self) -> AgentDef:
raise ValueError("script agents cannot have 'max_session_seconds'")
if self.max_agent_iterations is not None:
raise ValueError("script agents cannot have 'max_agent_iterations'")
if self.retry is not None:
raise ValueError("script agents cannot have 'retry'")
return self


Expand Down
97 changes: 89 additions & 8 deletions src/conductor/providers/claude.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ class RetryConfig(BaseModel):
base_delay: Base delay in seconds before first retry.
max_delay: Maximum delay in seconds between retries.
jitter: Maximum random jitter to add to delay (0.0 to 1.0 fraction of delay).
backoff: Backoff strategy: "exponential" or "fixed".
retry_on: Error categories that trigger a retry ("provider_error", "timeout").
max_parse_recovery_attempts: Maximum number of in-session recovery attempts
for JSON parse failures. When parsing fails, a follow-up message is sent
to the same session asking the model to correct its response format.
Expand All @@ -85,6 +87,8 @@ class RetryConfig(BaseModel):
base_delay: float = 1.0
max_delay: float = 30.0
jitter: float = 0.25
backoff: str = "exponential"
retry_on: list[str] | None = None
max_parse_recovery_attempts: int = 2 # Claude: 2 attempts (less than Copilot's 5)


Expand Down Expand Up @@ -437,6 +441,59 @@ async def execute(
event_callback=event_callback,
)

def _resolve_retry_config(self, agent: AgentDef) -> RetryConfig:
"""Resolve the retry config for an agent.

If the agent has a per-agent retry policy, build a RetryConfig from it.
Otherwise, fall back to the provider-level default.

Args:
agent: Agent definition that may contain a retry policy.

Returns:
RetryConfig to use for this agent's execution.
"""
from conductor.config.schema import RetryPolicy

retry = getattr(agent, "retry", None)
if not isinstance(retry, RetryPolicy):
return self._retry_config

return RetryConfig(
max_attempts=retry.max_attempts,
base_delay=retry.delay_seconds,
max_delay=self._retry_config.max_delay,
jitter=self._retry_config.jitter,
backoff=retry.backoff,
retry_on=list(retry.retry_on),
max_parse_recovery_attempts=self._retry_config.max_parse_recovery_attempts,
)

@staticmethod
def _classify_error(error: Exception) -> str:
"""Classify an error into a retry category.

Maps exception types to the retry_on categories used in per-agent
retry policies.

Args:
error: The exception to classify.

Returns:
Error category string: "provider_error" or "timeout".
"""
from conductor.exceptions import ProviderError
from conductor.exceptions import TimeoutError as ConductorTimeoutError

if isinstance(error, (ConductorTimeoutError, asyncio.TimeoutError)):
return "timeout"
if isinstance(error, ProviderError):
if error.status_code == 408:
return "timeout"
if "timeout" in str(error).lower():
return "timeout"
return "provider_error"

def _is_retryable_error(self, exception: Exception) -> bool:
"""Determine if an error should trigger a retry.

Expand Down Expand Up @@ -531,7 +588,9 @@ def _get_retry_after(self, exception: Exception) -> float | None:
return None

def _calculate_delay(self, attempt: int, config: RetryConfig) -> float:
"""Calculate delay with exponential backoff and jitter.
"""Calculate delay with backoff and jitter.

Supports both exponential and fixed backoff strategies.

Args:
attempt: Current attempt number (1-indexed).
Expand All @@ -540,8 +599,11 @@ def _calculate_delay(self, attempt: int, config: RetryConfig) -> float:
Returns:
Delay in seconds before next retry.
"""
# Exponential backoff: base * 2^(attempt-1)
delay = config.base_delay * (2 ** (attempt - 1))
if config.backoff == "fixed":
delay = config.base_delay
else:
# Exponential backoff: base * 2^(attempt-1)
delay = config.base_delay * (2 ** (attempt - 1))

# Cap at max delay
delay = min(delay, config.max_delay)
Expand Down Expand Up @@ -592,7 +654,7 @@ async def _execute_with_retry(
await self._ensure_mcp_connected()

last_error: Exception | None = None
config = self._retry_config
config = self._resolve_retry_config(agent)

# Build messages
messages = self._build_messages(rendered_prompt)
Expand Down Expand Up @@ -788,6 +850,12 @@ async def _execute_with_retry(
is_retryable=False,
) from e

# Check retry_on filter if per-agent retry is configured
if config.retry_on is not None:
error_category = self._classify_error(e)
if error_category not in config.retry_on:
raise

# Don't retry if this was the last attempt
if attempt >= config.max_attempts:
break
Expand All @@ -800,11 +868,9 @@ async def _execute_with_retry(
f"Rate limit hit (HTTP 429), respecting retry-after header: {delay}s"
)
else:
# Calculate delay with exponential backoff
# Calculate delay with backoff
delay = self._calculate_delay(attempt, config)
logger.info(
f"Calculated exponential backoff delay: {delay:.2f}s for attempt {attempt}"
)
logger.info(f"Calculated backoff delay: {delay:.2f}s for attempt {attempt}")

# Log retry attempt with full context
logger.warning(
Expand All @@ -813,6 +879,21 @@ async def _execute_with_retry(
)
retry_entry["delay"] = delay

# Emit agent_retry event
if event_callback is not None:
with contextlib.suppress(Exception):
event_callback(
"agent_retry",
{
"agent_name": agent.name,
"attempt": attempt,
"max_attempts": config.max_attempts,
"error": str(e),
"error_type": type(e).__name__,
"delay": delay,
},
)

await asyncio.sleep(delay)

# All retries exhausted
Expand Down
Loading
Loading