Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion crates/communicate/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,11 @@ async fn handle_socket(socket: WebSocket, state: crate::api::ApiState, params: W
}
});

subscription_tasks.insert(room_id, task);
// Abort any existing subscription task for this room
// to prevent duplicate message delivery on resubscribe.
if let Some(old_task) = subscription_tasks.insert(room_id, task) {
old_task.abort();
}
}
Err(e) => {
warn!(%conn_id, %room_id, error = %e, "Subscribe failed");
Expand Down
31 changes: 30 additions & 1 deletion crates/orchestrator/src/message_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use communicate::types::{
};
use futures::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, RwLock};
Expand All @@ -73,6 +73,8 @@ const META_SOURCE_KEY: &str = "source";
const META_SOURCE_VALUE: &str = "agent_response";
/// Metadata key carrying the originating agent ID on response messages.
const META_AGENT_ID_KEY: &str = "agent_id";
/// Maximum number of seen message IDs to retain for deduplication.
const MAX_SEEN_MESSAGES: usize = 1000;
/// Initial backoff duration for WebSocket reconnection attempts.
const INITIAL_BACKOFF: Duration = Duration::from_secs(1);
/// Maximum backoff duration for WebSocket reconnection attempts.
Expand Down Expand Up @@ -168,6 +170,11 @@ pub struct MessageBridge {

/// WS base URL for the communicate service (scheme already converted to ws/wss).
ws_url: String,

/// Set of recently seen message IDs for deduplication.
/// Prevents the same message from being delivered to agents multiple times
/// if the WebSocket delivers duplicate events (e.g. after reconnection).
seen_messages: Arc<RwLock<HashSet<Uuid>>>,
}

impl MessageBridge {
Expand Down Expand Up @@ -218,6 +225,7 @@ impl MessageBridge {
max_queue_depth: DEFAULT_MAX_QUEUE_DEPTH,
ws_tx: Arc::new(Mutex::new(None)),
ws_url,
seen_messages: Arc::new(RwLock::new(HashSet::new())),
}
}

Expand Down Expand Up @@ -505,6 +513,27 @@ impl MessageBridge {
// -----------------------------------------------------------------------

async fn on_room_message(&self, room_id: Uuid, message: communicate::types::MessageResponse) {
// Deduplication: skip messages we have already processed.
// This guards against duplicate WebSocket deliveries after reconnection.
{
let mut seen = self.seen_messages.write().await;
if !seen.insert(message.id) {
debug!(
%room_id,
message_id = %message.id,
"MessageBridge: skipping duplicate message"
);
return;
}
// Evict oldest entries when the set grows too large.
if seen.len() > MAX_SEEN_MESSAGES {
// HashSet doesn't have ordered eviction, so clear and accept
// a brief window where very old duplicates could sneak through.
seen.clear();
seen.insert(message.id);
}
}

// Echo prevention: skip messages posted by the bridge itself.
if message.metadata.get(META_SOURCE_KEY).map(|s| s.as_str()) == Some(META_SOURCE_VALUE) {
debug!(
Expand Down
51 changes: 47 additions & 4 deletions docs/public/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,8 @@ See [Installation Guide](install.md) for detailed setup instructions.

### Port Reference

agentd uses a dual-port scheme: **dev ports (17xxx)** when running with `cargo run`, and **production ports (7xxx)** when installed as a LaunchAgent (macOS) or systemd unit (Linux).

| Service | Dev Port | Prod Port |
|---------|----------|-----------|
| agentd-ask | 17001 | 7001 |
Expand All @@ -531,22 +533,63 @@ See [Installation Guide](install.md) for detailed setup instructions.
| agentd-notify | 17004 | 7004 |
| agentd-wrap | 17005 | 7005 |
| agentd-orchestrator | 17006 | 7006 |
| agentd-memory | — | 7008 |
| agentd-communicate | 17010 | 7010 |

The `agent` CLI defaults to **production ports** (7xxx). If your services are running on dev ports, set the URL overrides:

```bash
source .env # sets all AGENTD_*_SERVICE_URL vars to dev ports
```

Or override a single service:

```bash
AGENTD_COMMUNICATE_SERVICE_URL=http://localhost:17010 agent communicate health
```

See [Configuration Reference](configuration.md) for the full list of environment variables.

---

## Troubleshooting

### `agent status` shows services as down when they are running

The `agent status` command checks **production ports (7xxx)** by default. If you're running services with `cargo run` (dev ports 17xxx), status checks will fail even though services are healthy.

Check which ports your services are actually on:

```bash
# Test dev ports directly
curl -s http://localhost:17004/health # notify (dev)
curl -s http://localhost:17006/health # orchestrator (dev)

# Test production ports
curl -s http://localhost:7004/health # notify (prod)
curl -s http://localhost:7006/health # orchestrator (prod)
```

If dev services are healthy but `agent status` shows them down, source the `.env` file to point the CLI at dev ports:

```bash
source .env
agent status
```

See [issue #536](https://github.com/geoffjay/agentd/issues/536) — a code fix is in progress to make `agent status` port-scheme-aware.

### "Connection refused" when hitting health endpoints

The service isn't running. Check:
The service isn't running, or you're checking the wrong port. Check:

```bash
# Is the process running?
ps aux | grep agentd

# Check for port conflicts
lsof -i :17004
lsof -i :17006
# Are services on dev ports (cargo run) or prod ports (installed)?
curl -s http://localhost:17004/health # dev
curl -s http://localhost:7004/health # prod
```

If another process holds the port, either stop it or override with `AGENTD_PORT=18004 cargo run -p agentd-notify`.
Expand Down
19 changes: 16 additions & 3 deletions docs/public/install.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,18 @@ and a production port (7xxx) when running as a LaunchAgent:
| agentd-notify | 17004 | 7004 |
| agentd-wrap | 17005 | 7005 |
| agentd-orchestrator | 17006 | 7006 |
| agentd-memory | — | 7008 |
| agentd-communicate | 17010 | 7010 |

All ports are configurable via the `AGENTD_PORT` environment variable.

!!! warning "CLI defaults to production ports"
The `agent` CLI connects to **production ports (7xxx)** by default. If you're running services with `cargo run` (dev ports), `agent status` and other commands will report connection failures even though services are healthy. Fix this by sourcing the dev environment:
```bash
source .env # sets AGENTD_*_SERVICE_URL vars to dev ports (17xxx)
```
See [Configuration Reference](configuration.md#using-the-env-file-for-development) for details.

### Custom Installation Location

Use the `PREFIX` environment variable to install to a custom location:
Expand Down Expand Up @@ -320,12 +329,16 @@ Common issues:
cargo xtask service-status
```

2. Check if port is listening:
2. Check if port is listening. Installed services use **production ports (7xxx)**:
```bash
curl http://localhost:17004/health
curl http://localhost:17001/health
curl http://localhost:7004/health # notify
curl http://localhost:7001/health # ask
curl http://localhost:7006/health # orchestrator
curl http://localhost:7010/health # communicate
```

If running with `cargo run` (dev), use dev ports (17xxx) instead.

3. Restart services:
```bash
cargo xtask stop-services
Expand Down
Loading