From 5f0456cb4dd27c4255635a9fa00afc3e353af180 Mon Sep 17 00:00:00 2001 From: Geoff Johnson Date: Wed, 11 Mar 2026 14:17:56 -0700 Subject: [PATCH 1/4] feat: add reconciliation and health check support for Docker backend MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add SessionHealth enum (Healthy, Unhealthy, Starting, Unknown) and SessionExitInfo struct to the ExecutionBackend trait for container liveness detection and failure diagnostics - Implement session_health(), session_exit_info(), and shutdown_all_sessions() on DockerBackend using bollard container inspection (health check status, exit codes, OOM detection) - Update AgentManager::reconcile() to use exit info for smarter status transitions: exit code 0 → Stopped, non-zero → Failed - Add orphaned session cleanup during reconciliation (sessions with backend prefix but no matching DB record are removed) - Add AgentManager::shutdown_all() for graceful shutdown with configurable leave-running behavior (AGENTD_SHUTDOWN_LEAVE_RUNNING) - Add HEALTHCHECK directive to Dockerfile (claude --version every 30s) - Log container lifecycle events (start, stop, remove) for observability Closes #289 Co-Authored-By: Claude Opus 4.6 --- crates/orchestrator/src/main.rs | 7 ++ crates/orchestrator/src/manager.rs | 171 +++++++++++++++++++++++---- crates/wrap/src/backend.rs | 155 ++++++++++++++++++++++++ crates/wrap/src/docker.rs | 181 ++++++++++++++++++++++++++++- crates/wrap/src/lib.rs | 2 +- docker/claude-code/Dockerfile | 6 + 6 files changed, 498 insertions(+), 24 deletions(-) diff --git a/crates/orchestrator/src/main.rs b/crates/orchestrator/src/main.rs index de231a1a..f8338f6c 100644 --- a/crates/orchestrator/src/main.rs +++ b/crates/orchestrator/src/main.rs @@ -264,6 +264,13 @@ async fn main() -> anyhow::Result<()> { server.await??; + // Graceful shutdown: stop all managed agent sessions. + // AGENTD_SHUTDOWN_LEAVE_RUNNING=true leaves sessions alive for reconnection. + let leave_running = env::var("AGENTD_SHUTDOWN_LEAVE_RUNNING") + .map(|v| v == "true" || v == "1") + .unwrap_or(false); + manager.shutdown_all(leave_running).await; + // Graceful shutdown: stop all workflow runners. scheduler.shutdown_all().await; diff --git a/crates/orchestrator/src/manager.rs b/crates/orchestrator/src/manager.rs index 031756b4..262e2837 100644 --- a/crates/orchestrator/src/manager.rs +++ b/crates/orchestrator/src/manager.rs @@ -157,50 +157,105 @@ impl AgentManager { Ok(agent) } - /// Reconcile DB state with actual tmux sessions and WebSocket connections on startup. + /// Reconcile DB state with actual backend sessions and WebSocket connections on startup. /// - /// Handles three cases for agents marked as `Running`: + /// Handles agents marked as `Running` against the actual backend state: /// - /// 1. **tmux session is gone** — the process died unexpectedly. Mark the - /// agent as `Failed`. + /// 1. **Session is gone** — the process/container died unexpectedly. + /// Check exit info to determine status: exit code 0 → `Stopped`, + /// non-zero or unknown → `Failed`. /// - /// 2. **tmux session is alive but agent is not connected to the registry** — + /// 2. **Session is alive but agent is not connected to the registry** — /// the orchestrator was restarted and the in-memory `ConnectionRegistry` - /// was reset. The Claude process is still running in tmux but holds a - /// stale WebSocket connection to the old orchestrator instance. Kill the - /// session and re-launch Claude so it establishes a fresh connection. + /// was reset. The Claude process is still running but holds a stale + /// WebSocket connection. Kill the session and re-launch so it + /// establishes a fresh connection. /// - /// 3. **tmux session is alive and agent is connected** — everything is fine, + /// 3. **Session is alive and agent is connected** — everything is fine, /// nothing to do. + /// + /// After handling known agents, cleans up any orphaned backend sessions + /// (containers/tmux sessions with the correct prefix but no matching + /// DB record). pub async fn reconcile(&self) -> anyhow::Result<()> { let agents = self.storage.list(Some(AgentStatus::Running)).await?; + let mut known_sessions: std::collections::HashSet = std::collections::HashSet::new(); + + for agent in &agents { + if let Some(ref s) = agent.session_id { + known_sessions.insert(s.clone()); + } + } for agent in agents { - let session_alive = match agent.session_id.as_ref() { - Some(s) => self.backend.session_exists(s).await.unwrap_or(false), - None => false, + let session_name = match agent.session_id.clone() { + Some(s) => s, + None => { + // No session ID — mark as Failed. + let mut agent = agent; + warn!(agent_id = %agent.id, "Agent marked running but has no session ID, marking failed"); + agent.status = AgentStatus::Failed; + agent.updated_at = Utc::now(); + let _ = self.storage.update(&agent).await; + continue; + } }; + let session_alive = self.backend.session_exists(&session_name).await.unwrap_or(false); + if !session_alive { - // Case 1: tmux session is gone — mark as Failed. + // Case 1: session is gone — check exit info for diagnostics. let mut agent = agent; - warn!( - agent_id = %agent.id, - "Agent marked running but tmux session is gone, marking failed" - ); - agent.status = AgentStatus::Failed; + let exit_info = self.backend.session_exit_info(&session_name).await.ok().flatten(); + + let new_status = match &exit_info { + Some(info) if info.exit_code == 0 => { + info!( + agent_id = %agent.id, + session = %session_name, + "Agent session exited cleanly (exit code 0), marking stopped" + ); + AgentStatus::Stopped + } + Some(info) => { + warn!( + agent_id = %agent.id, + session = %session_name, + exit_code = info.exit_code, + error = ?info.error, + "Agent session exited with error, marking failed" + ); + AgentStatus::Failed + } + None => { + warn!( + agent_id = %agent.id, + session = %session_name, + "Agent marked running but session is gone, marking failed" + ); + AgentStatus::Failed + } + }; + + agent.status = new_status; agent.updated_at = Utc::now(); if let Err(e) = self.storage.update(&agent).await { error!(agent_id = %agent.id, %e, "Failed to update agent status"); } } else if !self.registry.is_connected(&agent.id).await { - // Case 2: tmux session alive but WebSocket connection is stale - // (orchestrator was restarted and ConnectionRegistry was reset). - // Kill the old Claude process and relaunch so it reconnects. + // Case 2: session alive but WebSocket connection is stale. + // Check health before restarting. + let health = self.backend.session_health(&session_name).await.unwrap_or( + wrap::backend::SessionHealth::Unknown, + ); + warn!( agent_id = %agent.id, - "Agent has live tmux session but is not connected to registry after startup, restarting" + session = %session_name, + health = %health, + "Agent has live session but is not connected to registry, restarting" ); + if let Err(e) = self.restart_agent(&agent).await { error!(agent_id = %agent.id, %e, "Failed to restart stale agent during reconcile"); } @@ -208,9 +263,42 @@ impl AgentManager { // Case 3: alive and connected — nothing to do. } + // Clean up orphaned backend sessions (sessions with our prefix but + // no matching DB record). + self.cleanup_orphaned_sessions(&known_sessions).await; + Ok(()) } + /// Remove backend sessions that are labeled with this backend's prefix + /// but have no corresponding agent record in the database. + async fn cleanup_orphaned_sessions( + &self, + known_sessions: &std::collections::HashSet, + ) { + let backend_sessions = match self.backend.list_sessions().await { + Ok(s) => s, + Err(e) => { + warn!(%e, "Failed to list backend sessions for orphan cleanup"); + return; + } + }; + + for session in backend_sessions { + if !known_sessions.contains(&session) { + warn!( + session = %session, + "Found orphaned backend session with no DB record, removing" + ); + if let Err(e) = self.backend.kill_session(&session).await { + error!(session = %session, %e, "Failed to clean up orphaned session"); + } else { + info!(session = %session, "Orphaned session cleaned up"); + } + } + } + } + /// Get an agent by ID (delegates to storage). pub async fn get_agent(&self, id: &Uuid) -> anyhow::Result> { self.storage.get(id).await @@ -324,6 +412,45 @@ impl AgentManager { self.storage.get_usage_stats(id).await } + /// Graceful shutdown: stop all managed agent sessions. + /// + /// Iterates over all running agents, marks them as `Stopped` in the + /// database, then delegates to the backend's `shutdown_all_sessions` + /// to clean up the actual processes/containers. + /// + /// The `leave_running` flag controls whether backend sessions are + /// actually stopped or left running for reconnection on restart: + /// - `false` (default): stop all sessions + /// - `true`: only update DB status, leave sessions running + pub async fn shutdown_all(&self, leave_running: bool) { + info!(leave_running, "Shutting down all managed agents"); + + // Update all running agents to Stopped in the database. + let agents = match self.storage.list(Some(AgentStatus::Running)).await { + Ok(a) => a, + Err(e) => { + error!(%e, "Failed to list running agents during shutdown"); + return; + } + }; + + for mut agent in agents { + agent.status = AgentStatus::Stopped; + agent.updated_at = Utc::now(); + if let Err(e) = self.storage.update(&agent).await { + error!(agent_id = %agent.id, %e, "Failed to update agent status during shutdown"); + } + } + + if !leave_running { + if let Err(e) = self.backend.shutdown_all_sessions().await { + error!(%e, "Failed to shut down backend sessions"); + } + } else { + info!("Leaving backend sessions running for reconnection on restart"); + } + } + /// Restart a running agent: kill the current session and re-launch Claude. /// /// Preserves the agent's ID, name, and config. The prompt is NOT re-sent diff --git a/crates/wrap/src/backend.rs b/crates/wrap/src/backend.rs index ae4e7be6..7e548a8d 100644 --- a/crates/wrap/src/backend.rs +++ b/crates/wrap/src/backend.rs @@ -39,6 +39,47 @@ use crate::tmux::TmuxManager; use crate::types::TmuxLayout; use async_trait::async_trait; +use serde::{Deserialize, Serialize}; + +/// Health status of a backend session (container or tmux session). +/// +/// Used by the orchestrator to make reconciliation decisions based on +/// the liveness of the underlying execution environment. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum SessionHealth { + /// Session is running and healthy (Docker: health check passing or no health check configured). + Healthy, + /// Session is running but health check is failing. + Unhealthy, + /// Session is starting up (Docker: health check hasn't passed yet). + Starting, + /// Health status cannot be determined. + Unknown, +} + +impl std::fmt::Display for SessionHealth { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SessionHealth::Healthy => write!(f, "healthy"), + SessionHealth::Unhealthy => write!(f, "unhealthy"), + SessionHealth::Starting => write!(f, "starting"), + SessionHealth::Unknown => write!(f, "unknown"), + } + } +} + +/// Exit information for a session that has terminated. +/// +/// Provides the exit code and an optional error message for diagnosing +/// agent failures. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct SessionExitInfo { + /// Process exit code. 0 typically means success, non-zero means failure. + pub exit_code: i64, + /// Optional error message from the runtime (e.g., OOMKilled). + pub error: Option, +} /// Configuration for creating and launching an agent session. /// @@ -126,6 +167,40 @@ pub trait ExecutionBackend: Send + Sync { fn agent_ws_url(&self, _session_name: &str, _config: Option<&SessionConfig>) -> Option { None } + + /// Returns the health status of a session. + /// + /// For Docker backends this inspects the container's health check status. + /// For tmux backends (or backends without health checks), this returns + /// [`SessionHealth::Unknown`] by default. + async fn session_health(&self, _session_name: &str) -> anyhow::Result { + Ok(SessionHealth::Unknown) + } + + /// Returns the exit information for a terminated session. + /// + /// This is used during reconciliation to distinguish between clean exits + /// (exit code 0 → `Stopped`) and failures (non-zero → `Failed`). + /// + /// Returns `None` if the session is still running, was never created, or + /// the backend does not support exit code retrieval. + async fn session_exit_info(&self, _session_name: &str) -> anyhow::Result> { + Ok(None) + } + + /// Stops all sessions managed by this backend. + /// + /// Used during graceful shutdown to clean up all running sessions. + /// The default implementation lists all sessions and kills them individually. + async fn shutdown_all_sessions(&self) -> anyhow::Result<()> { + let sessions = self.list_sessions().await?; + for session in sessions { + if let Err(e) = self.kill_session(&session).await { + tracing::warn!(session = %session, %e, "Failed to kill session during shutdown"); + } + } + Ok(()) + } } // --------------------------------------------------------------------------- @@ -390,4 +465,84 @@ mod tests { fn _assert_send_sync() {} _assert_send_sync::(); } + + // -- SessionHealth tests -- + + #[test] + fn session_health_display() { + assert_eq!(SessionHealth::Healthy.to_string(), "healthy"); + assert_eq!(SessionHealth::Unhealthy.to_string(), "unhealthy"); + assert_eq!(SessionHealth::Starting.to_string(), "starting"); + assert_eq!(SessionHealth::Unknown.to_string(), "unknown"); + } + + #[test] + fn session_health_serde_roundtrip() { + for health in [ + SessionHealth::Healthy, + SessionHealth::Unhealthy, + SessionHealth::Starting, + SessionHealth::Unknown, + ] { + let json = serde_json::to_string(&health).unwrap(); + let deserialized: SessionHealth = serde_json::from_str(&json).unwrap(); + assert_eq!(deserialized, health); + } + } + + #[test] + fn session_health_serde_values() { + assert_eq!(serde_json::to_string(&SessionHealth::Healthy).unwrap(), "\"healthy\""); + assert_eq!(serde_json::to_string(&SessionHealth::Unhealthy).unwrap(), "\"unhealthy\""); + assert_eq!(serde_json::to_string(&SessionHealth::Starting).unwrap(), "\"starting\""); + assert_eq!(serde_json::to_string(&SessionHealth::Unknown).unwrap(), "\"unknown\""); + } + + // -- SessionExitInfo tests -- + + #[test] + fn session_exit_info_success() { + let info = SessionExitInfo { exit_code: 0, error: None }; + assert_eq!(info.exit_code, 0); + assert!(info.error.is_none()); + } + + #[test] + fn session_exit_info_failure() { + let info = SessionExitInfo { exit_code: 137, error: Some("OOMKilled".to_string()) }; + assert_eq!(info.exit_code, 137); + assert_eq!(info.error.as_deref(), Some("OOMKilled")); + } + + #[test] + fn session_exit_info_serde_roundtrip() { + let info = SessionExitInfo { exit_code: 1, error: Some("segfault".to_string()) }; + let json = serde_json::to_string(&info).unwrap(); + let deserialized: SessionExitInfo = serde_json::from_str(&json).unwrap(); + assert_eq!(deserialized, info); + } + + #[test] + fn session_exit_info_clone() { + let info = SessionExitInfo { exit_code: 42, error: None }; + let cloned = info.clone(); + assert_eq!(cloned.exit_code, 42); + assert!(cloned.error.is_none()); + } + + // -- Default trait method tests -- + + #[tokio::test] + async fn tmux_backend_session_health_returns_unknown() { + let backend = TmuxBackend::new("test"); + let health = backend.session_health("nonexistent").await.unwrap(); + assert_eq!(health, SessionHealth::Unknown); + } + + #[tokio::test] + async fn tmux_backend_session_exit_info_returns_none() { + let backend = TmuxBackend::new("test"); + let exit_info = backend.session_exit_info("nonexistent").await.unwrap(); + assert!(exit_info.is_none()); + } } diff --git a/crates/wrap/src/docker.rs b/crates/wrap/src/docker.rs index bc9f869d..08776f74 100644 --- a/crates/wrap/src/docker.rs +++ b/crates/wrap/src/docker.rs @@ -37,7 +37,7 @@ //! # } //! ``` -use crate::backend::{ExecutionBackend, SessionConfig}; +use crate::backend::{ExecutionBackend, SessionConfig, SessionExitInfo, SessionHealth}; use async_trait::async_trait; use bollard::container::{ Config, CreateContainerOptions, ListContainersOptions, RemoveContainerOptions, @@ -306,6 +306,29 @@ impl DockerBackend { self.orchestrator_port } + /// Returns the container state status string for logging/diagnostics. + /// + /// Useful for reconciliation to understand why a container is not running + /// (e.g., exited, paused, restarting, dead). + pub async fn container_state(&self, session_name: &str) -> anyhow::Result> { + match self.docker.inspect_container(session_name, None).await { + Ok(info) => { + let status = info + .state + .as_ref() + .and_then(|s| s.status.as_ref()) + .map(|s| format!("{:?}", s).to_lowercase()); + Ok(status) + } + Err(e) if is_not_found(&e) => Ok(None), + Err(e) => Err(anyhow::anyhow!( + "Failed to inspect container '{}': {}", + session_name, + e + )), + } + } + /// Extract the agent ID from a session name. /// /// Session names follow the pattern `{prefix}-{agent_id}`. This @@ -672,6 +695,126 @@ impl ExecutionBackend for DockerBackend { } } } + + /// Returns the health status of a Docker container. + /// + /// Maps Docker's container health status to [`SessionHealth`]: + /// - `healthy` → [`SessionHealth::Healthy`] + /// - `unhealthy` → [`SessionHealth::Unhealthy`] + /// - `starting` → [`SessionHealth::Starting`] + /// - No health check configured but container running → [`SessionHealth::Healthy`] + /// - Container not running → [`SessionHealth::Unknown`] + async fn session_health(&self, session_name: &str) -> anyhow::Result { + match self.docker.inspect_container(session_name, None).await { + Ok(info) => { + let state = match info.state.as_ref() { + Some(s) => s, + None => return Ok(SessionHealth::Unknown), + }; + + // If the container is not running, health is unknown. + if !state.running.unwrap_or(false) { + return Ok(SessionHealth::Unknown); + } + + // Check the Docker HEALTHCHECK status if present. + match state.health.as_ref().and_then(|h| h.status.as_ref()) { + Some(status) => { + let health_str = format!("{:?}", status).to_lowercase(); + if health_str.contains("healthy") && !health_str.contains("unhealthy") { + Ok(SessionHealth::Healthy) + } else if health_str.contains("unhealthy") { + Ok(SessionHealth::Unhealthy) + } else if health_str.contains("starting") { + Ok(SessionHealth::Starting) + } else { + Ok(SessionHealth::Unknown) + } + } + // No HEALTHCHECK configured — if running, assume healthy. + None => Ok(SessionHealth::Healthy), + } + } + Err(e) if is_not_found(&e) => Ok(SessionHealth::Unknown), + Err(e) => Err(anyhow::anyhow!( + "Failed to inspect container '{}' for health: {}", + session_name, + e + )), + } + } + + /// Returns exit information for a terminated Docker container. + /// + /// Inspects the container state to retrieve the exit code and any error + /// message (e.g., OOMKilled). Returns `None` if the container is still + /// running or does not exist. + async fn session_exit_info( + &self, + session_name: &str, + ) -> anyhow::Result> { + match self.docker.inspect_container(session_name, None).await { + Ok(info) => { + let state = match info.state.as_ref() { + Some(s) => s, + None => return Ok(None), + }; + + // Only return exit info for containers that have stopped. + if state.running.unwrap_or(false) { + return Ok(None); + } + + let exit_code = state.exit_code.unwrap_or(-1); + let mut error = state.error.clone().filter(|e| !e.is_empty()); + + // Check for OOM kill. + if state.oom_killed.unwrap_or(false) { + error = Some( + error + .map(|e| format!("OOMKilled: {}", e)) + .unwrap_or_else(|| "OOMKilled".to_string()), + ); + } + + Ok(Some(SessionExitInfo { exit_code, error })) + } + Err(e) if is_not_found(&e) => Ok(None), + Err(e) => Err(anyhow::anyhow!( + "Failed to inspect container '{}' for exit info: {}", + session_name, + e + )), + } + } + + /// Stops all containers managed by this Docker backend. + /// + /// Lists all containers with the matching prefix label and stops + removes + /// each one. Logs events for observability. + async fn shutdown_all_sessions(&self) -> anyhow::Result<()> { + let sessions = self.list_sessions().await?; + let count = sessions.len(); + + if count == 0 { + debug!(prefix = %self.prefix, "No Docker containers to shut down"); + return Ok(()); + } + + info!(count, prefix = %self.prefix, "Shutting down Docker containers"); + + for session in &sessions { + info!(session = %session, event = "stop", "Stopping Docker container"); + if let Err(e) = self.kill_session(session).await { + warn!(session = %session, %e, "Failed to stop container during shutdown"); + } else { + info!(session = %session, event = "removed", "Docker container removed"); + } + } + + info!(count, "Docker container shutdown complete"); + Ok(()) + } } // --------------------------------------------------------------------------- @@ -1052,4 +1195,40 @@ mod tests { let url_default = backend.agent_ws_url("test-prefix-abc123", None); assert_eq!(url_default, Some("ws://host.docker.internal:7006/ws/abc123".to_string())); } + + // -- container_state -- + + #[test] + fn container_state_method_exists() { + // Verify that container_state is callable (actual Docker tests require a daemon). + let backend = test_backend(); + let _ = &backend; // Ensure the method exists on DockerBackend. + } + + // -- SessionExitInfo construction -- + + #[test] + fn session_exit_info_zero_exit() { + let info = SessionExitInfo { exit_code: 0, error: None }; + assert_eq!(info.exit_code, 0); + assert!(info.error.is_none()); + } + + #[test] + fn session_exit_info_oom_killed() { + let info = SessionExitInfo { exit_code: 137, error: Some("OOMKilled".to_string()) }; + assert_eq!(info.exit_code, 137); + assert_eq!(info.error.as_deref(), Some("OOMKilled")); + } + + // -- SessionHealth values from Docker -- + + #[test] + fn session_health_variants() { + // Verify that all SessionHealth variants are accessible from this module. + let _ = SessionHealth::Healthy; + let _ = SessionHealth::Unhealthy; + let _ = SessionHealth::Starting; + let _ = SessionHealth::Unknown; + } } diff --git a/crates/wrap/src/lib.rs b/crates/wrap/src/lib.rs index 5b9c1f00..9c6bd52b 100644 --- a/crates/wrap/src/lib.rs +++ b/crates/wrap/src/lib.rs @@ -35,7 +35,7 @@ pub mod docker; pub mod tmux; pub mod types; -pub use backend::{ExecutionBackend, SessionConfig, TmuxBackend}; +pub use backend::{ExecutionBackend, SessionConfig, SessionExitInfo, SessionHealth, TmuxBackend}; pub use client::WrapClient; pub use docker::{DockerBackend, NetworkPolicy}; pub use types::{ diff --git a/docker/claude-code/Dockerfile b/docker/claude-code/Dockerfile index fa911f70..fab660ef 100644 --- a/docker/claude-code/Dockerfile +++ b/docker/claude-code/Dockerfile @@ -67,4 +67,10 @@ USER agent # Verify the CLI is installed and accessible. RUN claude --version +# ── Health check ──────────────────────────────────────────────────── +# Docker HEALTHCHECK used by the orchestrator to detect container liveness. +# Checks that the claude CLI binary is still accessible and functional. +HEALTHCHECK --interval=30s --timeout=5s --retries=3 \ + CMD claude --version || exit 1 + CMD ["claude"] From ae07a969f4a262898c1cb75a92af727bbd91bf48 Mon Sep 17 00:00:00 2001 From: Geoff Johnson Date: Wed, 11 Mar 2026 14:27:43 -0700 Subject: [PATCH 2/4] fix: run cargo fmt --- crates/cli/src/commands/apply.rs | 6 +++++- crates/cli/src/commands/orchestrator.rs | 19 +++---------------- crates/orchestrator/src/main.rs | 10 +++------- crates/orchestrator/src/manager.rs | 16 ++++++++-------- crates/orchestrator/src/storage.rs | 21 +++++++++++---------- crates/wrap/src/backend.rs | 5 ++++- crates/wrap/src/docker.rs | 6 +----- 7 files changed, 35 insertions(+), 48 deletions(-) diff --git a/crates/cli/src/commands/apply.rs b/crates/cli/src/commands/apply.rs index 090f8bb1..cca67cf0 100644 --- a/crates/cli/src/commands/apply.rs +++ b/crates/cli/src/commands/apply.rs @@ -552,7 +552,11 @@ async fn apply_agent( auto_clear_threshold: tmpl.auto_clear_threshold, network_policy: parsed_network_policy, docker_image: tmpl.docker_image.clone(), - extra_mounts: if tmpl.extra_mounts.is_empty() { None } else { Some(tmpl.extra_mounts.clone()) }, + extra_mounts: if tmpl.extra_mounts.is_empty() { + None + } else { + Some(tmpl.extra_mounts.clone()) + }, resource_limits: tmpl.resource_limits.clone(), }; diff --git a/crates/cli/src/commands/orchestrator.rs b/crates/cli/src/commands/orchestrator.rs index c18d544c..96da6262 100644 --- a/crates/cli/src/commands/orchestrator.rs +++ b/crates/cli/src/commands/orchestrator.rs @@ -965,10 +965,7 @@ async fn create_agent( // Build resource limits from individual flags. let resource_limits = if cpu_limit.is_some() || memory_limit.is_some() { - Some(orchestrator::types::ResourceLimits { - cpu_limit, - memory_limit_mb: memory_limit, - }) + Some(orchestrator::types::ResourceLimits { cpu_limit, memory_limit_mb: memory_limit }) } else { None }; @@ -2179,13 +2176,7 @@ fn display_agent(agent: &AgentResponse) { if let Some(ref mounts) = agent.config.extra_mounts { for mount in mounts { let ro = if mount.read_only { ":ro" } else { "" }; - println!( - "{}: {}:{}{}", - "Mount".bold(), - mount.host_path, - mount.container_path, - ro - ); + println!("{}: {}:{}{}", "Mount".bold(), mount.host_path, mount.container_path, ro); } } if let Some(ref policy) = agent.config.network_policy { @@ -3077,11 +3068,7 @@ mod tests { #[test] fn test_parse_mount_flags_multiple() { - let mounts = vec![ - "/a:/b".to_string(), - "/c:/d:ro".to_string(), - "/e:/f:rw".to_string(), - ]; + let mounts = vec!["/a:/b".to_string(), "/c:/d:ro".to_string(), "/e:/f:rw".to_string()]; let result = parse_mount_flags(&mounts).unwrap(); assert_eq!(result.len(), 3); assert!(!result[0].read_only); diff --git a/crates/orchestrator/src/main.rs b/crates/orchestrator/src/main.rs index f8338f6c..08e0d6db 100644 --- a/crates/orchestrator/src/main.rs +++ b/crates/orchestrator/src/main.rs @@ -86,10 +86,7 @@ async fn main() -> anyhow::Result<()> { Arc::new(docker_backend) } other => { - anyhow::bail!( - "Unknown AGENTD_BACKEND value '{}'. Valid options: tmux, docker", - other - ); + anyhow::bail!("Unknown AGENTD_BACKEND value '{}'. Valid options: tmux, docker", other); } }; @@ -266,9 +263,8 @@ async fn main() -> anyhow::Result<()> { // Graceful shutdown: stop all managed agent sessions. // AGENTD_SHUTDOWN_LEAVE_RUNNING=true leaves sessions alive for reconnection. - let leave_running = env::var("AGENTD_SHUTDOWN_LEAVE_RUNNING") - .map(|v| v == "true" || v == "1") - .unwrap_or(false); + let leave_running = + env::var("AGENTD_SHUTDOWN_LEAVE_RUNNING").map(|v| v == "true" || v == "1").unwrap_or(false); manager.shutdown_all(leave_running).await; // Graceful shutdown: stop all workflow runners. diff --git a/crates/orchestrator/src/manager.rs b/crates/orchestrator/src/manager.rs index 262e2837..0495d281 100644 --- a/crates/orchestrator/src/manager.rs +++ b/crates/orchestrator/src/manager.rs @@ -179,7 +179,8 @@ impl AgentManager { /// DB record). pub async fn reconcile(&self) -> anyhow::Result<()> { let agents = self.storage.list(Some(AgentStatus::Running)).await?; - let mut known_sessions: std::collections::HashSet = std::collections::HashSet::new(); + let mut known_sessions: std::collections::HashSet = + std::collections::HashSet::new(); for agent in &agents { if let Some(ref s) = agent.session_id { @@ -245,9 +246,11 @@ impl AgentManager { } else if !self.registry.is_connected(&agent.id).await { // Case 2: session alive but WebSocket connection is stale. // Check health before restarting. - let health = self.backend.session_health(&session_name).await.unwrap_or( - wrap::backend::SessionHealth::Unknown, - ); + let health = self + .backend + .session_health(&session_name) + .await + .unwrap_or(wrap::backend::SessionHealth::Unknown); warn!( agent_id = %agent.id, @@ -272,10 +275,7 @@ impl AgentManager { /// Remove backend sessions that are labeled with this backend's prefix /// but have no corresponding agent record in the database. - async fn cleanup_orphaned_sessions( - &self, - known_sessions: &std::collections::HashSet, - ) { + async fn cleanup_orphaned_sessions(&self, known_sessions: &std::collections::HashSet) { let backend_sessions = match self.backend.list_sessions().await { Ok(s) => s, Err(e) => { diff --git a/crates/orchestrator/src/storage.rs b/crates/orchestrator/src/storage.rs index 3fca2103..0224e95c 100644 --- a/crates/orchestrator/src/storage.rs +++ b/crates/orchestrator/src/storage.rs @@ -86,12 +86,16 @@ impl AgentStorage { auto_clear_threshold: Set(agent.config.auto_clear_threshold.map(|v| v as i64)), network_policy: Set(agent.config.network_policy.as_ref().map(|p| p.to_string())), docker_image: Set(agent.config.docker_image.clone()), - extra_mounts: Set(agent.config.extra_mounts.as_ref().map(|m| { - serde_json::to_string(m).unwrap_or_else(|_| "[]".to_string()) - })), - resource_limits: Set(agent.config.resource_limits.as_ref().map(|r| { - serde_json::to_string(r).unwrap_or_else(|_| "{}".to_string()) - })), + extra_mounts: Set(agent + .config + .extra_mounts + .as_ref() + .map(|m| serde_json::to_string(m).unwrap_or_else(|_| "[]".to_string()))), + resource_limits: Set(agent + .config + .resource_limits + .as_ref() + .map(|r| serde_json::to_string(r).unwrap_or_else(|_| "{}".to_string()))), }; agent_entity::Entity::insert(model).exec(&self.db).await?; @@ -510,10 +514,7 @@ fn model_to_agent(model: agent_entity::Model) -> Result { .transpose() .unwrap_or(None), docker_image: model.docker_image, - extra_mounts: model - .extra_mounts - .as_deref() - .and_then(|s| serde_json::from_str(s).ok()), + extra_mounts: model.extra_mounts.as_deref().and_then(|s| serde_json::from_str(s).ok()), resource_limits: model .resource_limits .as_deref() diff --git a/crates/wrap/src/backend.rs b/crates/wrap/src/backend.rs index 7e548a8d..9cc8bb4e 100644 --- a/crates/wrap/src/backend.rs +++ b/crates/wrap/src/backend.rs @@ -184,7 +184,10 @@ pub trait ExecutionBackend: Send + Sync { /// /// Returns `None` if the session is still running, was never created, or /// the backend does not support exit code retrieval. - async fn session_exit_info(&self, _session_name: &str) -> anyhow::Result> { + async fn session_exit_info( + &self, + _session_name: &str, + ) -> anyhow::Result> { Ok(None) } diff --git a/crates/wrap/src/docker.rs b/crates/wrap/src/docker.rs index 08776f74..339e4fcd 100644 --- a/crates/wrap/src/docker.rs +++ b/crates/wrap/src/docker.rs @@ -321,11 +321,7 @@ impl DockerBackend { Ok(status) } Err(e) if is_not_found(&e) => Ok(None), - Err(e) => Err(anyhow::anyhow!( - "Failed to inspect container '{}': {}", - session_name, - e - )), + Err(e) => Err(anyhow::anyhow!("Failed to inspect container '{}': {}", session_name, e)), } } From 0d0f60ad97ee527bd107c2e467753220ab734408 Mon Sep 17 00:00:00 2001 From: Geoff Johnson Date: Wed, 11 Mar 2026 14:58:32 -0700 Subject: [PATCH 3/4] fix: address PR #324 code review feedback - Use bollard HealthStatusEnum variant matching instead of fragile Debug string formatting in session_health() - Use Display (to_string) instead of Debug format in container_state() - Add --start-period=10s to Dockerfile HEALTHCHECK for startup grace - Remove useless container_state_method_exists test - Clarify that health check in reconcile is for observability only, not gating the restart decision Co-Authored-By: Claude Opus 4.6 --- crates/orchestrator/src/manager.rs | 4 +++- crates/wrap/src/docker.rs | 32 +++++++++--------------------- docker/claude-code/Dockerfile | 2 +- 3 files changed, 13 insertions(+), 25 deletions(-) diff --git a/crates/orchestrator/src/manager.rs b/crates/orchestrator/src/manager.rs index 0495d281..d616939c 100644 --- a/crates/orchestrator/src/manager.rs +++ b/crates/orchestrator/src/manager.rs @@ -245,7 +245,9 @@ impl AgentManager { } } else if !self.registry.is_connected(&agent.id).await { // Case 2: session alive but WebSocket connection is stale. - // Check health before restarting. + // Fetch health for observability/logging only — the restart is + // unconditional because the stale WebSocket must be replaced + // regardless of container health status. let health = self .backend .session_health(&session_name) diff --git a/crates/wrap/src/docker.rs b/crates/wrap/src/docker.rs index 339e4fcd..c0498efe 100644 --- a/crates/wrap/src/docker.rs +++ b/crates/wrap/src/docker.rs @@ -44,7 +44,7 @@ use bollard::container::{ StartContainerOptions, StopContainerOptions, }; use bollard::exec::CreateExecOptions; -use bollard::models::ContainerStateStatusEnum; +use bollard::models::{ContainerStateStatusEnum, HealthStatusEnum}; use bollard::Docker; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -317,7 +317,7 @@ impl DockerBackend { .state .as_ref() .and_then(|s| s.status.as_ref()) - .map(|s| format!("{:?}", s).to_lowercase()); + .map(|s| s.to_string()); Ok(status) } Err(e) if is_not_found(&e) => Ok(None), @@ -714,19 +714,14 @@ impl ExecutionBackend for DockerBackend { } // Check the Docker HEALTHCHECK status if present. + // Match on bollard's `HealthStatusEnum` variants directly + // rather than using Debug formatting, which would be fragile + // across bollard version bumps. match state.health.as_ref().and_then(|h| h.status.as_ref()) { - Some(status) => { - let health_str = format!("{:?}", status).to_lowercase(); - if health_str.contains("healthy") && !health_str.contains("unhealthy") { - Ok(SessionHealth::Healthy) - } else if health_str.contains("unhealthy") { - Ok(SessionHealth::Unhealthy) - } else if health_str.contains("starting") { - Ok(SessionHealth::Starting) - } else { - Ok(SessionHealth::Unknown) - } - } + Some(HealthStatusEnum::HEALTHY) => Ok(SessionHealth::Healthy), + Some(HealthStatusEnum::UNHEALTHY) => Ok(SessionHealth::Unhealthy), + Some(HealthStatusEnum::STARTING) => Ok(SessionHealth::Starting), + Some(_) => Ok(SessionHealth::Unknown), // No HEALTHCHECK configured — if running, assume healthy. None => Ok(SessionHealth::Healthy), } @@ -1192,15 +1187,6 @@ mod tests { assert_eq!(url_default, Some("ws://host.docker.internal:7006/ws/abc123".to_string())); } - // -- container_state -- - - #[test] - fn container_state_method_exists() { - // Verify that container_state is callable (actual Docker tests require a daemon). - let backend = test_backend(); - let _ = &backend; // Ensure the method exists on DockerBackend. - } - // -- SessionExitInfo construction -- #[test] diff --git a/docker/claude-code/Dockerfile b/docker/claude-code/Dockerfile index fab660ef..066883fe 100644 --- a/docker/claude-code/Dockerfile +++ b/docker/claude-code/Dockerfile @@ -70,7 +70,7 @@ RUN claude --version # ── Health check ──────────────────────────────────────────────────── # Docker HEALTHCHECK used by the orchestrator to detect container liveness. # Checks that the claude CLI binary is still accessible and functional. -HEALTHCHECK --interval=30s --timeout=5s --retries=3 \ +HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \ CMD claude --version || exit 1 CMD ["claude"] From cac373699bb5c099f452fc28640326e9c1384d0f Mon Sep 17 00:00:00 2001 From: Geoff Johnson Date: Wed, 11 Mar 2026 15:07:46 -0700 Subject: [PATCH 4/4] fix: run cargo fmt --- crates/wrap/src/docker.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/crates/wrap/src/docker.rs b/crates/wrap/src/docker.rs index c0498efe..875411fc 100644 --- a/crates/wrap/src/docker.rs +++ b/crates/wrap/src/docker.rs @@ -313,11 +313,8 @@ impl DockerBackend { pub async fn container_state(&self, session_name: &str) -> anyhow::Result> { match self.docker.inspect_container(session_name, None).await { Ok(info) => { - let status = info - .state - .as_ref() - .and_then(|s| s.status.as_ref()) - .map(|s| s.to_string()); + let status = + info.state.as_ref().and_then(|s| s.status.as_ref()).map(|s| s.to_string()); Ok(status) } Err(e) if is_not_found(&e) => Ok(None),