Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 81 additions & 5 deletions src-tauri/src/commands/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,16 @@ fn build_connect_link(dest: &str, ik_hex: &str, spk_hex: &str) -> String {
}

/// Accept loop: waits for incoming I2P stream connections for this session.
/// Implements non-blocking handshakes with timeout and tunnel health monitoring.
async fn accept_loop(app: AppHandle, session_id: String, sam_addr: String) {
// Spawn health monitoring task
let app_health = app.clone();
let session_id_health = session_id.clone();
let sam_addr_health = sam_addr.clone();
tauri::async_runtime::spawn(async move {
tunnel_health_monitor(app_health, session_id_health, sam_addr_health).await;
});

loop {
// Exit if the session was replaced or dropped (e.g. after panic_wipe)
let should_continue = {
Expand All @@ -203,11 +212,31 @@ async fn accept_loop(app: AppHandle, session_id: String, sam_addr: String) {

match accept_once_raw(&session_id, &sam_addr).await {
Ok((peer_dest, tunnel)) => {
if let Err(e) = handle_incoming(&app, peer_dest, tunnel).await {
#[cfg(debug_assertions)]
log::warn!("incoming session error: {}", e);
let _ = e;
}
// Spawn handshake in a separate task with timeout to avoid blocking
let app_clone = app.clone();
tauri::async_runtime::spawn(async move {
// 60s timeout for handshake to prevent indefinite blocking
let result = tokio::time::timeout(
tokio::time::Duration::from_secs(60),
handle_incoming(&app_clone, peer_dest, tunnel)
).await;

match result {
Ok(Ok(())) => {
// Handshake successful
},
Ok(Err(e)) => {
#[cfg(debug_assertions)]
log::warn!("incoming session handshake error: {}", e);
let _ = e;
},
Err(_) => {
#[cfg(debug_assertions)]
log::warn!("incoming session handshake timeout (60s)");
let _ = app_clone.emit("connection_error", "Incoming handshake timeout - peer unresponsive");
}
}
});
}
Err(e) => {
#[cfg(debug_assertions)]
Expand All @@ -218,6 +247,53 @@ async fn accept_loop(app: AppHandle, session_id: String, sam_addr: String) {
}
}

/// Monitor tunnel health and emit status events.
/// Periodically checks if the SAM session is still responsive.
async fn tunnel_health_monitor(app: AppHandle, session_id: String, sam_addr: String) {
loop {
// Check if session is still valid
let should_continue = {
let state = app.state::<AppState>();
let i2p = state.i2p.lock().await;
i2p.as_ref().map_or(false, |s| s.session_id == session_id)
};
if !should_continue {
break;
}

// Check tunnel health by attempting a quick SAM status check
let is_healthy = check_tunnel_health(&sam_addr).await;

if is_healthy {
let _ = app.emit("tunnel_healthy", ());
} else {
#[cfg(debug_assertions)]
log::warn!("Tunnel health check failed");
let _ = app.emit("tunnel_degraded", ());
}

// Check every 30 seconds
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
}
}

/// Quick health check for the I2P tunnel.
/// Attempts to connect to SAM and verify session is still valid.
async fn check_tunnel_health(sam_addr: &str) -> bool {
match tokio::net::TcpStream::connect(sam_addr).await {
Ok(mut stream) => {
use tokio::io::AsyncWriteExt;
if stream.write_all(b"HELLO VERSION MIN=3.1 MAX=3.3\n").await.is_ok() {
// If we can connect and send HELLO, tunnel is healthy
true
} else {
false
}
}
Err(_) => false,
}
}

async fn accept_once_raw(
session_id: &str,
sam_addr: &str,
Expand Down