diff --git a/src-tauri/src/commands/session.rs b/src-tauri/src/commands/session.rs index 9bb7c40..537537b 100644 --- a/src-tauri/src/commands/session.rs +++ b/src-tauri/src/commands/session.rs @@ -189,7 +189,16 @@ fn build_connect_link(dest: &str, ik_hex: &str, spk_hex: &str) -> String { } /// Accept loop: waits for incoming I2P stream connections for this session. +/// Implements non-blocking handshakes with timeout and tunnel health monitoring. async fn accept_loop(app: AppHandle, session_id: String, sam_addr: String) { + // Spawn health monitoring task + let app_health = app.clone(); + let session_id_health = session_id.clone(); + let sam_addr_health = sam_addr.clone(); + tauri::async_runtime::spawn(async move { + tunnel_health_monitor(app_health, session_id_health, sam_addr_health).await; + }); + loop { // Exit if the session was replaced or dropped (e.g. after panic_wipe) let should_continue = { @@ -203,11 +212,31 @@ async fn accept_loop(app: AppHandle, session_id: String, sam_addr: String) { match accept_once_raw(&session_id, &sam_addr).await { Ok((peer_dest, tunnel)) => { - if let Err(e) = handle_incoming(&app, peer_dest, tunnel).await { - #[cfg(debug_assertions)] - log::warn!("incoming session error: {}", e); - let _ = e; - } + // Spawn handshake in a separate task with timeout to avoid blocking + let app_clone = app.clone(); + tauri::async_runtime::spawn(async move { + // 60s timeout for handshake to prevent indefinite blocking + let result = tokio::time::timeout( + tokio::time::Duration::from_secs(60), + handle_incoming(&app_clone, peer_dest, tunnel) + ).await; + + match result { + Ok(Ok(())) => { + // Handshake successful + }, + Ok(Err(e)) => { + #[cfg(debug_assertions)] + log::warn!("incoming session handshake error: {}", e); + let _ = e; + }, + Err(_) => { + #[cfg(debug_assertions)] + log::warn!("incoming session handshake timeout (60s)"); + let _ = app_clone.emit("connection_error", "Incoming handshake timeout - peer unresponsive"); + } + } + }); } Err(e) => { #[cfg(debug_assertions)] @@ -218,6 +247,53 @@ async fn accept_loop(app: AppHandle, session_id: String, sam_addr: String) { } } +/// Monitor tunnel health and emit status events. +/// Periodically checks if the SAM session is still responsive. +async fn tunnel_health_monitor(app: AppHandle, session_id: String, sam_addr: String) { + loop { + // Check if session is still valid + let should_continue = { + let state = app.state::(); + let i2p = state.i2p.lock().await; + i2p.as_ref().map_or(false, |s| s.session_id == session_id) + }; + if !should_continue { + break; + } + + // Check tunnel health by attempting a quick SAM status check + let is_healthy = check_tunnel_health(&sam_addr).await; + + if is_healthy { + let _ = app.emit("tunnel_healthy", ()); + } else { + #[cfg(debug_assertions)] + log::warn!("Tunnel health check failed"); + let _ = app.emit("tunnel_degraded", ()); + } + + // Check every 30 seconds + tokio::time::sleep(tokio::time::Duration::from_secs(30)).await; + } +} + +/// Quick health check for the I2P tunnel. +/// Attempts to connect to SAM and verify session is still valid. +async fn check_tunnel_health(sam_addr: &str) -> bool { + match tokio::net::TcpStream::connect(sam_addr).await { + Ok(mut stream) => { + use tokio::io::AsyncWriteExt; + if stream.write_all(b"HELLO VERSION MIN=3.1 MAX=3.3\n").await.is_ok() { + // If we can connect and send HELLO, tunnel is healthy + true + } else { + false + } + } + Err(_) => false, + } +} + async fn accept_once_raw( session_id: &str, sam_addr: &str,