From d2586652589062bbb11c31c93f832cba081b9fea Mon Sep 17 00:00:00 2001 From: Lion Kortlepel Date: Sun, 4 May 2025 13:44:55 +0200 Subject: [PATCH 1/4] refactor: move client-related handling into a new Client struct --- connserver/src/connection/client.rs | 238 ++++++++++++++++++++++++++++ connserver/src/connection/mod.rs | 1 + connserver/src/main.rs | 219 ++++--------------------- 3 files changed, 266 insertions(+), 192 deletions(-) create mode 100644 connserver/src/connection/client.rs diff --git a/connserver/src/connection/client.rs b/connserver/src/connection/client.rs new file mode 100644 index 0000000..733faf0 --- /dev/null +++ b/connserver/src/connection/client.rs @@ -0,0 +1,238 @@ +use log::{error, info}; +use tokio::{ + net::tcp::{OwnedReadHalf, OwnedWriteHalf}, + sync::mpsc, +}; +use tokio_util::bytes::BytesMut; + +use crate::{Either, Msg}; + +use super::stale::{StaleClient, StaleConnectionManager}; + +pub struct Client { + pub read: net::FramedReader, + pub write: OwnedWriteHalf, + send_receiver: mpsc::Receiver, + pub recv_sender: mpsc::Sender, + client_id: u64, +} + +impl Client { + pub fn new( + mut stream: tokio::net::TcpStream, + send_receiver: mpsc::Receiver, + recv_sender: mpsc::Sender, + client_id: u64, + ) -> Self { + // NOTE(lion): We deliberately ignore this, because dealing with this here sucks + // and we don't care about the result, because it cannot really fail + _ = net::configure_performance_tcp_socket(&mut stream); + let (read, write) = stream.into_split(); + let read = net::new_framed_reader(read); + Self { + read, + write, + send_receiver, + recv_sender, + client_id, + } + } + + pub async fn run(&mut self) -> anyhow::Result<()> { + loop { + if !self.run_once().await? { + break; + } + } + Ok(()) + } + + async fn run_once(&mut self) -> anyhow::Result { + tokio::select! { + res = net::recv_size_prefixed(&mut self.read) => { + if let Some(value) = self.handle_packet(res).await { + return value; + } + } + Some(msg) = self.send_receiver.recv() => { + if let Some(value) = self.handle_msg(msg).await { + return value; + } + } + } + Ok(true) + } + + async fn handle_msg(&mut self, msg: Msg) -> Option> { + match msg { + Msg::Data(packet) => match packet { + net::TaggedPacket::Data { + client_id: id, + data, + } => { + if id != self.client_id { + error!( + "Client: Received message for client {} but expected {}", + id, self.client_id + ); + } + if let Err(e) = net::send_size_prefixed(&mut self.write, &data).await { + error!( + "Client: Error sending message for client {}: {}", + self.client_id, e + ); + return Some(Err(e.into())); + } + } + _ => { + error!("Client: Unexpected packet type from master: {:?}", packet); + } + }, + Msg::Stop => { + info!("Stopping client duplex for client {}", self.client_id); + return Some(Ok(false)); + } + } + None + } + + async fn handle_packet( + &mut self, + res: anyhow::Result, + ) -> Option> { + match res { + Ok(buffer) => { + if let Err(e) = self + .recv_sender + .send(Msg::Data(net::TaggedPacket::Data { + client_id: self.client_id, + data: buffer.to_vec(), + })) + .await + { + error!( + "Client: Error sending message for client {}: {}", + self.client_id, e + ); + return Some(Err(e.into())); + } + } + Err(e) => { + error!( + "Client: Error receiving packet for client {}: {}", + self.client_id, e + ); + return Some(Err(e.into())); + } + } + None + } + + pub async fn try_reconnect( + &mut self, + keys: &enc::easy::Keys, + mut client_id: u64, + stale_conn_manager: &StaleConnectionManager, + ) -> Result< + Either<(enc::easy::Encryption, u64), StaleClient>, + Box, + > { + let buffer = net::recv_size_prefixed(&mut self.read).await?; + match net::ClientServerPacket::from_slice(&buffer) { + Ok(net::ClientServerPacket::ProtocolVersion(version)) => { + if version != net::PROTOCOL_VERSION { + return Err(format!("Unsupported protocol version: {}", version).into()); + } + } + Ok(_) => { + return Err("Expected protocol version packet".into()); + } + Err(e) => { + return Err(format!("Invalid protocol version packet: {}", e).into()); + } + } + + // 0. send server key + let packet = net::ClientServerPacket::PubKey(keys.pubkey_to_bytes()); + net::send_size_prefixed(&mut self.write, &packet.into_vec()?).await?; + + // 1. receive either pubkey or client id, depending if its a new connection or reconnection attempt + let buffer = net::recv_size_prefixed(&mut self.read).await?; + match net::ClientServerPacket::from_slice(&buffer) { + Ok(net::ClientServerPacket::PubKey(key)) => { + // new connection + info!("New client {} is connecting", client_id); + let their_pubkey = enc::easy::pubkey_from_bytes(&key)?; + // 2. send client id + let packet = net::ClientServerPacket::ClientId(client_id); + net::send_size_prefixed(&mut self.write, &packet.into_vec()?).await?; + Ok(Either::Left(( + keys.create_encryption(&their_pubkey), + client_id, + ))) + } + Ok(net::ClientServerPacket::ClientId(id)) => { + info!("A client is trying to reconnect as client {}", id); + client_id = id; + // reconnection attempt + let stale_client = match stale_conn_manager.remove_stale_client(client_id).await { + Some(stale_client) => stale_client, + None => { + return Err(format!("Client ID {} not found in stale clients", id).into()) + } + }; + + if let Err(e) = self.do_reconnect(&stale_client.encryption).await { + error!("Error during reconnection: {}", e); + stale_conn_manager + .add_stale_client( + client_id, + stale_client.encryption, + stale_client.disconnected, + ) + .await; + return Err(format!("Failed to reconnect client {}: {}", client_id, e).into()); + } else { + Ok(Either::Right(stale_client)) + } + } + _ => Err("Expected client ID or public key packet".into()), + } + } + + async fn do_reconnect( + &mut self, + stale_client_enc: &enc::easy::Encryption, + ) -> anyhow::Result<()> { + let challenge_bytes = enc::easy::random_bytes(32); + let encrypted_challenge = stale_client_enc.encrypt(challenge_bytes.clone()); + let packet = net::ClientServerPacket::Challenge(encrypted_challenge); + net::send_size_prefixed(&mut self.write, &packet.into_vec()?).await?; + let buffer = net::recv_size_prefixed(&mut self.read).await?; + let response = match net::ClientServerPacket::from_slice(&buffer) { + Ok(net::ClientServerPacket::ChallengeResponse(response)) => response, + Ok(_) => return Err(anyhow::format_err!("Expected challenge response packet")), + Err(e) => { + return Err(anyhow::format_err!( + "Invalid challenge response packet: {}", + e + )) + } + }; + let decrypted_response = stale_client_enc + .decrypt(response) + .map_err(|e| anyhow::format_err!("Failed to decrypt challenge response: {}", e))?; + if decrypted_response != challenge_bytes { + return Err(anyhow::format_err!("Challenge response does not match")); + } + let buffer = net::recv_size_prefixed(&mut self.read).await?; + match net::ClientServerPacket::from_slice(&buffer) { + Ok(net::ClientServerPacket::Ping) => {} + Ok(_) => return Err(anyhow::format_err!("Expected ping packet")), + Err(e) => return Err(anyhow::format_err!("Invalid ping packet: {}", e)), + } + let packet = net::ClientServerPacket::Ping; + net::send_size_prefixed(&mut self.write, &packet.into_vec()?).await?; + Ok(()) + } +} diff --git a/connserver/src/connection/mod.rs b/connserver/src/connection/mod.rs index 141c54f..280f9ff 100644 --- a/connserver/src/connection/mod.rs +++ b/connserver/src/connection/mod.rs @@ -1 +1,2 @@ pub mod stale; +pub mod client; diff --git a/connserver/src/main.rs b/connserver/src/main.rs index 5001f4b..634e9db 100644 --- a/connserver/src/main.rs +++ b/connserver/src/main.rs @@ -6,7 +6,10 @@ use std::{ }, }; -use connection::stale::{StaleClient, StaleConnectionManager}; +use connection::{ + client::Client, + stale::{StaleClient, StaleConnectionManager}, +}; use ids::IdGenerator; use log::{error, info}; use net::{ClientServerPacket, TaggedPacket}; @@ -152,7 +155,6 @@ impl ConnServer { Ok(res) => res?, Err(_) => return Ok(()), }; - net::configure_performance_tcp_socket(&mut client)?; let client_id = id_gen.lock().await.next_id(); info!( @@ -194,60 +196,6 @@ impl ConnServer { } } -async fn handle_socket_duplex_client( - read: &mut net::FramedReader, - write: &mut OwnedWriteHalf, - mut send_receiver: mpsc::Receiver, - recv_sender: mpsc::Sender, - client_id: u64, -) -> Result<(), Box> { - loop { - tokio::select! { - res = net::recv_size_prefixed(read) => { - match res { - Ok(buffer) => { - if let Err(e) = recv_sender.send(Msg::Data(TaggedPacket::Data { client_id, data: buffer.to_vec() })).await { - error!("Client: Error sending message for client {}: {}", client_id, e); - return Err(e.into()); - } - } - Err(e) => { - error!("Client: Error receiving packet for client {}: {}", client_id, e); - return Err(e.into()); - } - } - } - Some(msg) = send_receiver.recv() => { - match msg { - Msg::Data(packet) => { - match packet { - TaggedPacket::Data { client_id: id, data } => { - if id != client_id { - error!("Client: Received message for client {} but expected {}", id, client_id); - continue; - } - if let Err(e) = net::send_size_prefixed(write, &data).await { - error!("Client: Error sending message for client {}: {}", client_id, e); - return Err(e.into()); - } - } - _ => { - error!("Client: Unexpected packet type from master: {:?}", packet); - continue; - } - } - } - Msg::Stop => { - info!("Stopping client duplex for client {}", client_id); - break; - } - } - } - } - } - Ok(()) -} - async fn handle_socket_duplex_master( socket: TcpStream, mut send_receiver: mpsc::Receiver, @@ -313,113 +261,6 @@ pub enum Either { Right(U), } -async fn handle_connect( - read: &mut net::FramedReader, - write: &mut OwnedWriteHalf, - keys: &enc::easy::Keys, - mut client_id: u64, - stale_conn_manager: &StaleConnectionManager, -) -> Result< - Either<(enc::easy::Encryption, u64), StaleClient>, - Box, -> { - let buffer = net::recv_size_prefixed(read).await?; - match ClientServerPacket::from_slice(&buffer) { - Ok(ClientServerPacket::ProtocolVersion(version)) => { - if version != net::PROTOCOL_VERSION { - return Err(format!("Unsupported protocol version: {}", version).into()); - } - } - Ok(_) => { - return Err("Expected protocol version packet".into()); - } - Err(e) => { - return Err(format!("Invalid protocol version packet: {}", e).into()); - } - } - - // 0. send server key - let packet = ClientServerPacket::PubKey(keys.pubkey_to_bytes()); - net::send_size_prefixed(write, &packet.into_vec()?).await?; - - // 1. receive either pubkey or client id, depending if its a new connection or reconnection attempt - let buffer = net::recv_size_prefixed(read).await?; - match ClientServerPacket::from_slice(&buffer) { - Ok(ClientServerPacket::PubKey(key)) => { - // new connection - info!("New client {} is connecting", client_id); - let their_pubkey = enc::easy::pubkey_from_bytes(&key)?; - // 2. send client id - let packet = ClientServerPacket::ClientId(client_id); - net::send_size_prefixed(write, &packet.into_vec()?).await?; - Ok(Either::Left(( - keys.create_encryption(&their_pubkey), - client_id, - ))) - } - Ok(ClientServerPacket::ClientId(id)) => { - info!("A client is trying to reconnect as client {}", id); - client_id = id; - // reconnection attempt - let stale_client = match stale_conn_manager.remove_stale_client(client_id).await { - Some(stale_client) => stale_client, - None => return Err(format!("Client ID {} not found in stale clients", id).into()), - }; - - if let Err(e) = do_reconnect(read, write, &stale_client.encryption).await { - error!("Error during reconnection: {}", e); - stale_conn_manager - .add_stale_client( - client_id, - stale_client.encryption, - stale_client.disconnected, - ) - .await; - return Err(format!("Failed to reconnect client {}: {}", client_id, e).into()); - } else { - Ok(Either::Right(stale_client)) - } - } - _ => Err("Expected client ID or public key packet".into()), - } -} - -async fn do_reconnect( - read: &mut FramedRead, - write: &mut OwnedWriteHalf, - stale_client_enc: &enc::easy::Encryption, -) -> anyhow::Result<()> { - let challenge_bytes = enc::easy::random_bytes(32); - let encrypted_challenge = stale_client_enc.encrypt(challenge_bytes.clone()); - let packet = ClientServerPacket::Challenge(encrypted_challenge); - net::send_size_prefixed(write, &packet.into_vec()?).await?; - let buffer = net::recv_size_prefixed(read).await?; - let response = match ClientServerPacket::from_slice(&buffer) { - Ok(ClientServerPacket::ChallengeResponse(response)) => response, - Ok(_) => return Err(anyhow::format_err!("Expected challenge response packet")), - Err(e) => { - return Err(anyhow::format_err!( - "Invalid challenge response packet: {}", - e - )) - } - }; - let decrypted_response = stale_client_enc - .decrypt(response) - .map_err(|e| anyhow::format_err!("Failed to decrypt challenge response: {}", e))?; - if decrypted_response != challenge_bytes { - return Err(anyhow::format_err!("Challenge response does not match")); - } - let buffer = net::recv_size_prefixed(read).await?; - match ClientServerPacket::from_slice(&buffer) { - Ok(ClientServerPacket::Ping) => {} - Ok(_) => return Err(anyhow::format_err!("Expected ping packet")), - Err(e) => return Err(anyhow::format_err!("Invalid ping packet: {}", e)), - } - let packet = ClientServerPacket::Ping; - net::send_size_prefixed(write, &packet.into_vec()?).await?; - Ok(()) -} async fn master_recv_main_loop( mut master_recv_receiver: mpsc::Receiver, @@ -518,28 +359,29 @@ async fn handle_client( id_gen: Arc>, send_sender: mpsc::Sender, ) { - let (read, mut write) = client.into_split(); - let mut read = net::new_framed_reader(read); + let mut client = Client::new(client, client_receiver, master_send_sender, client_id); let client_id_before = client_id; - let encryption = - match handle_connect(&mut read, &mut write, &keys, client_id, &stale_conn_manager).await { - Ok(Either::Left((enc, new_id))) => { - client_id = new_id; - info!("Client {} connected", client_id); - enc - } - Ok(Either::Right(stale_client)) => { - info!("Client {} reconnected", client_id); - for packet in stale_client.missed_packets { - send_sender.send(Msg::Data(packet)).await.unwrap(); - } - stale_client.encryption - } - Err(e) => { - error!("Error handling client {}: {}", client_id, e); - return; + let encryption = match client + .try_reconnect(&keys, client_id, &stale_conn_manager) + .await + { + Ok(Either::Left((enc, new_id))) => { + client_id = new_id; + info!("Client {} connected", client_id); + enc + } + Ok(Either::Right(stale_client)) => { + info!("Client {} reconnected", client_id); + for packet in stale_client.missed_packets { + send_sender.send(Msg::Data(packet)).await.unwrap(); } - }; + stale_client.encryption + } + Err(e) => { + error!("Error handling client {}: {}", client_id, e); + return; + } + }; if client_id != client_id_before { info!( "Client ID changed from {} to {} due to reconnect", @@ -559,15 +401,8 @@ async fn handle_client( clients.insert(client_id, client_sender); id_gen.lock().await.release_id(client_id_before); } - if let Err(e) = handle_socket_duplex_client( - &mut read, - &mut write, - client_receiver, - master_send_sender, - client_id, - ) - .await - { + + if let Err(e) = client.run().await { error!( "Error handling client duplex for client {}: {}", client_id, e From 1e4af3d20d13403598bd3d85e5c72c96f8d0fa8a Mon Sep 17 00:00:00 2001 From: Lion Kortlepel Date: Fri, 16 May 2025 23:29:28 +0200 Subject: [PATCH 2/4] refactor: move master and connserver into their own files --- Cargo.lock | 12 + connserver/Cargo.toml | 1 + connserver/src/connection/client.rs | 2 +- connserver/src/connserver.rs | 407 +++++++++++++++++++++++++++ connserver/src/main.rs | 422 +--------------------------- connserver/src/master.rs | 41 +++ 6 files changed, 473 insertions(+), 412 deletions(-) create mode 100644 connserver/src/connserver.rs create mode 100644 connserver/src/master.rs diff --git a/Cargo.lock b/Cargo.lock index 6015cfc..4f79ea7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -92,6 +92,17 @@ version = "1.0.98" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" +[[package]] +name = "async-trait" +version = "0.1.88" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "autocfg" version = "1.4.0" @@ -238,6 +249,7 @@ name = "connserver" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "enc", "env_logger", "futures", diff --git a/connserver/Cargo.toml b/connserver/Cargo.toml index 5ef9517..14fbb16 100644 --- a/connserver/Cargo.toml +++ b/connserver/Cargo.toml @@ -14,6 +14,7 @@ enc = { path = "../enc" } anyhow = "1.0.98" serde = { version = "1.0.219", features = ["derive"] } toml = "0.8.20" +async-trait = "0.1.88" [profile.release-with-debug] inherits = "release" diff --git a/connserver/src/connection/client.rs b/connserver/src/connection/client.rs index 733faf0..e5ad6aa 100644 --- a/connserver/src/connection/client.rs +++ b/connserver/src/connection/client.rs @@ -5,7 +5,7 @@ use tokio::{ }; use tokio_util::bytes::BytesMut; -use crate::{Either, Msg}; +use crate::connserver::{Either, Msg}; use super::stale::{StaleClient, StaleConnectionManager}; diff --git a/connserver/src/connserver.rs b/connserver/src/connserver.rs new file mode 100644 index 0000000..754adbe --- /dev/null +++ b/connserver/src/connserver.rs @@ -0,0 +1,407 @@ +use std::{ + collections::HashMap, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; + +use connection::{client::Client, stale::StaleConnectionManager}; +use ids::IdGenerator; +use log::{error, info}; +use net::TaggedPacket; +use tokio::{ + net::{TcpListener, TcpStream}, + sync::{mpsc, Mutex}, +}; + +use crate::{config, connection, ids, master::MasterDuplex}; + +#[derive(Clone)] +pub enum Msg { + Stop, + Data(TaggedPacket), +} + +pub struct ConnServer { + config: config::Config, + _marker: std::marker::PhantomData, +} + +impl ConnServer +where + T: MasterDuplex + Send + Sync + 'static, +{ + pub async fn new(config: config::Config) -> Self { + ConnServer { + config, + _marker: std::marker::PhantomData, + } + } + + pub async fn run(&mut self, mut master_duplex: T) -> anyhow::Result<()> { + let acceptor = TcpListener::bind(&self.config.listener.address).await?; + info!("Listening on {}", acceptor.local_addr()?); + + let keys = Arc::new(enc::easy::Keys::new()); + let dead = Arc::new(AtomicBool::new(false)); + + let id_gen = Arc::new(Mutex::new(IdGenerator::new())); + + let (master_send_sender, master_send_receiver) = + mpsc::channel::(self.config.master.channel_capacity); + let (master_recv_sender, master_recv_receiver) = + mpsc::channel::(self.config.master.channel_capacity); + + let (removed_sender, removed_receiver) = mpsc::channel(2); + + let clients = Arc::new(Mutex::new(HashMap::>::new())); + let stale_conn_manager = StaleConnectionManager::new(self.config.clone()); + + tokio::spawn({ + let dead = dead.clone(); + async move { + if let Err(e) = handle_socket_duplex_master( + &mut master_duplex, + master_send_receiver, + master_recv_sender, + removed_receiver, + ) + .await + { + error!("Error handling master duplex: {}", e); + error!("FATAL: Master server is down, exiting to fail hard"); + dead.store(true, Ordering::Relaxed); + } + } + }); + + tokio::spawn(stale_conn_manager.clone().run_cleanup(removed_sender)); + + tokio::spawn({ + let clients = clients.clone(); + let master_send_sender = master_send_sender.clone(); + let stale_conn_manager = stale_conn_manager.clone(); + let config = self.config.clone(); + async move { + if let Err(e) = master_recv_main_loop( + master_recv_receiver, + master_send_sender, + clients, + stale_conn_manager, + config, + ) + .await + { + error!("Error handling master recv: {}", e); + } + } + }); + + while !dead.load(Ordering::Relaxed) { + if let Err(err) = self + .accept_client( + &acceptor, + master_send_sender.clone(), + keys.clone(), + clients.clone(), + stale_conn_manager.clone(), + &id_gen, + ) + .await + { + error!("Failed accepting client: {}", err); + } + } + info!("Stopping server"); + let clients = clients.lock().await; + let mut tmp = clients.values(); + while let Some(client_sender) = tmp.next() { + if let Err(e) = client_sender.send(Msg::Stop).await { + error!("Error sending stop message to client: {}", e); + } + } + if let Err(e) = master_send_sender.send(Msg::Stop).await { + error!("Error sending stop message to master: {}", e); + } + // NOTE(lion): This is a bad situation, and most likely the master server is down. + // We should never reach this point, but if we do, we should exit with an error, so that any sane + // monitoring system can pick it up and restart the server, and hopefully the master-server too. + info!("Assuming this server should never go down, shutting down is considered an error, but something forced this server to shut down (probably an error)"); + Err(anyhow::format_err!("Unexpected server shutdown")) + } + + async fn accept_client( + &self, + acceptor: &TcpListener, + master_send_sender: mpsc::Sender, + keys: Arc, + clients: Arc>>>, + stale_conn_manager: StaleConnectionManager, + id_gen: &Arc>, + ) -> Result<(), Box> { + let (client, _) = match tokio::time::timeout( + std::time::Duration::from_secs(1), + acceptor.accept(), + ) + .await + { + Ok(res) => res?, + Err(_) => return Ok(()), + }; + + let client_id = id_gen.lock().await.next_id(); + info!( + "Accepted client connection id={}, endpoint={:?}", + client_id, + client.peer_addr() + ); + // these are messages from master to client + let (client_sender, client_receiver) = + mpsc::channel::(self.config.clients.channel_capacity); + { + clients + .lock() + .await + .insert(client_id, client_sender.clone()); + } + tokio::spawn({ + let master_send_sender = master_send_sender.clone(); + let keys = keys.clone(); + let clients = clients.clone(); + let id_gen = id_gen.clone(); + let client_sender = client_sender.clone(); + async move { + handle_client( + client, + master_send_sender, + client_receiver, + keys, + clients, + stale_conn_manager, + client_id, + id_gen, + client_sender, + ) + .await + } + }); + Ok(()) + } +} + +async fn handle_socket_duplex_master( + master: &mut impl MasterDuplex, + mut send_receiver: mpsc::Receiver, + recv_sender: mpsc::Sender, + mut removed_receiver: mpsc::Receiver>, +) -> Result<(), Box> { + loop { + tokio::select! { + removed_client_ids = removed_receiver.recv() => { + if let Some(client_ids) = removed_client_ids { + for client_id in client_ids { + let packet = TaggedPacket::Failure { client_id, error: "Client disconnected".to_string() }; + if let Err(e) = master.send_packet(packet).await { + error!("Master: Error sending message for client {}: {}", client_id, e); + return Err(e.into()); + } + } + } else { + error!("Master: Error receiving removed client IDs"); + return Err(anyhow::format_err!("Error receiving removed client IDs").into()); + } + } + packet = master.recv_packet() => { + match packet { + Ok(packet) => { + if let TaggedPacket::Data { client_id, .. } = packet { + if let Err(e) = recv_sender.send(Msg::Data(packet.clone())).await { + panic!("Master: Error sending message for client {}: {}", client_id, e); + } + } else { + error!("Master: Unexpected packet type from client {}: {:?}", packet.client_id(), packet); + continue; + } + } + Err(e) => { + error!("Master: Error receiving packet for client: {}", e); + return Err(e.into()); + } + } + } + Some(msg) = send_receiver.recv() => { + match msg { + Msg::Data(packet) => { + if let Err(e) = master.send_packet(packet.clone()).await { + panic!("Master: Error sending message for client {}: {}", packet.client_id(), e); + } + } + Msg::Stop => { + info!("Stopping master duplex"); + break; + } + } + } + } + } + Ok(()) +} + +pub enum Either { + Left(T), + Right(U), +} + +async fn master_recv_main_loop( + mut master_recv_receiver: mpsc::Receiver, + master_send_sender: mpsc::Sender, + clients: Arc>>>, + stale_conn_manager: StaleConnectionManager, + config: config::Config, +) -> Result<(), Box> { + // take messages from the master_recv_receiver and send them to each client + loop { + if let Some(msg) = master_recv_receiver.recv().await { + match msg { + Msg::Data(packet) => match packet { + TaggedPacket::Data { client_id, .. } => { + let client = { + let clients = clients.lock().await; + clients.get(&client_id).cloned() + }; + match client { + Some(client_sender) => { + if let Err(e) = client_sender.send(Msg::Data(packet.clone())).await + { + error!("Error sending message to client {}: {}", client_id, e); + } + } + None => { + // Client not found, check if it's a stale client + // in that case we want to enqueue the packet if that's configured. + // if not, we fail the client right here. + if config.clients.missed_packets_buffer_size > 0 + && stale_conn_manager.has_client(client_id).await + { + if let Err(e) = stale_conn_manager + .add_missed_packet( + client_id, + packet, + config.clients.missed_packets_buffer_size, + ) + .await + { + error!( + "Error adding missed packet for client {}: {}, this fails the client", + client_id, e + ); + stale_conn_manager.remove_stale_client(client_id).await; + let packet = TaggedPacket::Failure { + client_id, + error: "Client disconnected".to_string(), + }; + if let Err(e) = + master_send_sender.send(Msg::Data(packet)).await + { + error!( + "Error sending failure message to master: {}", + e + ); + } + } + } else { + error!("Client {} not found, missed packet buffering not enabled, failing client", client_id); + stale_conn_manager.remove_stale_client(client_id).await; + let packet = TaggedPacket::Failure { + client_id, + error: "Client not found".to_string(), + }; + if let Err(e) = master_send_sender.send(Msg::Data(packet)).await + { + error!("Error sending failure message to master: {}", e); + } + } + } + } + } + _ => { + error!("Unexpected packet type from master: {:?}", packet); + continue; + } + }, + Msg::Stop => { + info!("Stopping master recv"); + break Ok(()); + } + } + } + } +} + +async fn handle_client( + client: TcpStream, + master_send_sender: mpsc::Sender, + client_receiver: mpsc::Receiver, + keys: Arc, + clients: Arc>>>, + stale_conn_manager: StaleConnectionManager, + mut client_id: u64, + id_gen: Arc>, + send_sender: mpsc::Sender, +) { + let mut client = Client::new(client, client_receiver, master_send_sender, client_id); + let client_id_before = client_id; + let encryption = match client + .try_reconnect(&keys, client_id, &stale_conn_manager) + .await + { + Ok(Either::Left((enc, new_id))) => { + client_id = new_id; + info!("Client {} connected", client_id); + enc + } + Ok(Either::Right(stale_client)) => { + info!("Client {} reconnected", client_id); + for packet in stale_client.missed_packets { + send_sender.send(Msg::Data(packet)).await.unwrap(); + } + stale_client.encryption + } + Err(e) => { + error!("Error handling client {}: {}", client_id, e); + return; + } + }; + if client_id != client_id_before { + info!( + "Client ID changed from {} to {} due to reconnect", + client_id_before, client_id + ); + let mut clients = clients.lock().await; + let client_sender = match clients.remove(&client_id_before) { + Some(sender) => sender, + None => { + error!( + "Client ID {} not found in clients, but must exist", + client_id_before + ); + return; + } + }; + clients.insert(client_id, client_sender); + id_gen.lock().await.release_id(client_id_before); + } + + if let Err(e) = client.run().await { + error!( + "Error handling client duplex for client {}: {}", + client_id, e + ); + stale_conn_manager + .add_stale_client(client_id, encryption, std::time::Instant::now()) + .await; + let mut clients = clients.lock().await; + clients.remove(&client_id); + } +} diff --git a/connserver/src/main.rs b/connserver/src/main.rs index 634e9db..5783423 100644 --- a/connserver/src/main.rs +++ b/connserver/src/main.rs @@ -1,419 +1,12 @@ -use std::{ - collections::HashMap, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, -}; - -use connection::{ - client::Client, - stale::{StaleClient, StaleConnectionManager}, -}; -use ids::IdGenerator; +use connserver::ConnServer; use log::{error, info}; -use net::{ClientServerPacket, TaggedPacket}; -use tokio::{ - net::{ - tcp::{OwnedReadHalf, OwnedWriteHalf}, - TcpListener, TcpStream, - }, - sync::{mpsc, Mutex}, -}; -use tokio_util::codec::{FramedRead, LengthDelimitedCodec}; mod args; mod config; mod connection; +mod connserver; mod ids; - -#[derive(Clone)] -enum Msg { - Stop, - Data(TaggedPacket), -} - -struct ConnServer { - config: config::Config, -} - -impl ConnServer { - async fn new(config: config::Config) -> Self { - ConnServer { config } - } - - async fn run(&mut self) -> anyhow::Result<()> { - let mut master = TcpStream::connect(&self.config.master.address).await?; - net::configure_performance_tcp_socket(&mut master)?; - - let acceptor = TcpListener::bind(&self.config.listener.address).await?; - info!("Listening on {}", acceptor.local_addr()?); - - let keys = Arc::new(enc::easy::Keys::new()); - let dead = Arc::new(AtomicBool::new(false)); - - let id_gen = Arc::new(Mutex::new(IdGenerator::new())); - - let (master_send_sender, master_send_receiver) = - mpsc::channel::(self.config.master.channel_capacity); - let (master_recv_sender, master_recv_receiver) = - mpsc::channel::(self.config.master.channel_capacity); - - let (removed_sender, removed_receiver) = mpsc::channel(2); - - let clients = Arc::new(Mutex::new(HashMap::>::new())); - let stale_conn_manager = StaleConnectionManager::new(self.config.clone()); - - tokio::spawn({ - let dead = dead.clone(); - async move { - if let Err(e) = handle_socket_duplex_master( - master, - master_send_receiver, - master_recv_sender, - removed_receiver, - ) - .await - { - error!("Error handling master duplex: {}", e); - error!("FATAL: Master server is down, exiting to fail hard"); - dead.store(true, Ordering::Relaxed); - } - } - }); - - tokio::spawn(stale_conn_manager.clone().run_cleanup(removed_sender)); - - tokio::spawn({ - let clients = clients.clone(); - let master_send_sender = master_send_sender.clone(); - let stale_conn_manager = stale_conn_manager.clone(); - let config = self.config.clone(); - async move { - if let Err(e) = master_recv_main_loop( - master_recv_receiver, - master_send_sender, - clients, - stale_conn_manager, - config, - ) - .await - { - error!("Error handling master recv: {}", e); - } - } - }); - - while !dead.load(Ordering::Relaxed) { - if let Err(err) = self - .accept_client( - &acceptor, - master_send_sender.clone(), - keys.clone(), - clients.clone(), - stale_conn_manager.clone(), - &id_gen, - ) - .await - { - error!("Failed accepting client: {}", err); - } - } - info!("Stopping server"); - let clients = clients.lock().await; - let mut tmp = clients.values(); - while let Some(client_sender) = tmp.next() { - if let Err(e) = client_sender.send(Msg::Stop).await { - error!("Error sending stop message to client: {}", e); - } - } - if let Err(e) = master_send_sender.send(Msg::Stop).await { - error!("Error sending stop message to master: {}", e); - } - // NOTE(lion): This is a bad situation, and most likely the master server is down. - // We should never reach this point, but if we do, we should exit with an error, so that any sane - // monitoring system can pick it up and restart the server, and hopefully the master-server too. - info!("Assuming this server should never go down, shutting down is considered an error, but something forced this server to shut down (probably an error)"); - Err(anyhow::format_err!("Unexpected server shutdown")) - } - - async fn accept_client( - &self, - acceptor: &TcpListener, - master_send_sender: mpsc::Sender, - keys: Arc, - clients: Arc>>>, - stale_conn_manager: StaleConnectionManager, - id_gen: &Arc>, - ) -> Result<(), Box> { - let (mut client, _) = match tokio::time::timeout( - std::time::Duration::from_secs(1), - acceptor.accept(), - ) - .await - { - Ok(res) => res?, - Err(_) => return Ok(()), - }; - - let client_id = id_gen.lock().await.next_id(); - info!( - "Accepted client connection id={}, endpoint={:?}", - client_id, - client.peer_addr() - ); - // these are messages from master to client - let (client_sender, client_receiver) = - mpsc::channel::(self.config.clients.channel_capacity); - { - clients - .lock() - .await - .insert(client_id, client_sender.clone()); - } - tokio::spawn({ - let master_send_sender = master_send_sender.clone(); - let keys = keys.clone(); - let clients = clients.clone(); - let id_gen = id_gen.clone(); - let client_sender = client_sender.clone(); - async move { - handle_client( - client, - master_send_sender, - client_receiver, - keys, - clients, - stale_conn_manager, - client_id, - id_gen, - client_sender, - ) - .await - } - }); - Ok(()) - } -} - -async fn handle_socket_duplex_master( - socket: TcpStream, - mut send_receiver: mpsc::Receiver, - recv_sender: mpsc::Sender, - mut removed_receiver: mpsc::Receiver>, -) -> Result<(), Box> { - let (read, mut write) = socket.into_split(); - let mut read = net::new_framed_reader(read); - loop { - tokio::select! { - removed_client_ids = removed_receiver.recv() => { - if let Some(client_ids) = removed_client_ids { - for client_id in client_ids { - let packet = TaggedPacket::Failure { client_id, error: "Client disconnected".to_string() }; - if let Err(e) = net::send_tagged_packet(&mut write, packet).await { - error!("Master: Error sending message for client {}: {}", client_id, e); - return Err(e.into()); - } - } - } else { - error!("Master: Error receiving removed client IDs"); - return Err(anyhow::format_err!("Error receiving removed client IDs").into()); - } - } - packet = net::recv_tagged_packet(&mut read) => { - match packet { - Ok(packet) => { - if let TaggedPacket::Data { client_id, .. } = packet { - if let Err(e) = recv_sender.send(Msg::Data(packet.clone())).await { - panic!("Master: Error sending message for client {}: {}", client_id, e); - } - } else { - error!("Master: Unexpected packet type from client {}: {:?}", packet.client_id(), packet); - continue; - } - } - Err(e) => { - error!("Master: Error receiving packet for client: {}", e); - return Err(e.into()); - } - } - } - Some(msg) = send_receiver.recv() => { - match msg { - Msg::Data(packet) => { - if let Err(e) = net::send_tagged_packet(&mut write, packet.clone()).await { - panic!("Master: Error sending message for client {}: {}", packet.client_id(), e); - } - } - Msg::Stop => { - info!("Stopping master duplex"); - break; - } - } - } - } - } - Ok(()) -} - -pub enum Either { - Left(T), - Right(U), -} - - -async fn master_recv_main_loop( - mut master_recv_receiver: mpsc::Receiver, - master_send_sender: mpsc::Sender, - clients: Arc>>>, - stale_conn_manager: StaleConnectionManager, - config: config::Config, -) -> Result<(), Box> { - // take messages from the master_recv_receiver and send them to each client - loop { - if let Some(msg) = master_recv_receiver.recv().await { - match msg { - Msg::Data(packet) => match packet { - TaggedPacket::Data { client_id, .. } => { - let client = { - let clients = clients.lock().await; - clients.get(&client_id).cloned() - }; - match client { - Some(client_sender) => { - if let Err(e) = client_sender.send(Msg::Data(packet.clone())).await - { - error!("Error sending message to client {}: {}", client_id, e); - } - } - None => { - // Client not found, check if it's a stale client - // in that case we want to enqueue the packet if that's configured. - // if not, we fail the client right here. - if config.clients.missed_packets_buffer_size > 0 - && stale_conn_manager.has_client(client_id).await - { - if let Err(e) = stale_conn_manager - .add_missed_packet( - client_id, - packet, - config.clients.missed_packets_buffer_size, - ) - .await - { - error!( - "Error adding missed packet for client {}: {}, this fails the client", - client_id, e - ); - stale_conn_manager.remove_stale_client(client_id).await; - let packet = TaggedPacket::Failure { - client_id, - error: "Client disconnected".to_string(), - }; - if let Err(e) = - master_send_sender.send(Msg::Data(packet)).await - { - error!( - "Error sending failure message to master: {}", - e - ); - } - } - } else { - error!("Client {} not found, missed packet buffering not enabled, failing client", client_id); - stale_conn_manager.remove_stale_client(client_id).await; - let packet = TaggedPacket::Failure { - client_id, - error: "Client not found".to_string(), - }; - if let Err(e) = master_send_sender.send(Msg::Data(packet)).await - { - error!("Error sending failure message to master: {}", e); - } - } - } - } - } - _ => { - error!("Unexpected packet type from master: {:?}", packet); - continue; - } - }, - Msg::Stop => { - info!("Stopping master recv"); - break Ok(()); - } - } - } - } -} - -async fn handle_client( - client: TcpStream, - master_send_sender: mpsc::Sender, - client_receiver: mpsc::Receiver, - keys: Arc, - clients: Arc>>>, - stale_conn_manager: StaleConnectionManager, - mut client_id: u64, - id_gen: Arc>, - send_sender: mpsc::Sender, -) { - let mut client = Client::new(client, client_receiver, master_send_sender, client_id); - let client_id_before = client_id; - let encryption = match client - .try_reconnect(&keys, client_id, &stale_conn_manager) - .await - { - Ok(Either::Left((enc, new_id))) => { - client_id = new_id; - info!("Client {} connected", client_id); - enc - } - Ok(Either::Right(stale_client)) => { - info!("Client {} reconnected", client_id); - for packet in stale_client.missed_packets { - send_sender.send(Msg::Data(packet)).await.unwrap(); - } - stale_client.encryption - } - Err(e) => { - error!("Error handling client {}: {}", client_id, e); - return; - } - }; - if client_id != client_id_before { - info!( - "Client ID changed from {} to {} due to reconnect", - client_id_before, client_id - ); - let mut clients = clients.lock().await; - let client_sender = match clients.remove(&client_id_before) { - Some(sender) => sender, - None => { - error!( - "Client ID {} not found in clients, but must exist", - client_id_before - ); - return; - } - }; - clients.insert(client_id, client_sender); - id_gen.lock().await.release_id(client_id_before); - } - - if let Err(e) = client.run().await { - error!( - "Error handling client duplex for client {}: {}", - client_id, e - ); - stale_conn_manager - .add_stale_client(client_id, encryption, std::time::Instant::now()) - .await; - let mut clients = clients.lock().await; - clients.remove(&client_id); - } -} +mod master; #[tokio::main(flavor = "multi_thread")] async fn main() { @@ -447,8 +40,15 @@ async fn main() { config.master.address = master_addr; } + let master = match master::TcpMasterDuplex::connect(&config.master.address).await { + Ok(master) => master, + Err(e) => { + error!("Error connecting to master: {}", e); + std::process::exit(1); + } + }; let mut server = ConnServer::new(config).await; - if let Err(err) = server.run().await { + if let Err(err) = server.run(master).await { error!("Error: {}", err); std::process::exit(1); } diff --git a/connserver/src/master.rs b/connserver/src/master.rs new file mode 100644 index 0000000..cb3e3fa --- /dev/null +++ b/connserver/src/master.rs @@ -0,0 +1,41 @@ +use net::TaggedPacket; +use tokio::net::{ + tcp::{OwnedReadHalf, OwnedWriteHalf}, + TcpStream, +}; + +#[async_trait::async_trait] +pub trait MasterDuplex: Send + Sync { + /// Send a tagged packet to the master. + async fn send_packet(&mut self, packet: TaggedPacket) -> anyhow::Result<()>; + /// Receive a tagged packet from the master. + async fn recv_packet(&mut self) -> anyhow::Result; +} + +pub struct TcpMasterDuplex { + write: OwnedWriteHalf, + read: net::FramedReader, +} + +impl TcpMasterDuplex { + pub async fn connect(address: &str) -> anyhow::Result { + let mut stream = TcpStream::connect(address).await?; + net::configure_performance_tcp_socket(&mut stream)?; + let (read, write) = stream.into_split(); + Ok(Self { + write: write, + read: net::new_framed_reader(read), + }) + } +} + +#[async_trait::async_trait] +impl MasterDuplex for TcpMasterDuplex { + async fn send_packet(&mut self, packet: TaggedPacket) -> anyhow::Result<()> { + net::send_tagged_packet(&mut self.write, packet).await + } + + async fn recv_packet(&mut self) -> anyhow::Result { + net::recv_tagged_packet(&mut self.read).await + } +} From d4323003f4924c76aada56270290ef7cabb4aa62 Mon Sep 17 00:00:00 2001 From: Lion Kortlepel Date: Wed, 25 Jun 2025 00:47:48 +0200 Subject: [PATCH 3/4] feat: add .env parsing and config --- .dockerignore | 8 +++++++ .gitignore | 1 + Cargo.lock | 7 ++++++ Dockerfile | 23 ++++++++++++++++++ Dockerfile.example_echoserver | 27 +++++++++++++++++++++ connserver/Cargo.toml | 1 + connserver/src/config.rs | 42 +++++++++++++++++++++++++++++++++ connserver/src/main.rs | 5 ++++ docker-compose.yml | 20 ++++++++++++++++ examples/echoserver/src/main.rs | 3 ++- mount/example.env | 10 ++++++++ 11 files changed, 146 insertions(+), 1 deletion(-) create mode 100644 .dockerignore create mode 100644 Dockerfile create mode 100644 Dockerfile.example_echoserver create mode 100644 docker-compose.yml create mode 100644 mount/example.env diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..8831f3c --- /dev/null +++ b/.dockerignore @@ -0,0 +1,8 @@ +/target +/mount +.idea/ +.vscode/ +*.swp +*.swo +.DS_Store +Thumbs.db diff --git a/.gitignore b/.gitignore index 88eaa94..fb92f28 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /target /examples/target **/connserver.toml +.env diff --git a/Cargo.lock b/Cargo.lock index 4f79ea7..16ef566 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -250,6 +250,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "dotenvy", "enc", "env_logger", "futures", @@ -321,6 +322,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" + [[package]] name = "ecdsa" version = "0.16.9" diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..ec2fb51 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,23 @@ +FROM rust:alpine3.22 AS builder + +WORKDIR /app + +RUN apk add musl-dev + +ADD clientcom ./clientcom +ADD clientcom-c ./clientcom-c +ADD connserver ./connserver +ADD enc ./enc +ADD net ./net +ADD Cargo.toml . +ADD Cargo.lock . + +RUN cargo install --path connserver + +FROM alpine:3.22 + +WORKDIR /voltlane + +COPY --from=builder /usr/local/cargo/bin/connserver /usr/local/bin + +CMD ["connserver"] diff --git a/Dockerfile.example_echoserver b/Dockerfile.example_echoserver new file mode 100644 index 0000000..ba842d1 --- /dev/null +++ b/Dockerfile.example_echoserver @@ -0,0 +1,27 @@ +FROM rust:alpine3.22 AS builder + +RUN apk add musl-dev + +WORKDIR /app +ADD clientcom ./clientcom +ADD clientcom-c ./clientcom-c +ADD connserver ./connserver +ADD examples ./examples +ADD enc ./enc +ADD net ./net +ADD Cargo.toml . +ADD Cargo.lock . + +WORKDIR /app/examples/echoserver + +RUN cargo install --path . + +FROM alpine:3.22 + +WORKDIR /voltlane + +COPY --from=builder /usr/local/cargo/bin/echoserver /usr/local/bin + +RUN apk add netcat-openbsd + +CMD ["echoserver"] diff --git a/connserver/Cargo.toml b/connserver/Cargo.toml index 14fbb16..620a025 100644 --- a/connserver/Cargo.toml +++ b/connserver/Cargo.toml @@ -15,6 +15,7 @@ anyhow = "1.0.98" serde = { version = "1.0.219", features = ["derive"] } toml = "0.8.20" async-trait = "0.1.88" +dotenvy = "0.15.7" [profile.release-with-debug] inherits = "release" diff --git a/connserver/src/config.rs b/connserver/src/config.rs index f316be3..568ebdd 100644 --- a/connserver/src/config.rs +++ b/connserver/src/config.rs @@ -105,4 +105,46 @@ impl Config { Ok(config) } } + + pub fn fill_from_env(&mut self) -> anyhow::Result<()> { + if let Ok(log_level) = std::env::var("VOLTLANE_LOG_LEVEL") { + self.general.log_level = Some(match log_level.to_lowercase().as_str() { + "off" => LogLevel::Off, + "error" => LogLevel::Error, + "warn" => LogLevel::Warn, + "info" => LogLevel::Info, + "debug" => LogLevel::Debug, + "trace" => LogLevel::Trace, + _ => return Err(anyhow::anyhow!("Invalid log level: {}", log_level)), + }); + } + if let Ok(address) = std::env::var("VOLTLANE_MASTER_ADDRESS") { + self.master.address = address; + } + if let Ok(address) = std::env::var("VOLTLANE_LISTENER_ADDRESS") { + self.listener.address = address; + } + if let Ok(channel_capacity) = std::env::var("VOLTLANE_MASTER_CHANNEL_CAPACITY") { + self.master.channel_capacity = channel_capacity.parse()?; + } + if let Ok(address) = std::env::var("VOLTLANE_MASTER_ADDRESS") { + self.master.address = address; + } + if let Ok(channel_capacity) = std::env::var("VOLTLANE_CLIENTS_CHANNEL_CAPACITY") { + self.clients.channel_capacity = channel_capacity.parse()?; + } + if let Ok(stale_timeout_secs) = std::env::var("VOLTLANE_CLIENTS_STALE_TIMEOUT_SECS") { + self.clients.stale_timeout_secs = stale_timeout_secs.parse()?; + } + if let Ok(max_stale_clients) = std::env::var("VOLTLANE_CLIENTS_MAX_STALE_CLIENTS") { + self.clients.max_stale_clients = max_stale_clients.parse()?; + } + if let Ok(stale_reap_interval_secs) = std::env::var("VOLTLANE_CLIENTS_STALE_REAP_INTERVAL_SECS") { + self.clients.stale_reap_interval_secs = stale_reap_interval_secs.parse()?; + } + if let Ok(missed_packets_buffer_size) = std::env::var("VOLTLANE_CLIENTS_MISSED_PACKETS_BUFFER_SIZE") { + self.clients.missed_packets_buffer_size = missed_packets_buffer_size.parse()?; + } + Ok(()) + } } diff --git a/connserver/src/main.rs b/connserver/src/main.rs index 5783423..8a3eb9d 100644 --- a/connserver/src/main.rs +++ b/connserver/src/main.rs @@ -10,6 +10,7 @@ mod master; #[tokio::main(flavor = "multi_thread")] async fn main() { + _ = dotenvy::dotenv(); env_logger::builder() .filter_level(log::LevelFilter::max()) .format_timestamp(None) @@ -30,6 +31,10 @@ async fn main() { std::process::exit(1); } }; + if let Err(e) = config.fill_from_env() { + error!("Error filling config from environment: {}", e); + std::process::exit(1); + } if let Some(listen_addr) = args.listen_addr { info!("Overriding listener.address from commandline"); diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..a4f212b --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,20 @@ +services: + voltlane-connserver: + build: . + image: voltlane.net/connserver + ports: + - "127.0.0.1:42000:42000" + volumes: + # add this if you want to supply your own config (it must exist) + - ./mount:/voltlane + restart: unless-stopped + stop_signal: SIGKILL + + #replace this service with your own server application + backend-server: + build: + context: . + dockerfile: Dockerfile.example_echoserver + image: voltlane.net/echoserver + restart: unless-stopped + stop_signal: SIGKILL diff --git a/examples/echoserver/src/main.rs b/examples/echoserver/src/main.rs index 76f7312..a1431ed 100644 --- a/examples/echoserver/src/main.rs +++ b/examples/echoserver/src/main.rs @@ -4,7 +4,8 @@ use tokio::net::TcpListener; #[tokio::main] async fn main() -> anyhow::Result<()> { - let listener = TcpListener::bind("127.0.0.1:42001").await?; + let listener = TcpListener::bind("0.0.0.0:42001").await?; + println!("Server listening on {}", listener.local_addr()?); loop { let (mut socket, _) = listener.accept().await?; net::configure_performance_tcp_socket(&mut socket)?; diff --git a/mount/example.env b/mount/example.env new file mode 100644 index 0000000..2fd59ff --- /dev/null +++ b/mount/example.env @@ -0,0 +1,10 @@ +#VOLTLANE_LOG_LEVEL= +#VOLTLANE_MASTER_ADDRESS= +#VOLTLANE_LISTENER_ADDRESS= +#VOLTLANE_MASTER_CHANNEL_CAPACITY= +#VOLTLANE_MASTER_ADDRESS= +#VOLTLANE_CLIENTS_CHANNEL_CAPACITY= +#VOLTLANE_CLIENTS_STALE_TIMEOUT_SECS= +#VOLTLANE_CLIENTS_MAX_STALE_CLIENTS= +#VOLTLANE_CLIENTS_STALE_REAP_INTERVAL_SECS= +#VOLTLANE_CLIENTS_MISSED_PACKETS_BUFFER_SIZE= From 4668d177cd59ef86d4b3d53041e488abbef4645c Mon Sep 17 00:00:00 2001 From: Lion Kortlepel Date: Wed, 25 Jun 2025 00:50:06 +0200 Subject: [PATCH 4/4] fix: specify and read VOLTLANE_MASTER_ADDRESS only once oops! --- connserver/src/config.rs | 3 --- mount/example.env | 1 - 2 files changed, 4 deletions(-) diff --git a/connserver/src/config.rs b/connserver/src/config.rs index 568ebdd..9c3e735 100644 --- a/connserver/src/config.rs +++ b/connserver/src/config.rs @@ -127,9 +127,6 @@ impl Config { if let Ok(channel_capacity) = std::env::var("VOLTLANE_MASTER_CHANNEL_CAPACITY") { self.master.channel_capacity = channel_capacity.parse()?; } - if let Ok(address) = std::env::var("VOLTLANE_MASTER_ADDRESS") { - self.master.address = address; - } if let Ok(channel_capacity) = std::env::var("VOLTLANE_CLIENTS_CHANNEL_CAPACITY") { self.clients.channel_capacity = channel_capacity.parse()?; } diff --git a/mount/example.env b/mount/example.env index 2fd59ff..6672605 100644 --- a/mount/example.env +++ b/mount/example.env @@ -2,7 +2,6 @@ #VOLTLANE_MASTER_ADDRESS= #VOLTLANE_LISTENER_ADDRESS= #VOLTLANE_MASTER_CHANNEL_CAPACITY= -#VOLTLANE_MASTER_ADDRESS= #VOLTLANE_CLIENTS_CHANNEL_CAPACITY= #VOLTLANE_CLIENTS_STALE_TIMEOUT_SECS= #VOLTLANE_CLIENTS_MAX_STALE_CLIENTS=