diff --git a/src-tauri/src/commands/session.rs b/src-tauri/src/commands/session.rs index 9bb7c40..a0e9b0a 100644 --- a/src-tauri/src/commands/session.rs +++ b/src-tauri/src/commands/session.rs @@ -322,6 +322,7 @@ async fn handle_incoming( } /// Initiate a session by pasting peer's ech0:// link or raw JSON payload. +/// Implements retry with exponential backoff and progress feedback. #[tauri::command] pub async fn initiate_session( state: State<'_, AppState>, @@ -359,75 +360,147 @@ pub async fn initiate_session( let ik_b_pub = PublicKey::from(<[u8; 32]>::try_from(ik_b_bytes.as_slice()).unwrap()); let spk_b_pub = PublicKey::from(<[u8; 32]>::try_from(spk_b_bytes.as_slice()).unwrap()); - // Generate ephemeral key and compute X3DH - let ek_a = StaticSecret::random_from_rng(OsRng); - let ek_a_pub = PublicKey::from(&ek_a); - - let mut root_key = { - let id = state.identity.lock().await; - let keys = id.as_ref().ok_or("no identity generated")?; - x3dh_initiator(&keys.ik_secret, &ek_a, &ik_b_pub, &spk_b_pub) - }; - - let ratchet = DoubleRatchet::from_root_key(&root_key, true); - root_key.zeroize(); - - // Dial peer - let tunnel = { - let i2p = state.i2p.lock().await; - let session = i2p.as_ref().ok_or("i2p not connected")?; - session - .connect_to_peer(&peer.dest) - .await - .map_err(|e| e.to_string())? - }; + let peer_dest = peer.dest.clone(); + + // Retry loop with exponential backoff (max 5 attempts) + let max_attempts = 5u32; + let mut attempt = 0u32; + + while attempt < max_attempts { + attempt += 1; + + // Emit progress event + let _ = app.emit( + "connection_progress", + serde_json::json!({ + "attempt": attempt, + "max_attempts": max_attempts, + "status": "connecting" + }) + ); + + // Generate ephemeral key and compute X3DH for each attempt + let ek_a = StaticSecret::random_from_rng(OsRng); + let ek_a_pub = PublicKey::from(&ek_a); + + let mut root_key = { + let id = state.identity.lock().await; + let keys = id.as_ref().ok_or("no identity generated")?; + x3dh_initiator(&keys.ik_secret, &ek_a, &ik_b_pub, &spk_b_pub) + }; - let (mut reader, mut writer) = split(tunnel); + let ratchet = DoubleRatchet::from_root_key(&root_key, true); + root_key.zeroize(); - // Send HANDSHAKE_INIT - let ik_hex = { - let id = state.identity.lock().await; - id.as_ref().unwrap().ik_pub_hex() - }; + // Dial peer + let tunnel = { + let i2p = state.i2p.lock().await; + let session = i2p.as_ref().ok_or("i2p not connected")?; + match session.connect_to_peer(&peer.dest).await { + Ok(t) => t, + Err(e) => { + let error_msg = format!("Connection failed (attempt {}/{}): {}", attempt, max_attempts, e); + let _ = app.emit("connection_error", error_msg); + + if attempt < max_attempts { + // Exponential backoff: 2s, 4s, 8s, 16s + let backoff_secs = 2u64.pow(attempt.min(4)); + tokio::time::sleep(tokio::time::Duration::from_secs(backoff_secs)).await; + } + continue; + } + } + }; - let init_msg = serde_json::to_vec(&HandshakeInit { - t: "hi".into(), - ik: ik_hex, - ek: hex::encode(ek_a_pub.as_bytes()), - }) - .map_err(|e| e.to_string())?; + let (mut reader, mut writer) = split(tunnel); - write_framed(&mut writer, &init_msg) - .await - .map_err(|e| e.to_string())?; + // Send HANDSHAKE_INIT + let ik_hex = { + let id = state.identity.lock().await; + id.as_ref().unwrap().ik_pub_hex() + }; - // Wait for ACK - let ack_frame = read_framed(&mut reader) - .await + let init_msg = serde_json::to_vec(&HandshakeInit { + t: "hi".into(), + ik: ik_hex, + ek: hex::encode(ek_a_pub.as_bytes()), + }) .map_err(|e| e.to_string())?; - let ack: HandshakeAck = serde_json::from_slice(&ack_frame).map_err(|e| e.to_string())?; - if ack.t != "ack" { - return Err(format!("unexpected ack type: {}", ack.t)); - } - - let peer_dest = peer.dest.clone(); - - *state.session.lock().await = Some(ActiveSession { - peer_dest: peer_dest.clone(), - peer_ik_bytes: <[u8; 32]>::try_from(ik_b_bytes.as_slice()).unwrap(), - ratchet, - stream_writer: writer, - started_at: now_secs(), - }); - let _ = app.emit("session_established", serde_json::json!({ "peer_dest": peer_dest })); + if let Err(e) = write_framed(&mut writer, &init_msg).await { + let error_msg = format!("Handshake send failed (attempt {}/{}): {}", attempt, max_attempts, e); + let _ = app.emit("connection_error", error_msg); + + if attempt < max_attempts { + let backoff_secs = 2u64.pow(attempt.min(4)); + tokio::time::sleep(tokio::time::Duration::from_secs(backoff_secs)).await; + } + continue; + } - let app_clone = app.clone(); - tauri::async_runtime::spawn(async move { - receive_loop(app_clone, reader).await; - }); + // Wait for ACK with timeout + match tokio::time::timeout( + tokio::time::Duration::from_secs(60), + read_framed(&mut reader) + ).await { + Ok(Ok(ack_frame)) => { + let ack: HandshakeAck = serde_json::from_slice(&ack_frame).map_err(|e| e.to_string())?; + if ack.t != "ack" { + let error_msg = format!("Unexpected ack type (attempt {}/{}): {}", attempt, max_attempts, ack.t); + let _ = app.emit("connection_error", error_msg); + + if attempt < max_attempts { + let backoff_secs = 2u64.pow(attempt.min(4)); + tokio::time::sleep(tokio::time::Duration::from_secs(backoff_secs)).await; + } + continue; + } + + // Handshake successful - establish session + *state.session.lock().await = Some(ActiveSession { + peer_dest: peer_dest.clone(), + peer_ik_bytes: <[u8; 32]>::try_from(ik_b_bytes.as_slice()).unwrap(), + ratchet, + stream_writer: writer, + started_at: now_secs(), + }); + + let _ = app.emit("session_established", serde_json::json!({ "peer_dest": peer_dest })); + + let app_clone = app.clone(); + tauri::async_runtime::spawn(async move { + receive_loop(app_clone, reader).await; + }); + + return Ok(()); + }, + Ok(Err(e)) => { + let error_msg = format!("Handshake receive failed (attempt {}/{}): {}", attempt, max_attempts, e); + let _ = app.emit("connection_error", error_msg); + + if attempt < max_attempts { + let backoff_secs = 2u64.pow(attempt.min(4)); + tokio::time::sleep(tokio::time::Duration::from_secs(backoff_secs)).await; + } + continue; + }, + Err(_) => { + let error_msg = format!("Handshake timeout (attempt {}/{}) - peer did not respond within 60s", attempt, max_attempts); + let _ = app.emit("connection_error", error_msg); + + if attempt < max_attempts { + let backoff_secs = 2u64.pow(attempt.min(4)); + tokio::time::sleep(tokio::time::Duration::from_secs(backoff_secs)).await; + } + continue; + } + } + } - Ok(()) + // All attempts failed + let final_error = format!("Connection failed after {} attempts. Peer may be offline or I2P tunnel degraded.", max_attempts); + let _ = app.emit("connection_error", final_error); + Err(final_error) } /// Background task: receive encrypted messages from the peer. @@ -629,4 +702,4 @@ pub async fn get_safety_numbers(state: State<'_, AppState>) -> Result