From b252e198b0ddc5e8b5f18c41f0eb43e85baa29d6 Mon Sep 17 00:00:00 2001 From: rafabd1 Date: Sat, 14 Mar 2026 01:41:05 -0300 Subject: [PATCH] feat: add non-blocking handshakes and tunnel health monitoring (#13) - Spawn handshakes in separate tasks to avoid blocking accept loop - Add 60s timeout for incoming handshakes - Emit connection_error events on handshake timeout - Add tunnel_health_monitor task that checks SAM connectivity every 30s - Emit tunnel_healthy and tunnel_degraded events for status updates - Quick health check via SAM HELLO to verify tunnel responsiveness This implements issue #13 to improve resilience of accept_loop. Handshakes no longer block the accept loop, preventing multiple connection attempts from queuing indefinitely. Users receive feedback about tunnel health status, and the app can detect when the I2P tunnel becomes degraded. --- src-tauri/src/commands/session.rs | 86 +++++++++++++++++++++++++++++-- 1 file changed, 81 insertions(+), 5 deletions(-) diff --git a/src-tauri/src/commands/session.rs b/src-tauri/src/commands/session.rs index 9bb7c40..537537b 100644 --- a/src-tauri/src/commands/session.rs +++ b/src-tauri/src/commands/session.rs @@ -189,7 +189,16 @@ fn build_connect_link(dest: &str, ik_hex: &str, spk_hex: &str) -> String { } /// Accept loop: waits for incoming I2P stream connections for this session. +/// Implements non-blocking handshakes with timeout and tunnel health monitoring. async fn accept_loop(app: AppHandle, session_id: String, sam_addr: String) { + // Spawn health monitoring task + let app_health = app.clone(); + let session_id_health = session_id.clone(); + let sam_addr_health = sam_addr.clone(); + tauri::async_runtime::spawn(async move { + tunnel_health_monitor(app_health, session_id_health, sam_addr_health).await; + }); + loop { // Exit if the session was replaced or dropped (e.g. after panic_wipe) let should_continue = { @@ -203,11 +212,31 @@ async fn accept_loop(app: AppHandle, session_id: String, sam_addr: String) { match accept_once_raw(&session_id, &sam_addr).await { Ok((peer_dest, tunnel)) => { - if let Err(e) = handle_incoming(&app, peer_dest, tunnel).await { - #[cfg(debug_assertions)] - log::warn!("incoming session error: {}", e); - let _ = e; - } + // Spawn handshake in a separate task with timeout to avoid blocking + let app_clone = app.clone(); + tauri::async_runtime::spawn(async move { + // 60s timeout for handshake to prevent indefinite blocking + let result = tokio::time::timeout( + tokio::time::Duration::from_secs(60), + handle_incoming(&app_clone, peer_dest, tunnel) + ).await; + + match result { + Ok(Ok(())) => { + // Handshake successful + }, + Ok(Err(e)) => { + #[cfg(debug_assertions)] + log::warn!("incoming session handshake error: {}", e); + let _ = e; + }, + Err(_) => { + #[cfg(debug_assertions)] + log::warn!("incoming session handshake timeout (60s)"); + let _ = app_clone.emit("connection_error", "Incoming handshake timeout - peer unresponsive"); + } + } + }); } Err(e) => { #[cfg(debug_assertions)] @@ -218,6 +247,53 @@ async fn accept_loop(app: AppHandle, session_id: String, sam_addr: String) { } } +/// Monitor tunnel health and emit status events. +/// Periodically checks if the SAM session is still responsive. +async fn tunnel_health_monitor(app: AppHandle, session_id: String, sam_addr: String) { + loop { + // Check if session is still valid + let should_continue = { + let state = app.state::(); + let i2p = state.i2p.lock().await; + i2p.as_ref().map_or(false, |s| s.session_id == session_id) + }; + if !should_continue { + break; + } + + // Check tunnel health by attempting a quick SAM status check + let is_healthy = check_tunnel_health(&sam_addr).await; + + if is_healthy { + let _ = app.emit("tunnel_healthy", ()); + } else { + #[cfg(debug_assertions)] + log::warn!("Tunnel health check failed"); + let _ = app.emit("tunnel_degraded", ()); + } + + // Check every 30 seconds + tokio::time::sleep(tokio::time::Duration::from_secs(30)).await; + } +} + +/// Quick health check for the I2P tunnel. +/// Attempts to connect to SAM and verify session is still valid. +async fn check_tunnel_health(sam_addr: &str) -> bool { + match tokio::net::TcpStream::connect(sam_addr).await { + Ok(mut stream) => { + use tokio::io::AsyncWriteExt; + if stream.write_all(b"HELLO VERSION MIN=3.1 MAX=3.3\n").await.is_ok() { + // If we can connect and send HELLO, tunnel is healthy + true + } else { + false + } + } + Err(_) => false, + } +} + async fn accept_once_raw( session_id: &str, sam_addr: &str,