Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 134 additions & 61 deletions src-tauri/src/commands/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ async fn handle_incoming(
}

/// Initiate a session by pasting peer's ech0:// link or raw JSON payload.
/// Implements retry with exponential backoff and progress feedback.
#[tauri::command]
pub async fn initiate_session(
state: State<'_, AppState>,
Expand Down Expand Up @@ -359,75 +360,147 @@ pub async fn initiate_session(
let ik_b_pub = PublicKey::from(<[u8; 32]>::try_from(ik_b_bytes.as_slice()).unwrap());
let spk_b_pub = PublicKey::from(<[u8; 32]>::try_from(spk_b_bytes.as_slice()).unwrap());

// Generate ephemeral key and compute X3DH
let ek_a = StaticSecret::random_from_rng(OsRng);
let ek_a_pub = PublicKey::from(&ek_a);

let mut root_key = {
let id = state.identity.lock().await;
let keys = id.as_ref().ok_or("no identity generated")?;
x3dh_initiator(&keys.ik_secret, &ek_a, &ik_b_pub, &spk_b_pub)
};

let ratchet = DoubleRatchet::from_root_key(&root_key, true);
root_key.zeroize();

// Dial peer
let tunnel = {
let i2p = state.i2p.lock().await;
let session = i2p.as_ref().ok_or("i2p not connected")?;
session
.connect_to_peer(&peer.dest)
.await
.map_err(|e| e.to_string())?
};
let peer_dest = peer.dest.clone();

// Retry loop with exponential backoff (max 5 attempts)
let max_attempts = 5u32;
let mut attempt = 0u32;

while attempt < max_attempts {
attempt += 1;

// Emit progress event
let _ = app.emit(
"connection_progress",
serde_json::json!({
"attempt": attempt,
"max_attempts": max_attempts,
"status": "connecting"
})
);

// Generate ephemeral key and compute X3DH for each attempt
let ek_a = StaticSecret::random_from_rng(OsRng);
let ek_a_pub = PublicKey::from(&ek_a);

let mut root_key = {
let id = state.identity.lock().await;
let keys = id.as_ref().ok_or("no identity generated")?;
x3dh_initiator(&keys.ik_secret, &ek_a, &ik_b_pub, &spk_b_pub)
};

let (mut reader, mut writer) = split(tunnel);
let ratchet = DoubleRatchet::from_root_key(&root_key, true);
root_key.zeroize();

// Send HANDSHAKE_INIT
let ik_hex = {
let id = state.identity.lock().await;
id.as_ref().unwrap().ik_pub_hex()
};
// Dial peer
let tunnel = {
let i2p = state.i2p.lock().await;
let session = i2p.as_ref().ok_or("i2p not connected")?;
match session.connect_to_peer(&peer.dest).await {
Ok(t) => t,
Err(e) => {
let error_msg = format!("Connection failed (attempt {}/{}): {}", attempt, max_attempts, e);
let _ = app.emit("connection_error", error_msg);

if attempt < max_attempts {
// Exponential backoff: 2s, 4s, 8s, 16s
let backoff_secs = 2u64.pow(attempt.min(4));
tokio::time::sleep(tokio::time::Duration::from_secs(backoff_secs)).await;
}
continue;
}
}
};

let init_msg = serde_json::to_vec(&HandshakeInit {
t: "hi".into(),
ik: ik_hex,
ek: hex::encode(ek_a_pub.as_bytes()),
})
.map_err(|e| e.to_string())?;
let (mut reader, mut writer) = split(tunnel);

write_framed(&mut writer, &init_msg)
.await
.map_err(|e| e.to_string())?;
// Send HANDSHAKE_INIT
let ik_hex = {
let id = state.identity.lock().await;
id.as_ref().unwrap().ik_pub_hex()
};

// Wait for ACK
let ack_frame = read_framed(&mut reader)
.await
let init_msg = serde_json::to_vec(&HandshakeInit {
t: "hi".into(),
ik: ik_hex,
ek: hex::encode(ek_a_pub.as_bytes()),
})
.map_err(|e| e.to_string())?;
let ack: HandshakeAck = serde_json::from_slice(&ack_frame).map_err(|e| e.to_string())?;
if ack.t != "ack" {
return Err(format!("unexpected ack type: {}", ack.t));
}

let peer_dest = peer.dest.clone();

*state.session.lock().await = Some(ActiveSession {
peer_dest: peer_dest.clone(),
peer_ik_bytes: <[u8; 32]>::try_from(ik_b_bytes.as_slice()).unwrap(),
ratchet,
stream_writer: writer,
started_at: now_secs(),
});

let _ = app.emit("session_established", serde_json::json!({ "peer_dest": peer_dest }));
if let Err(e) = write_framed(&mut writer, &init_msg).await {
let error_msg = format!("Handshake send failed (attempt {}/{}): {}", attempt, max_attempts, e);
let _ = app.emit("connection_error", error_msg);

if attempt < max_attempts {
let backoff_secs = 2u64.pow(attempt.min(4));
tokio::time::sleep(tokio::time::Duration::from_secs(backoff_secs)).await;
}
continue;
}

let app_clone = app.clone();
tauri::async_runtime::spawn(async move {
receive_loop(app_clone, reader).await;
});
// Wait for ACK with timeout
match tokio::time::timeout(
tokio::time::Duration::from_secs(60),
read_framed(&mut reader)
).await {
Ok(Ok(ack_frame)) => {
let ack: HandshakeAck = serde_json::from_slice(&ack_frame).map_err(|e| e.to_string())?;
if ack.t != "ack" {
let error_msg = format!("Unexpected ack type (attempt {}/{}): {}", attempt, max_attempts, ack.t);
let _ = app.emit("connection_error", error_msg);

if attempt < max_attempts {
let backoff_secs = 2u64.pow(attempt.min(4));
tokio::time::sleep(tokio::time::Duration::from_secs(backoff_secs)).await;
}
continue;
}

// Handshake successful - establish session
*state.session.lock().await = Some(ActiveSession {
peer_dest: peer_dest.clone(),
peer_ik_bytes: <[u8; 32]>::try_from(ik_b_bytes.as_slice()).unwrap(),
ratchet,
stream_writer: writer,
started_at: now_secs(),
});

let _ = app.emit("session_established", serde_json::json!({ "peer_dest": peer_dest }));

let app_clone = app.clone();
tauri::async_runtime::spawn(async move {
receive_loop(app_clone, reader).await;
});

return Ok(());
},
Ok(Err(e)) => {
let error_msg = format!("Handshake receive failed (attempt {}/{}): {}", attempt, max_attempts, e);
let _ = app.emit("connection_error", error_msg);

if attempt < max_attempts {
let backoff_secs = 2u64.pow(attempt.min(4));
tokio::time::sleep(tokio::time::Duration::from_secs(backoff_secs)).await;
}
continue;
},
Err(_) => {
let error_msg = format!("Handshake timeout (attempt {}/{}) - peer did not respond within 60s", attempt, max_attempts);
let _ = app.emit("connection_error", error_msg);

if attempt < max_attempts {
let backoff_secs = 2u64.pow(attempt.min(4));
tokio::time::sleep(tokio::time::Duration::from_secs(backoff_secs)).await;
}
continue;
}
}
}

Ok(())
// All attempts failed
let final_error = format!("Connection failed after {} attempts. Peer may be offline or I2P tunnel degraded.", max_attempts);
let _ = app.emit("connection_error", final_error);
Err(final_error)
}

/// Background task: receive encrypted messages from the peer.
Expand Down Expand Up @@ -629,4 +702,4 @@ pub async fn get_safety_numbers(state: State<'_, AppState>) -> Result<String, St
keys.ik_public.as_bytes(),
&session.peer_ik_bytes,
))
}
}