diff --git a/crates/communicate/src/websocket.rs b/crates/communicate/src/websocket.rs index a3491d11..d9732278 100644 --- a/crates/communicate/src/websocket.rs +++ b/crates/communicate/src/websocket.rs @@ -326,7 +326,11 @@ async fn handle_socket(socket: WebSocket, state: crate::api::ApiState, params: W } }); - subscription_tasks.insert(room_id, task); + // Abort any existing subscription task for this room + // to prevent duplicate message delivery on resubscribe. + if let Some(old_task) = subscription_tasks.insert(room_id, task) { + old_task.abort(); + } } Err(e) => { warn!(%conn_id, %room_id, error = %e, "Subscribe failed"); diff --git a/crates/orchestrator/src/message_bridge.rs b/crates/orchestrator/src/message_bridge.rs index 9db6854a..aa0841df 100644 --- a/crates/orchestrator/src/message_bridge.rs +++ b/crates/orchestrator/src/message_bridge.rs @@ -52,7 +52,7 @@ use communicate::types::{ }; use futures::{SinkExt, StreamExt}; use serde::{Deserialize, Serialize}; -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::Arc; use std::time::Duration; use tokio::sync::{Mutex, RwLock}; @@ -73,6 +73,8 @@ const META_SOURCE_KEY: &str = "source"; const META_SOURCE_VALUE: &str = "agent_response"; /// Metadata key carrying the originating agent ID on response messages. const META_AGENT_ID_KEY: &str = "agent_id"; +/// Maximum number of seen message IDs to retain for deduplication. +const MAX_SEEN_MESSAGES: usize = 1000; /// Initial backoff duration for WebSocket reconnection attempts. const INITIAL_BACKOFF: Duration = Duration::from_secs(1); /// Maximum backoff duration for WebSocket reconnection attempts. @@ -168,6 +170,11 @@ pub struct MessageBridge { /// WS base URL for the communicate service (scheme already converted to ws/wss). ws_url: String, + + /// Set of recently seen message IDs for deduplication. + /// Prevents the same message from being delivered to agents multiple times + /// if the WebSocket delivers duplicate events (e.g. after reconnection). + seen_messages: Arc>>, } impl MessageBridge { @@ -218,6 +225,7 @@ impl MessageBridge { max_queue_depth: DEFAULT_MAX_QUEUE_DEPTH, ws_tx: Arc::new(Mutex::new(None)), ws_url, + seen_messages: Arc::new(RwLock::new(HashSet::new())), } } @@ -505,6 +513,27 @@ impl MessageBridge { // ----------------------------------------------------------------------- async fn on_room_message(&self, room_id: Uuid, message: communicate::types::MessageResponse) { + // Deduplication: skip messages we have already processed. + // This guards against duplicate WebSocket deliveries after reconnection. + { + let mut seen = self.seen_messages.write().await; + if !seen.insert(message.id) { + debug!( + %room_id, + message_id = %message.id, + "MessageBridge: skipping duplicate message" + ); + return; + } + // Evict oldest entries when the set grows too large. + if seen.len() > MAX_SEEN_MESSAGES { + // HashSet doesn't have ordered eviction, so clear and accept + // a brief window where very old duplicates could sneak through. + seen.clear(); + seen.insert(message.id); + } + } + // Echo prevention: skip messages posted by the bridge itself. if message.metadata.get(META_SOURCE_KEY).map(|s| s.as_str()) == Some(META_SOURCE_VALUE) { debug!( diff --git a/docs/public/getting-started.md b/docs/public/getting-started.md index d1585879..eca69378 100644 --- a/docs/public/getting-started.md +++ b/docs/public/getting-started.md @@ -523,6 +523,8 @@ See [Installation Guide](install.md) for detailed setup instructions. ### Port Reference +agentd uses a dual-port scheme: **dev ports (17xxx)** when running with `cargo run`, and **production ports (7xxx)** when installed as a LaunchAgent (macOS) or systemd unit (Linux). + | Service | Dev Port | Prod Port | |---------|----------|-----------| | agentd-ask | 17001 | 7001 | @@ -531,22 +533,63 @@ See [Installation Guide](install.md) for detailed setup instructions. | agentd-notify | 17004 | 7004 | | agentd-wrap | 17005 | 7005 | | agentd-orchestrator | 17006 | 7006 | +| agentd-memory | — | 7008 | +| agentd-communicate | 17010 | 7010 | + +The `agent` CLI defaults to **production ports** (7xxx). If your services are running on dev ports, set the URL overrides: + +```bash +source .env # sets all AGENTD_*_SERVICE_URL vars to dev ports +``` + +Or override a single service: + +```bash +AGENTD_COMMUNICATE_SERVICE_URL=http://localhost:17010 agent communicate health +``` + +See [Configuration Reference](configuration.md) for the full list of environment variables. --- ## Troubleshooting +### `agent status` shows services as down when they are running + +The `agent status` command checks **production ports (7xxx)** by default. If you're running services with `cargo run` (dev ports 17xxx), status checks will fail even though services are healthy. + +Check which ports your services are actually on: + +```bash +# Test dev ports directly +curl -s http://localhost:17004/health # notify (dev) +curl -s http://localhost:17006/health # orchestrator (dev) + +# Test production ports +curl -s http://localhost:7004/health # notify (prod) +curl -s http://localhost:7006/health # orchestrator (prod) +``` + +If dev services are healthy but `agent status` shows them down, source the `.env` file to point the CLI at dev ports: + +```bash +source .env +agent status +``` + +See [issue #536](https://github.com/geoffjay/agentd/issues/536) — a code fix is in progress to make `agent status` port-scheme-aware. + ### "Connection refused" when hitting health endpoints -The service isn't running. Check: +The service isn't running, or you're checking the wrong port. Check: ```bash # Is the process running? ps aux | grep agentd -# Check for port conflicts -lsof -i :17004 -lsof -i :17006 +# Are services on dev ports (cargo run) or prod ports (installed)? +curl -s http://localhost:17004/health # dev +curl -s http://localhost:7004/health # prod ``` If another process holds the port, either stop it or override with `AGENTD_PORT=18004 cargo run -p agentd-notify`. diff --git a/docs/public/install.md b/docs/public/install.md index 06b6724e..0993fecb 100644 --- a/docs/public/install.md +++ b/docs/public/install.md @@ -222,9 +222,18 @@ and a production port (7xxx) when running as a LaunchAgent: | agentd-notify | 17004 | 7004 | | agentd-wrap | 17005 | 7005 | | agentd-orchestrator | 17006 | 7006 | +| agentd-memory | — | 7008 | +| agentd-communicate | 17010 | 7010 | All ports are configurable via the `AGENTD_PORT` environment variable. +!!! warning "CLI defaults to production ports" + The `agent` CLI connects to **production ports (7xxx)** by default. If you're running services with `cargo run` (dev ports), `agent status` and other commands will report connection failures even though services are healthy. Fix this by sourcing the dev environment: + ```bash + source .env # sets AGENTD_*_SERVICE_URL vars to dev ports (17xxx) + ``` + See [Configuration Reference](configuration.md#using-the-env-file-for-development) for details. + ### Custom Installation Location Use the `PREFIX` environment variable to install to a custom location: @@ -320,12 +329,16 @@ Common issues: cargo xtask service-status ``` -2. Check if port is listening: +2. Check if port is listening. Installed services use **production ports (7xxx)**: ```bash - curl http://localhost:17004/health - curl http://localhost:17001/health + curl http://localhost:7004/health # notify + curl http://localhost:7001/health # ask + curl http://localhost:7006/health # orchestrator + curl http://localhost:7010/health # communicate ``` + If running with `cargo run` (dev), use dev ports (17xxx) instead. + 3. Restart services: ```bash cargo xtask stop-services