Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion crates/cli/src/commands/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,11 @@ async fn apply_agent(
auto_clear_threshold: tmpl.auto_clear_threshold,
network_policy: parsed_network_policy,
docker_image: tmpl.docker_image.clone(),
extra_mounts: if tmpl.extra_mounts.is_empty() { None } else { Some(tmpl.extra_mounts.clone()) },
extra_mounts: if tmpl.extra_mounts.is_empty() {
None
} else {
Some(tmpl.extra_mounts.clone())
},
resource_limits: tmpl.resource_limits.clone(),
};

Expand Down
19 changes: 3 additions & 16 deletions crates/cli/src/commands/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -965,10 +965,7 @@ async fn create_agent(

// Build resource limits from individual flags.
let resource_limits = if cpu_limit.is_some() || memory_limit.is_some() {
Some(orchestrator::types::ResourceLimits {
cpu_limit,
memory_limit_mb: memory_limit,
})
Some(orchestrator::types::ResourceLimits { cpu_limit, memory_limit_mb: memory_limit })
} else {
None
};
Expand Down Expand Up @@ -2179,13 +2176,7 @@ fn display_agent(agent: &AgentResponse) {
if let Some(ref mounts) = agent.config.extra_mounts {
for mount in mounts {
let ro = if mount.read_only { ":ro" } else { "" };
println!(
"{}: {}:{}{}",
"Mount".bold(),
mount.host_path,
mount.container_path,
ro
);
println!("{}: {}:{}{}", "Mount".bold(), mount.host_path, mount.container_path, ro);
}
}
if let Some(ref policy) = agent.config.network_policy {
Expand Down Expand Up @@ -3077,11 +3068,7 @@ mod tests {

#[test]
fn test_parse_mount_flags_multiple() {
let mounts = vec![
"/a:/b".to_string(),
"/c:/d:ro".to_string(),
"/e:/f:rw".to_string(),
];
let mounts = vec!["/a:/b".to_string(), "/c:/d:ro".to_string(), "/e:/f:rw".to_string()];
let result = parse_mount_flags(&mounts).unwrap();
assert_eq!(result.len(), 3);
assert!(!result[0].read_only);
Expand Down
11 changes: 7 additions & 4 deletions crates/orchestrator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,7 @@ async fn main() -> anyhow::Result<()> {
Arc::new(docker_backend)
}
other => {
anyhow::bail!(
"Unknown AGENTD_BACKEND value '{}'. Valid options: tmux, docker",
other
);
anyhow::bail!("Unknown AGENTD_BACKEND value '{}'. Valid options: tmux, docker", other);
}
};

Expand Down Expand Up @@ -264,6 +261,12 @@ async fn main() -> anyhow::Result<()> {

server.await??;

// Graceful shutdown: stop all managed agent sessions.
// AGENTD_SHUTDOWN_LEAVE_RUNNING=true leaves sessions alive for reconnection.
let leave_running =
env::var("AGENTD_SHUTDOWN_LEAVE_RUNNING").map(|v| v == "true" || v == "1").unwrap_or(false);
manager.shutdown_all(leave_running).await;

// Graceful shutdown: stop all workflow runners.
scheduler.shutdown_all().await;

Expand Down
173 changes: 151 additions & 22 deletions crates/orchestrator/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,60 +157,150 @@ impl AgentManager {
Ok(agent)
}

/// Reconcile DB state with actual tmux sessions and WebSocket connections on startup.
/// Reconcile DB state with actual backend sessions and WebSocket connections on startup.
///
/// Handles three cases for agents marked as `Running`:
/// Handles agents marked as `Running` against the actual backend state:
///
/// 1. **tmux session is gone** — the process died unexpectedly. Mark the
/// agent as `Failed`.
/// 1. **Session is gone** — the process/container died unexpectedly.
/// Check exit info to determine status: exit code 0 → `Stopped`,
/// non-zero or unknown → `Failed`.
///
/// 2. **tmux session is alive but agent is not connected to the registry** —
/// 2. **Session is alive but agent is not connected to the registry** —
/// the orchestrator was restarted and the in-memory `ConnectionRegistry`
/// was reset. The Claude process is still running in tmux but holds a
/// stale WebSocket connection to the old orchestrator instance. Kill the
/// session and re-launch Claude so it establishes a fresh connection.
/// was reset. The Claude process is still running but holds a stale
/// WebSocket connection. Kill the session and re-launch so it
/// establishes a fresh connection.
///
/// 3. **tmux session is alive and agent is connected** — everything is fine,
/// 3. **Session is alive and agent is connected** — everything is fine,
/// nothing to do.
///
/// After handling known agents, cleans up any orphaned backend sessions
/// (containers/tmux sessions with the correct prefix but no matching
/// DB record).
pub async fn reconcile(&self) -> anyhow::Result<()> {
let agents = self.storage.list(Some(AgentStatus::Running)).await?;
let mut known_sessions: std::collections::HashSet<String> =
std::collections::HashSet::new();

for agent in &agents {
if let Some(ref s) = agent.session_id {
known_sessions.insert(s.clone());
}
}

for agent in agents {
let session_alive = match agent.session_id.as_ref() {
Some(s) => self.backend.session_exists(s).await.unwrap_or(false),
None => false,
let session_name = match agent.session_id.clone() {
Some(s) => s,
None => {
// No session ID — mark as Failed.
let mut agent = agent;
warn!(agent_id = %agent.id, "Agent marked running but has no session ID, marking failed");
agent.status = AgentStatus::Failed;
agent.updated_at = Utc::now();
let _ = self.storage.update(&agent).await;
continue;
}
};

let session_alive = self.backend.session_exists(&session_name).await.unwrap_or(false);

if !session_alive {
// Case 1: tmux session is gone — mark as Failed.
// Case 1: session is gone — check exit info for diagnostics.
let mut agent = agent;
warn!(
agent_id = %agent.id,
"Agent marked running but tmux session is gone, marking failed"
);
agent.status = AgentStatus::Failed;
let exit_info = self.backend.session_exit_info(&session_name).await.ok().flatten();

let new_status = match &exit_info {
Some(info) if info.exit_code == 0 => {
info!(
agent_id = %agent.id,
session = %session_name,
"Agent session exited cleanly (exit code 0), marking stopped"
);
AgentStatus::Stopped
}
Some(info) => {
warn!(
agent_id = %agent.id,
session = %session_name,
exit_code = info.exit_code,
error = ?info.error,
"Agent session exited with error, marking failed"
);
AgentStatus::Failed
}
None => {
warn!(
agent_id = %agent.id,
session = %session_name,
"Agent marked running but session is gone, marking failed"
);
AgentStatus::Failed
}
};

agent.status = new_status;
agent.updated_at = Utc::now();
if let Err(e) = self.storage.update(&agent).await {
error!(agent_id = %agent.id, %e, "Failed to update agent status");
}
} else if !self.registry.is_connected(&agent.id).await {
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The health value is fetched and logged here but does not influence the restart decision — the agent is always restarted when disconnected from the registry regardless of health status. This is the right behaviour (stale WS must be replaced), but a brief comment explaining why health is checked (observability/logging only, not gating the restart) would help future readers.

// Case 2: tmux session alive but WebSocket connection is stale
// (orchestrator was restarted and ConnectionRegistry was reset).
// Kill the old Claude process and relaunch so it reconnects.
// Case 2: session alive but WebSocket connection is stale.
// Fetch health for observability/logging only — the restart is
// unconditional because the stale WebSocket must be replaced
// regardless of container health status.
let health = self
.backend
.session_health(&session_name)
.await
.unwrap_or(wrap::backend::SessionHealth::Unknown);

warn!(
agent_id = %agent.id,
"Agent has live tmux session but is not connected to registry after startup, restarting"
session = %session_name,
health = %health,
"Agent has live session but is not connected to registry, restarting"
);

if let Err(e) = self.restart_agent(&agent).await {
error!(agent_id = %agent.id, %e, "Failed to restart stale agent during reconcile");
}
}
// Case 3: alive and connected — nothing to do.
}

// Clean up orphaned backend sessions (sessions with our prefix but
// no matching DB record).
self.cleanup_orphaned_sessions(&known_sessions).await;

Ok(())
}

/// Remove backend sessions that are labeled with this backend's prefix
/// but have no corresponding agent record in the database.
async fn cleanup_orphaned_sessions(&self, known_sessions: &std::collections::HashSet<String>) {
let backend_sessions = match self.backend.list_sessions().await {
Ok(s) => s,
Err(e) => {
warn!(%e, "Failed to list backend sessions for orphan cleanup");
return;
}
};

for session in backend_sessions {
if !known_sessions.contains(&session) {
warn!(
session = %session,
"Found orphaned backend session with no DB record, removing"
);
if let Err(e) = self.backend.kill_session(&session).await {
error!(session = %session, %e, "Failed to clean up orphaned session");
} else {
info!(session = %session, "Orphaned session cleaned up");
}
}
}
}

/// Get an agent by ID (delegates to storage).
pub async fn get_agent(&self, id: &Uuid) -> anyhow::Result<Option<Agent>> {
self.storage.get(id).await
Expand Down Expand Up @@ -324,6 +414,45 @@ impl AgentManager {
self.storage.get_usage_stats(id).await
}

/// Graceful shutdown: stop all managed agent sessions.
///
/// Every agent currently marked `Running` is set to `Stopped` in the
/// database; the backend's `shutdown_all_sessions` then tears down the
/// actual processes/containers.
///
/// `leave_running` selects whether the backend sessions are actually
/// terminated:
/// - `false` (default): stop all sessions
/// - `true`: only update DB status, leave sessions running so they can
///   be reconnected after a restart
pub async fn shutdown_all(&self, leave_running: bool) {
    info!(leave_running, "Shutting down all managed agents");

    // Fetch the running set first; without it there is nothing to do.
    let running = match self.storage.list(Some(AgentStatus::Running)).await {
        Ok(list) => list,
        Err(e) => {
            error!(%e, "Failed to list running agents during shutdown");
            return;
        }
    };

    // Persist the Stopped status regardless of leave_running — the DB
    // reflects that this orchestrator instance is no longer managing them.
    for mut agent in running {
        agent.status = AgentStatus::Stopped;
        agent.updated_at = Utc::now();
        if let Err(e) = self.storage.update(&agent).await {
            error!(agent_id = %agent.id, %e, "Failed to update agent status during shutdown");
        }
    }

    if leave_running {
        info!("Leaving backend sessions running for reconnection on restart");
    } else if let Err(e) = self.backend.shutdown_all_sessions().await {
        error!(%e, "Failed to shut down backend sessions");
    }
}

/// Restart a running agent: kill the current session and re-launch Claude.
///
/// Preserves the agent's ID, name, and config. The prompt is NOT re-sent
Expand Down
21 changes: 11 additions & 10 deletions crates/orchestrator/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,16 @@ impl AgentStorage {
auto_clear_threshold: Set(agent.config.auto_clear_threshold.map(|v| v as i64)),
network_policy: Set(agent.config.network_policy.as_ref().map(|p| p.to_string())),
docker_image: Set(agent.config.docker_image.clone()),
extra_mounts: Set(agent.config.extra_mounts.as_ref().map(|m| {
serde_json::to_string(m).unwrap_or_else(|_| "[]".to_string())
})),
resource_limits: Set(agent.config.resource_limits.as_ref().map(|r| {
serde_json::to_string(r).unwrap_or_else(|_| "{}".to_string())
})),
extra_mounts: Set(agent
.config
.extra_mounts
.as_ref()
.map(|m| serde_json::to_string(m).unwrap_or_else(|_| "[]".to_string()))),
resource_limits: Set(agent
.config
.resource_limits
.as_ref()
.map(|r| serde_json::to_string(r).unwrap_or_else(|_| "{}".to_string()))),
};

agent_entity::Entity::insert(model).exec(&self.db).await?;
Expand Down Expand Up @@ -510,10 +514,7 @@ fn model_to_agent(model: agent_entity::Model) -> Result<Agent> {
.transpose()
.unwrap_or(None),
docker_image: model.docker_image,
extra_mounts: model
.extra_mounts
.as_deref()
.and_then(|s| serde_json::from_str(s).ok()),
extra_mounts: model.extra_mounts.as_deref().and_then(|s| serde_json::from_str(s).ok()),
resource_limits: model
.resource_limits
.as_deref()
Expand Down
Loading
Loading