From e908971d8570685dec5045ba4fb38b058fe6971f Mon Sep 17 00:00:00 2001 From: Jeremy HERGAULT Date: Sun, 15 Mar 2026 17:13:14 +0100 Subject: [PATCH 1/3] fix: improve Rust quality and expose more things through Adaptors Signed-off-by: Jeremy HERGAULT --- Cargo.toml | 1 + examples/server.rs | 59 ++++++++++++++------------- src/client/adaptor.rs | 19 ++++++--- src/lib.rs | 40 ++++++++++++++++++- src/server.rs | 28 ++++++------- src/server/adaptor.rs | 93 +++++++++++++++++++++++++++++++++---------- src/server/proc.rs | 2 +- src/server/service.rs | 60 ++++++++-------------------- src/tests.rs | 13 +++--- 9 files changed, 194 insertions(+), 121 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ebfbe20..ad410c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,7 @@ url = { version = "2", features = ["serde"] } opentelemetry = { version = "0.31", features = ["metrics", "trace", "logs"] } hyper = { version = "1", features = ["full"] } +http = "1" http-body-util = "0.1" hyper-util = { version = "0.1", features = ["full"] } diff --git a/examples/server.rs b/examples/server.rs index 466a38f..0a0878f 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -16,9 +16,9 @@ use prosa::core::settings::settings; use prosa::stub::adaptor::StubParotAdaptor; use prosa::stub::proc::StubSettings; use prosa::{core::main::MainProc, stub::proc::StubProc}; -use prosa_hyper::HyperResp; use prosa_hyper::server::adaptor::HyperServerAdaptor; use prosa_hyper::server::proc::{HyperServerProc, HyperServerSettings}; +use prosa_hyper::{HttpError, HyperResp}; use prosa_utils::config::tracing::TelemetryFilter; use prosa_utils::msg::simple_string_tvf::SimpleStringTvf; use serde::{Deserialize, Serialize}; @@ -52,43 +52,42 @@ where req: Request, ) -> crate::HyperResp { match req.uri().path() { - "/" => HyperResp::HttpResp( - Response::builder() - .header( - "Server", - >::SERVER_HEADER, - ) - .body(BoxBody::new(Full::new(Bytes::from(format!( - "{} - Home of {}", - if req.version() == hyper::Version::HTTP_2 { - "H2" - } else { - "HTTP/1.1" - }, - self.prosa_name, - ))))) - .unwrap(), - ), + "/" => Response::builder() + .header( + "Server", + >::SERVER_HEADER, + ) + .body(BoxBody::new(Full::new(Bytes::from(format!( + "{} - Home of {}", + if req.version() == hyper::Version::HTTP_2 { + "H2" + } else { + "HTTP/1.1" + }, + self.prosa_name, + ))))) + .into(), "/test" => { let mut tvf_req = M::default(); tvf_req.put_string(1, req.method().to_string()); tvf_req.put_string(2, "/test"); HyperResp::SrvReq(String::from("SRV_TEST"), tvf_req) } - _ => HyperResp::HttpResp( - Response::builder() - .status(404) - .header( - "Server", - >::SERVER_HEADER, - ) - .body(BoxBody::new(Full::new(Bytes::from("Not Found")))) - .unwrap(), - ), + _ => Response::builder() + .status(404) + .header( + "Server", + >::SERVER_HEADER, + ) + .body(BoxBody::new(Full::new(Bytes::from("Not Found")))) + .into(), } } - fn process_srv_response(&self, resp: M) -> Response> { + fn process_srv_response( + &self, + resp: M, + ) -> Result>, HttpError> { let body = resp .get_string(10) .unwrap_or(Cow::Owned(String::from("empty body"))); @@ -96,7 +95,7 @@ where .body(BoxBody::new(Full::new(Bytes::from(format!( "Body: {body}\nTvfResp: {resp:?}" ))))) - .unwrap() + .map_err(|e| e.into()) } } diff --git a/src/client/adaptor.rs b/src/client/adaptor.rs index 1b2c9bb..9a38d05 100644 --- a/src/client/adaptor.rs +++ b/src/client/adaptor.rs @@ -32,13 +32,22 @@ where { /// User-Agent header value sent by the client #[cfg(target_family = "unix")] - const USER_AGENT_HEADER: &'static str = - concat!("ProSA-Hyper/", env!("CARGO_PKG_VERSION"), " (Unix)"); + const USER_AGENT_HEADER: &'static str = concat!( + env!("CARGO_PKG_NAME"), + "/", + env!("CARGO_PKG_VERSION"), + " (Unix)" + ); #[cfg(target_family = "windows")] - const USER_AGENT_HEADER: &'static str = - concat!("ProSA-Hyper/", env!("CARGO_PKG_VERSION"), " (Windows)"); + const USER_AGENT_HEADER: &'static str = concat!( + env!("CARGO_PKG_NAME"), + "/", + env!("CARGO_PKG_VERSION"), + " (Windows)" + ); #[cfg(all(not(target_family = "unix"), not(target_family = "windows")))] - const USER_AGENT_HEADER: &'static str = concat!("ProSA-Hyper/", env!("CARGO_PKG_VERSION")); + const USER_AGENT_HEADER: &'static str = + concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")); /// Create a new adaptor fn new(proc: &HyperClientProc) -> Result> diff --git a/src/lib.rs b/src/lib.rs index 851d354..038d88e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -70,6 +70,17 @@ where } } +/// Error related to an HTTP message or Hyper error +#[derive(Debug, Error)] +pub enum HttpError { + /// Hyper Error + #[error("Hyper error: `{0}`")] + Hyper(#[from] hyper::Error), + /// HTTP Error + #[error("HTTP error: `{0}`")] + Http(#[from] http::Error), +} + /// Method to get a string version of the Hyper Version object fn hyper_version_str(version: Version) -> &'static str { match version { @@ -88,7 +99,34 @@ pub enum HyperResp { /// Make a direct HTTP response HttpResp(Response>), /// Response with an HTTP error - HttpErr(hyper::Error), + HttpErr(HttpError), +} + +impl From for HyperResp { + fn from(err: HttpError) -> Self { + Self::HttpErr(err) + } +} + +impl From for HyperResp { + fn from(err: hyper::Error) -> Self { + Self::HttpErr(HttpError::Hyper(err)) + } +} + +impl From for HyperResp { + fn from(err: http::Error) -> Self { + Self::HttpErr(HttpError::Http(err)) + } +} + +impl From>, http::Error>> for HyperResp { + fn from(res: Result>, http::Error>) -> Self { + match res { + Ok(response) => Self::HttpResp(response), + Err(err) => Self::HttpErr(HttpError::Http(err)), + } + } } #[cfg(feature = "server")] diff --git a/src/server.rs b/src/server.rs index b3543bf..b576785 100644 --- a/src/server.rs +++ b/src/server.rs @@ -11,7 +11,7 @@ pub(crate) mod service; mod tests { use bytes::Bytes; use http_body_util::{Full, combinators::BoxBody}; - use hyper::{Request, Response, StatusCode}; + use hyper::{Request, StatusCode}; use prosa::core::{ adaptor::Adaptor, error::ProcError, @@ -33,7 +33,7 @@ mod tests { use url::Url; use crate::{ - HyperResp, + HttpError, HyperResp, server::{adaptor::HyperServerAdaptor, proc::HyperServerProc}, tests::HttpTestSettings, }; @@ -71,23 +71,19 @@ mod tests { } else { "Hello, world" }; - let response = Response::builder() - .header( - hyper::header::SERVER, - >::SERVER_HEADER, - ) - .status(StatusCode::OK) + >::response_builder(self, StatusCode::OK) .body(BoxBody::new(Full::new(Bytes::from(resp_msg)))) - .unwrap(); - - HyperResp::HttpResp(response) + .into() } fn process_srv_response( &self, _resp: M, - ) -> hyper::Response< - http_body_util::combinators::BoxBody, + ) -> Result< + hyper::Response< + http_body_util::combinators::BoxBody, + >, + HttpError, > { unimplemented!() } @@ -133,7 +129,11 @@ mod tests { .expect("Failed to send request"); assert_eq!(resp.status(), StatusCode::OK); let server_header = resp.headers().get(hyper::header::SERVER).unwrap(); - assert!(server_header.to_str().unwrap().starts_with("ProSA-Hyper/")); + assert!(server_header.to_str().unwrap().starts_with(concat!( + env!("CARGO_PKG_NAME"), + "/", + env!("CARGO_PKG_VERSION") + ))); } bus.stop("ProSA HTTP client server unit test end".into()) diff --git a/src/server/adaptor.rs b/src/server/adaptor.rs index 25b28b9..d9a23e7 100644 --- a/src/server/adaptor.rs +++ b/src/server/adaptor.rs @@ -2,11 +2,12 @@ use std::convert::Infallible; use bytes::Bytes; -use http_body_util::{Full, combinators::BoxBody}; -use hyper::{Request, Response}; -use prosa::core::{adaptor::Adaptor, error::ProcError, proc::ProcBusParam as _}; +use http::response; +use http_body_util::{Empty, Full, combinators::BoxBody}; +use hyper::{Request, Response, StatusCode}; +use prosa::core::{adaptor::Adaptor, error::ProcError, msg::ErrorMsg, proc::ProcBusParam as _}; -use crate::HyperResp; +use crate::{HttpError, HyperResp}; use super::proc::HyperServerProc; @@ -34,19 +35,40 @@ where { /// Server header value send by the server #[cfg(target_family = "unix")] - const SERVER_HEADER: &'static str = - concat!("ProSA-Hyper/", env!("CARGO_PKG_VERSION"), " (Unix)"); + const SERVER_HEADER: &'static str = concat!( + env!("CARGO_PKG_NAME"), + "/", + env!("CARGO_PKG_VERSION"), + " (Unix)" + ); #[cfg(target_family = "windows")] - const SERVER_HEADER: &'static str = - concat!("ProSA-Hyper/", env!("CARGO_PKG_VERSION"), " (Windows)"); + const SERVER_HEADER: &'static str = concat!( + env!("CARGO_PKG_NAME"), + "/", + env!("CARGO_PKG_VERSION"), + " (Windows)" + ); #[cfg(all(not(target_family = "unix"), not(target_family = "windows")))] - const SERVER_HEADER: &'static str = concat!("ProSA-Hyper/", env!("CARGO_PKG_VERSION")); + const SERVER_HEADER: &'static str = + concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")); /// Create a new adaptor fn new(proc: &HyperServerProc) -> Result> where Self: Sized; + /// Initiate a response builder with specific status code and headers from adaptor. + /// The response with its header generated will be used by the processor in case of error + fn response_builder(&self, status_code: T) -> response::Builder + where + T: TryInto, + >::Error: Into, + { + Response::builder() + .status(status_code) + .header(hyper::header::SERVER, Self::SERVER_HEADER) + } + /// Method to process input HTTP requests. Received by the ProSA through Hyper fn process_http_request( &self, @@ -54,7 +76,35 @@ where ) -> impl std::future::Future> + Send; /// Method to process input response to respond with an Hyper HTTP response. - fn process_srv_response(&self, resp: M) -> Response>; + fn process_srv_response( + &self, + resp: M, + ) -> Result>, HttpError>; + + /// Method to process input service error to respond with an Hyper HTTP response. + fn process_srv_error( + &self, + err: ErrorMsg, + ) -> Result>, HttpError> { + match err.get_err() { + prosa::core::service::ServiceError::NoError(_) => self + .response_builder(StatusCode::ACCEPTED) + .body(BoxBody::new(Empty::::new())) + .map_err(|e| e.into()), + prosa::core::service::ServiceError::UnableToReachService(_) => self + .response_builder(StatusCode::SERVICE_UNAVAILABLE) + .body(BoxBody::new(Full::new(Bytes::from("Can't reach service")))) + .map_err(|e| e.into()), + prosa::core::service::ServiceError::Timeout(_, _) => self + .response_builder(StatusCode::GATEWAY_TIMEOUT) + .body(BoxBody::new(Empty::::new())) + .map_err(|e| e.into()), + prosa::core::service::ServiceError::ProtocolError(_) => self + .response_builder(StatusCode::BAD_GATEWAY) + .body(BoxBody::new(Empty::::new())) + .map_err(|e| e.into()), + } + } } /// Hello adaptor for the Hyper server processor. Use to respond to a request with a simple hello message @@ -81,19 +131,20 @@ where } async fn process_http_request(&self, _req: Request) -> HyperResp { - HyperResp::::HttpResp( - Response::builder() - .status(200) - .header( - "Server", - >::SERVER_HEADER, - ) - .body(BoxBody::new(Full::new(Bytes::from(self.hello_msg.clone())))) - .unwrap(), - ) + Response::builder() + .status(200) + .header( + "Server", + >::SERVER_HEADER, + ) + .body(BoxBody::new(Full::new(Bytes::from(self.hello_msg.clone())))) + .into() } - fn process_srv_response(&self, _resp: M) -> Response> { + fn process_srv_response( + &self, + _resp: M, + ) -> Result>, HttpError> { panic!("No message should be send to an external service") } } diff --git a/src/server/proc.rs b/src/server/proc.rs index e3da4db..8b034a2 100644 --- a/src/server/proc.rs +++ b/src/server/proc.rs @@ -164,7 +164,7 @@ where { let request = RequestMsg::new(http_msg.get_service().clone(), http_msg_data, self.proc.get_service_queue().clone()); let request_id = request.get_id(); - service.proc_queue.send(InternalMsg::Request(request)).await.unwrap(); + service.proc_queue.send(InternalMsg::Request(request)).await?; pending_req.push_with_id(request_id, http_msg, self.settings.service_timeout); } else { warn!( diff --git a/src/server/service.rs b/src/server/service.rs index 7702de8..daaaaf7 100644 --- a/src/server/service.rs +++ b/src/server/service.rs @@ -6,6 +6,7 @@ use std::pin::Pin; use std::sync::Arc; use bytes::Bytes; +use http::StatusCode; use http_body_util::combinators::BoxBody; use http_body_util::{Empty, Full}; use hyper::service::Service; @@ -15,7 +16,7 @@ use opentelemetry::metrics::Counter; use prosa::core::msg::{InternalMsg, Msg, RequestMsg}; use tokio::sync::{mpsc, oneshot}; -use crate::hyper_version_str; +use crate::{HttpError, hyper_version_str}; use super::adaptor::HyperServerAdaptor; @@ -68,7 +69,7 @@ where proc_queue: mpsc::Sender>, req: Request, metric_counter: Counter, - ) -> Result>, hyper::Error> { + ) -> Result>, HttpError> { match adaptor.process_http_request(req).await { crate::HyperResp::SrvReq(srv_name, req) => { let resp = @@ -108,7 +109,7 @@ where proc_queue: mpsc::Sender>, service_name: String, request: M, - ) -> Result>, hyper::Error> { + ) -> Result>, HttpError> { let (resp_tx, resp_rx) = oneshot::channel::>(); let _ = proc_queue .send(RequestMsg::new(service_name, request, resp_tx)) @@ -118,52 +119,23 @@ where Ok(msg) => match msg { InternalMsg::Response(mut msg) => { if let Some(data) = msg.take_data() { - Ok(adaptor.process_srv_response(data)) + adaptor.process_srv_response(data) } else { - Ok(Response::builder() - .status(500) - .header("Server", A::SERVER_HEADER) - .body(BoxBody::new(Full::new(Bytes::from("Missing data")))) - .unwrap()) + Ok(adaptor + .response_builder(StatusCode::INTERNAL_SERVER_ERROR) + .body(BoxBody::new(Empty::::new()))?) } } - InternalMsg::Error(err) => match err.get_err() { - prosa::core::service::ServiceError::NoError(_) => Ok(Response::builder() - .status(202) - .header("Server", A::SERVER_HEADER) - .body(BoxBody::new(Empty::::new())) - .unwrap()), - prosa::core::service::ServiceError::UnableToReachService(_) => { - Ok(Response::builder() - .status(503) - .header("Server", A::SERVER_HEADER) - .body(BoxBody::new(Full::new(Bytes::from("Can't reach service")))) - .unwrap()) - } - prosa::core::service::ServiceError::Timeout(_, _) => Ok(Response::builder() - .status(504) - .header("Server", A::SERVER_HEADER) - .body(BoxBody::new(Empty::::new())) - .unwrap()), - prosa::core::service::ServiceError::ProtocolError(_) => Ok(Response::builder() - .status(502) - .header("Server", A::SERVER_HEADER) - .body(BoxBody::new(Empty::::new())) - .unwrap()), - }, - _ => Ok(Response::builder() - .status(500) - .header("Server", A::SERVER_HEADER) - .body(BoxBody::new(Full::new(Bytes::from("Server error")))) - .unwrap()), + InternalMsg::Error(err) => adaptor.process_srv_error(err), + _ => Ok(adaptor + .response_builder(StatusCode::INTERNAL_SERVER_ERROR) + .body(BoxBody::new(Empty::::new()))?), }, - Err(_) => Ok(Response::builder() - .status(503) - .header("Server", A::SERVER_HEADER) + Err(_) => Ok(adaptor + .response_builder(StatusCode::SERVICE_UNAVAILABLE) .body(BoxBody::new(Full::new(Bytes::from( "Can't handle your request for now", - )))) - .unwrap()), + ))))?), } } } @@ -181,7 +153,7 @@ where + std::marker::Sync, { type Response = Response>; - type Error = hyper::Error; + type Error = HttpError; type Future = Pin> + Send>>; fn call(&self, req: Request) -> Self::Future { diff --git a/src/tests.rs b/src/tests.rs index 2b9fff0..b2aff76 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -157,7 +157,7 @@ mod tests { use url::Url; use crate::{ - HyperResp, + HttpError, HyperResp, client::{adaptor::HyperClientAdaptor, proc::HyperClientProc}, server::{adaptor::HyperServerAdaptor, proc::HyperServerProc}, tests::HttpTestSettings, @@ -271,8 +271,11 @@ mod tests { fn process_srv_response( &self, resp: M, - ) -> hyper::Response< - http_body_util::combinators::BoxBody, + ) -> Result< + hyper::Response< + http_body_util::combinators::BoxBody, + >, + HttpError, > { if let Ok(content) = resp.get_string(1) { Response::builder() @@ -284,7 +287,7 @@ mod tests { .body(BoxBody::new(Full::new(Bytes::from_owner( content.into_owned(), )))) - .unwrap() + .map_err(|e| e.into()) } else { Response::builder() .header( @@ -293,7 +296,7 @@ mod tests { ) .status(StatusCode::BAD_REQUEST) .body(BoxBody::new(Full::new(Bytes::from("Bad Request")))) - .unwrap() + .map_err(|e| e.into()) } } } From 9a120a5eb339a2ec1dda11cf0824c64979c36cec Mon Sep 17 00:00:00 2001 From: Jeremy HERGAULT Date: Tue, 17 Mar 2026 10:19:20 +0100 Subject: [PATCH 2/3] feat: simplify according to code review Signed-off-by: Jeremy HERGAULT --- Cargo.toml | 2 +- examples/client.rs | 3 +- examples/server.rs | 6 +- src/client/adaptor.rs | 19 ---- src/client/socket.rs | 230 ++++++++++++++++++++++++++---------------- src/lib.rs | 19 ++++ src/server/adaptor.rs | 25 +---- src/tests.rs | 11 +- 8 files changed, 176 insertions(+), 139 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ad410c2..24b2ddb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "prosa-hyper" -version = "0.3.1" +version = "0.3.0" authors = ["Jérémy HERGAULT ", "Anthony THOMAS ", "Julien TERUEL ", "Rene-Louis EYMARD "] description = "ProSA Hyper processor for HTTP client/server" homepage = "https://worldline.com/" diff --git a/examples/client.rs b/examples/client.rs index c9c49b6..f77f2a1 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -16,6 +16,7 @@ use prosa::core::service::ServiceError; use prosa::core::settings::settings; use prosa::inj::adaptor::InjAdaptor; use prosa::inj::proc::{InjProc, InjSettings}; +use prosa_hyper::PRODUCT_VERSION_HEADER; use prosa_hyper::client::adaptor::HyperClientAdaptor; use prosa_hyper::client::proc::{HyperClientProc, HyperClientSettings}; use prosa_utils::config::tracing::TelemetryFilter; @@ -67,7 +68,7 @@ where .uri(uri.as_str()) .header( "User-Agent", - >::USER_AGENT_HEADER, + PRODUCT_VERSION_HEADER, ) .body(BoxBody::new(Empty::::new())) .map_err(|e| ServiceError::ProtocolError(format!("Failed to build request: {}", e))) diff --git a/examples/server.rs b/examples/server.rs index 0a0878f..74e8dca 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -18,7 +18,7 @@ use prosa::stub::proc::StubSettings; use prosa::{core::main::MainProc, stub::proc::StubProc}; use prosa_hyper::server::adaptor::HyperServerAdaptor; use prosa_hyper::server::proc::{HyperServerProc, HyperServerSettings}; -use prosa_hyper::{HttpError, HyperResp}; +use prosa_hyper::{HttpError, HyperResp, PRODUCT_VERSION_HEADER}; use prosa_utils::config::tracing::TelemetryFilter; use prosa_utils::msg::simple_string_tvf::SimpleStringTvf; use serde::{Deserialize, Serialize}; @@ -55,7 +55,7 @@ where "/" => Response::builder() .header( "Server", - >::SERVER_HEADER, + PRODUCT_VERSION_HEADER, ) .body(BoxBody::new(Full::new(Bytes::from(format!( "{} - Home of {}", @@ -77,7 +77,7 @@ where .status(404) .header( "Server", - >::SERVER_HEADER, + PRODUCT_VERSION_HEADER, ) .body(BoxBody::new(Full::new(Bytes::from("Not Found")))) .into(), diff --git a/src/client/adaptor.rs b/src/client/adaptor.rs index 9a38d05..887f857 100644 --- a/src/client/adaptor.rs +++ b/src/client/adaptor.rs @@ -30,25 +30,6 @@ where + prosa::core::msg::Tvf + std::default::Default, { - /// User-Agent header value sent by the client - #[cfg(target_family = "unix")] - const USER_AGENT_HEADER: &'static str = concat!( - env!("CARGO_PKG_NAME"), - "/", - env!("CARGO_PKG_VERSION"), - " (Unix)" - ); - #[cfg(target_family = "windows")] - const USER_AGENT_HEADER: &'static str = concat!( - env!("CARGO_PKG_NAME"), - "/", - env!("CARGO_PKG_VERSION"), - " (Windows)" - ); - #[cfg(all(not(target_family = "unix"), not(target_family = "windows")))] - const USER_AGENT_HEADER: &'static str = - concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")); - /// Create a new adaptor fn new(proc: &HyperClientProc) -> Result> where diff --git a/src/client/socket.rs b/src/client/socket.rs index 369738a..ebffb4c 100644 --- a/src/client/socket.rs +++ b/src/client/socket.rs @@ -83,6 +83,97 @@ impl HyperClientSocket { } } + /// Helper to setup socket queue and service registration + async fn setup_socket_queue( + proc: &Arc>, + socket_id: u32, + service_name: &str, + ) -> Result>, HyperProcError> + where + M: 'static + + std::marker::Send + + std::marker::Sync + + std::marker::Sized + + std::clone::Clone + + std::fmt::Debug + + prosa::core::msg::Tvf + + std::default::Default, + { + let (tx_queue, rx_queue) = tokio::sync::mpsc::channel(2048); + proc.add_proc_queue(tx_queue, socket_id).await?; + proc.add_service(vec![service_name.to_string()], socket_id) + .await?; + Ok(rx_queue) + } + + /// Helper to process a service request into an HTTP request + fn process_request( + adaptor: &Arc, + mut msg: RequestMsg, + target_url: &url::Url, + service_name: &str, + ) -> Option<(RequestMsg, Request>)> + where + M: 'static + + std::marker::Send + + std::marker::Sync + + std::marker::Sized + + std::clone::Clone + + std::fmt::Debug + + prosa::core::msg::Tvf + + std::default::Default, + A: 'static + HyperClientAdaptor + std::marker::Send + std::marker::Sync, + { + if let Some(data) = msg.take_data() { + match adaptor.process_srv_request(data, target_url) { + Ok(http_request) => Some((msg, http_request)), + Err(e) => { + let _ = msg.return_error_to_sender(None, e); + None + } + } + } else { + let _ = msg.return_error_to_sender( + None, + ServiceError::UnableToReachService(service_name.to_string()), + ); + None + } + } + + /// Helper to handle handshake timeout errors + fn handle_handshake_timeout( + socket_id: i32, + target_addr: &str, + timeout_ms: u64, + protocol: &str, + ) -> HyperProcError { + warn!( + socket_id = socket_id, + addr = target_addr, + "{protocol} handshake timeout after {timeout_ms} ms" + ); + HyperProcError::Io(io::Error::new( + io::ErrorKind::TimedOut, + format!("{protocol} handshake timeout after {timeout_ms} ms"), + )) + } + + /// Helper to handle handshake errors + fn handle_handshake_error( + socket_id: i32, + target_addr: &str, + error: hyper::Error, + protocol: &str, + ) -> HyperProcError { + warn!( + socket_id = socket_id, + addr = target_addr, + "{protocol} handshake error: {error}" + ); + HyperProcError::Hyper(error, target_addr.to_string()) + } + /// Method to spawn a task that handle the Hyper client socket with HTTP/1.1 async fn spawn_http1( self, @@ -117,16 +208,14 @@ impl HyperClientSocket { addr = target_addr, "Connected to HTTP1 remote" ); - let (tx_queue, mut rx_queue) = tokio::sync::mpsc::channel(2048); - proc.add_proc_queue(tx_queue, socket_id as u32).await?; + let mut rx_queue = + Self::setup_socket_queue(&proc, socket_id as u32, &service_name).await?; debug!( socket_id = socket_id, addr = target_addr, "HTTP client expose service name: {}", service_name ); - proc.add_service(vec![service_name.clone()], socket_id as u32) - .await?; #[allow(clippy::type_complexity)] let mut msg_to_send: Option<( RequestMsg, @@ -186,33 +275,25 @@ impl HyperClientSocket { Some(msg) = rx_queue.recv() => { debug!(socket_id = socket_id, addr = target_addr, "HTTP client receive a message to send: {:?}", msg); match msg { - InternalMsg::Request(mut msg) => { - if let Some(data) = msg.take_data() { - match adaptor.process_srv_request(data, &self.target.url) { - Ok(http_request) => { - msg_to_send = Some((msg, http_request)); - req_instant = Instant::now(); - }, - Err(e) => { - let _ = msg.return_error_to_sender(None, e); - }, - } - } else { - let _ = msg.return_error_to_sender(None, ServiceError::UnableToReachService(service_name.clone())); + InternalMsg::Request(req_msg) => { + if let Some(result) = Self::process_request(&adaptor, req_msg, &self.target.url, &service_name) { + msg_to_send = Some(result); + req_instant = Instant::now(); } }, InternalMsg::Response(msg) => panic!( - "The hyper client socket {}/{socket_id} receive a response {:?}", + "The HTTP1 hyper client socket {}/{socket_id} receive a response {:?}", proc.get_proc_id(), msg ), InternalMsg::Error(err_msg) => panic!( - "The hyper client socket {}/{socket_id} receive an error {:?}", + "The HTTP1 hyper client socket {}/{socket_id} receive an error {:?}", proc.get_proc_id(), err_msg ), - InternalMsg::Command(_) => todo!(), - InternalMsg::Config => todo!(), + InternalMsg::Command(_) | InternalMsg::Config => { + // TODO: Implement Command/Config handling or document as unsupported + }, InternalMsg::Service(_table) => {/* Will not use service table */}, InternalMsg::Shutdown => { // Remove the socket queue and wait message to finish @@ -224,30 +305,18 @@ impl HyperClientSocket { } } } - Ok(Err(e)) => { - warn!( - socket_id = socket_id, - addr = target_addr, - "HTTP1 handshake error: {}", - e - ); - Err(HyperProcError::Hyper(e, target_addr)) - } - Err(_) => { - warn!( - socket_id = socket_id, - addr = target_addr, - "HTTP1 handshake timeout after {} ms", - self.target.connect_timeout - ); - Err(HyperProcError::Io(io::Error::new( - io::ErrorKind::TimedOut, - format!( - "HTTP1 handshake timeout after {} ms", - self.target.connect_timeout - ), - ))) - } + Ok(Err(e)) => Err(Self::handle_handshake_error( + socket_id, + &target_addr, + e, + "HTTP1", + )), + Err(_) => Err(Self::handle_handshake_timeout( + socket_id, + &target_addr, + self.target.connect_timeout, + "HTTP1", + )), } } @@ -286,10 +355,8 @@ impl HyperClientSocket { addr = target_addr, "Connected to HTTP2 remote" ); - let (tx_queue, mut rx_queue) = tokio::sync::mpsc::channel(2048); - proc.add_proc_queue(tx_queue, socket_id as u32).await?; - proc.add_service(vec![service_name.clone()], socket_id as u32) - .await?; + let mut rx_queue = + Self::setup_socket_queue(&proc, socket_id as u32, &service_name).await?; loop { tokio::select! { @@ -301,18 +368,20 @@ impl HyperClientSocket { // Receive a message to send from the queue Some(msg) = rx_queue.recv() => { match msg { - InternalMsg::Request(mut msg) => { - if let Some(data) = msg.take_data() { + InternalMsg::Request(mut req_msg) => { + if let Some(data) = req_msg.take_data() { let req_instant = Instant::now(); let mut sender = sender.clone(); let adaptor = adaptor.clone(); let target_url = self.target.url.clone(); - let service_name = service_name.clone(); let message_histogram = message_histogram.clone(); + let http_timeout = self.http_timeout; + let service_name_clone = service_name.clone(); + tokio::spawn(async move { match adaptor.process_srv_request(data, &target_url) { Ok(http_request) => { - match timeout(self.http_timeout, sender.send_request(http_request)).await { + match timeout(http_timeout, sender.send_request(http_request)).await { Ok(http_response) => { let (code, version) = http_response.as_ref().map_or((500, "HTTP/2"), |r| (r.status().as_u16() as i64, hyper_version_str(r.version()))); message_histogram.record( @@ -326,35 +395,36 @@ impl HyperClientSocket { match adaptor.process_http_response(http_response).await { Ok(response) => { - let _ = msg.return_to_sender(response); + let _ = req_msg.return_to_sender(response); }, - Err(e) => { let _ = msg.return_error_to_sender(None, e); }, + Err(e) => { let _ = req_msg.return_error_to_sender(None, e); }, } }, - Err(_) => { let _ = msg.return_error_to_sender(None, ServiceError::Timeout(service_name.clone(), self.http_timeout.as_millis() as u64)); }, + Err(_) => { let _ = req_msg.return_error_to_sender(None, ServiceError::Timeout(service_name_clone, http_timeout.as_millis() as u64)); }, }; }, Err(e) => { - let _ = msg.return_error_to_sender(None, e); + let _ = req_msg.return_error_to_sender(None, e); }, } }); } else { - let _ = msg.return_error_to_sender(None, ServiceError::UnableToReachService(service_name.clone())); + let _ = req_msg.return_error_to_sender(None, ServiceError::UnableToReachService(service_name.clone())); } }, InternalMsg::Response(msg) => panic!( - "The hyper client socket {}/{socket_id} receive a response {:?}", + "The H2 hyper client socket {}/{socket_id} receive a response {:?}", proc.get_proc_id(), msg ), InternalMsg::Error(err_msg) => panic!( - "The hyper client socket {}/{socket_id} receive an error {:?}", + "The H2 hyper client socket {}/{socket_id} receive an error {:?}", proc.get_proc_id(), err_msg ), - InternalMsg::Command(_) => todo!(), - InternalMsg::Config => todo!(), + InternalMsg::Command(_) | InternalMsg::Config => { + // TODO: Implement Command/Config handling or document as unsupported + }, InternalMsg::Service(_table) => {/* Will not use service table */}, InternalMsg::Shutdown => { // Remove the socket queue and wait message to finish @@ -365,30 +435,18 @@ impl HyperClientSocket { } } } - Ok(Err(e)) => { - warn!( - socket_id = socket_id, - addr = target_addr, - "HTTP2 handshake error: {}", - e - ); - Err(HyperProcError::Hyper(e, target_addr)) - } - Err(_) => { - warn!( - socket_id = socket_id, - addr = target_addr, - "HTTP2 handshake timeout after {} ms", - self.target.connect_timeout - ); - Err(HyperProcError::Io(io::Error::new( - io::ErrorKind::TimedOut, - format!( - "HTTP2 handshake timeout after {} ms", - self.target.connect_timeout - ), - ))) - } + Ok(Err(e)) => Err(Self::handle_handshake_error( + socket_id, + &target_addr, + e, + "HTTP2", + )), + Err(_) => Err(Self::handle_handshake_timeout( + socket_id, + &target_addr, + self.target.connect_timeout, + "HTTP2", + )), } } diff --git a/src/lib.rs b/src/lib.rs index 038d88e..bd42ff9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,6 +23,25 @@ use thiserror::Error; const H2: &[u8] = b"h2"; +/// Product version header value used for `Server` or `User-Agent` header in HTTP requests and responses +#[cfg(target_family = "unix")] +pub const PRODUCT_VERSION_HEADER: &str = concat!( + env!("CARGO_PKG_NAME"), + "/", + env!("CARGO_PKG_VERSION"), + " (Unix)" +); +#[cfg(target_family = "windows")] +pub const PRODUCT_VERSION_HEADER: &str = concat!( + env!("CARGO_PKG_NAME"), + "/", + env!("CARGO_PKG_VERSION"), + " (Windows)" +); +#[cfg(all(not(target_family = "unix"), not(target_family = "windows")))] +pub const PRODUCT_VERSION_HEADER: &str = + concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")); + /// Global Hyper processor error #[derive(Debug, Error)] pub enum HyperProcError { diff --git a/src/server/adaptor.rs b/src/server/adaptor.rs index d9a23e7..0231fd2 100644 --- a/src/server/adaptor.rs +++ b/src/server/adaptor.rs @@ -7,7 +7,7 @@ use http_body_util::{Empty, Full, combinators::BoxBody}; use hyper::{Request, Response, StatusCode}; use prosa::core::{adaptor::Adaptor, error::ProcError, msg::ErrorMsg, proc::ProcBusParam as _}; -use crate::{HttpError, HyperResp}; +use crate::{HttpError, HyperResp, PRODUCT_VERSION_HEADER}; use super::proc::HyperServerProc; @@ -33,25 +33,6 @@ where + prosa::core::msg::Tvf + std::default::Default, { - /// Server header value send by the server - #[cfg(target_family = "unix")] - const SERVER_HEADER: &'static str = concat!( - env!("CARGO_PKG_NAME"), - "/", - env!("CARGO_PKG_VERSION"), - " (Unix)" - ); - #[cfg(target_family = "windows")] - const SERVER_HEADER: &'static str = concat!( - env!("CARGO_PKG_NAME"), - "/", - env!("CARGO_PKG_VERSION"), - " (Windows)" - ); - #[cfg(all(not(target_family = "unix"), not(target_family = "windows")))] - const SERVER_HEADER: &'static str = - concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")); - /// Create a new adaptor fn new(proc: &HyperServerProc) -> Result> where @@ -66,7 +47,7 @@ where { Response::builder() .status(status_code) - .header(hyper::header::SERVER, Self::SERVER_HEADER) + .header(hyper::header::SERVER, PRODUCT_VERSION_HEADER) } /// Method to process input HTTP requests. Received by the ProSA through Hyper @@ -135,7 +116,7 @@ where .status(200) .header( "Server", - >::SERVER_HEADER, + PRODUCT_VERSION_HEADER, ) .body(BoxBody::new(Full::new(Bytes::from(self.hello_msg.clone())))) .into() diff --git a/src/tests.rs b/src/tests.rs index b2aff76..492ef94 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -157,10 +157,7 @@ mod tests { use url::Url; use crate::{ - HttpError, HyperResp, - client::{adaptor::HyperClientAdaptor, proc::HyperClientProc}, - server::{adaptor::HyperServerAdaptor, proc::HyperServerProc}, - tests::HttpTestSettings, + HttpError, HyperResp, PRODUCT_VERSION_HEADER, client::{adaptor::HyperClientAdaptor, proc::HyperClientProc}, server::{adaptor::HyperServerAdaptor, proc::HyperServerProc}, tests::HttpTestSettings }; const WAIT_TIME: time::Duration = time::Duration::from_secs(1); @@ -281,7 +278,7 @@ mod tests { Response::builder() .header( hyper::header::SERVER, - >::SERVER_HEADER, + PRODUCT_VERSION_HEADER, ) .status(StatusCode::OK) .body(BoxBody::new(Full::new(Bytes::from_owner( @@ -292,7 +289,7 @@ mod tests { Response::builder() .header( hyper::header::SERVER, - >::SERVER_HEADER, + PRODUCT_VERSION_HEADER, ) .status(StatusCode::BAD_REQUEST) .body(BoxBody::new(Full::new(Bytes::from("Bad Request")))) @@ -398,7 +395,7 @@ mod tests { .uri(socket_url.as_str()) .header( hyper::header::USER_AGENT, - >::USER_AGENT_HEADER, + PRODUCT_VERSION_HEADER, ) .body(BoxBody::new(Full::new(Bytes::from(body.into_owned())))) .map_err(|e| { From 60f24ddb0d14ccae58285055e3f2040e4f7dfb9c Mon Sep 17 00:00:00 2001 From: Jeremy HERGAULT Date: Tue, 17 Mar 2026 10:24:00 +0100 Subject: [PATCH 3/3] fix: fmt and clippy Signed-off-by: Jeremy HERGAULT --- examples/client.rs | 5 +---- examples/server.rs | 10 ++-------- src/client/socket.rs | 11 +++++------ src/server/adaptor.rs | 5 +---- src/tests.rs | 20 +++++++------------- 5 files changed, 16 insertions(+), 35 deletions(-) diff --git a/examples/client.rs b/examples/client.rs index f77f2a1..f49a78d 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -66,10 +66,7 @@ where Request::builder() .uri(uri.as_str()) - .header( - "User-Agent", - PRODUCT_VERSION_HEADER, - ) + .header("User-Agent", PRODUCT_VERSION_HEADER) .body(BoxBody::new(Empty::::new())) .map_err(|e| ServiceError::ProtocolError(format!("Failed to build request: {}", e))) } diff --git a/examples/server.rs b/examples/server.rs index 74e8dca..9ed496a 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -53,10 +53,7 @@ where ) -> crate::HyperResp { match req.uri().path() { "/" => Response::builder() - .header( - "Server", - PRODUCT_VERSION_HEADER, - ) + .header("Server", PRODUCT_VERSION_HEADER) .body(BoxBody::new(Full::new(Bytes::from(format!( "{} - Home of {}", if req.version() == hyper::Version::HTTP_2 { @@ -75,10 +72,7 @@ where } _ => Response::builder() .status(404) - .header( - "Server", - PRODUCT_VERSION_HEADER, - ) + .header("Server", PRODUCT_VERSION_HEADER) .body(BoxBody::new(Full::new(Bytes::from("Not Found")))) .into(), } diff --git a/src/client/socket.rs b/src/client/socket.rs index ebffb4c..753e00e 100644 --- a/src/client/socket.rs +++ b/src/client/socket.rs @@ -35,6 +35,9 @@ use tracing::{debug, info, warn}; use crate::{H2, HyperProcError, client::adaptor::HyperClientAdaptor, hyper_version_str}; +/// Type alias for HTTP request pair to reduce type complexity +type HttpRequestPair = (RequestMsg, Request>); + /// Hyper client socket #[derive(Debug, Clone)] pub struct HyperClientSocket { @@ -112,7 +115,7 @@ impl HyperClientSocket { mut msg: RequestMsg, target_url: &url::Url, service_name: &str, - ) -> Option<(RequestMsg, Request>)> + ) -> Option> where M: 'static + std::marker::Send @@ -216,11 +219,7 @@ impl HyperClientSocket { "HTTP client expose service name: {}", service_name ); - #[allow(clippy::type_complexity)] - let mut msg_to_send: Option<( - RequestMsg, - Request>, - )> = None; + let mut msg_to_send: Option> = None; let mut req_instant = Instant::now(); loop { diff --git a/src/server/adaptor.rs b/src/server/adaptor.rs index 0231fd2..fd4df9a 100644 --- a/src/server/adaptor.rs +++ b/src/server/adaptor.rs @@ -114,10 +114,7 @@ where async fn process_http_request(&self, _req: Request) -> HyperResp { Response::builder() .status(200) - .header( - "Server", - PRODUCT_VERSION_HEADER, - ) + .header("Server", PRODUCT_VERSION_HEADER) .body(BoxBody::new(Full::new(Bytes::from(self.hello_msg.clone())))) .into() } diff --git a/src/tests.rs b/src/tests.rs index 492ef94..f00348b 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -157,7 +157,10 @@ mod tests { use url::Url; use crate::{ - HttpError, HyperResp, PRODUCT_VERSION_HEADER, client::{adaptor::HyperClientAdaptor, proc::HyperClientProc}, server::{adaptor::HyperServerAdaptor, proc::HyperServerProc}, tests::HttpTestSettings + HttpError, HyperResp, PRODUCT_VERSION_HEADER, + client::{adaptor::HyperClientAdaptor, proc::HyperClientProc}, + server::{adaptor::HyperServerAdaptor, proc::HyperServerProc}, + tests::HttpTestSettings, }; const WAIT_TIME: time::Duration = time::Duration::from_secs(1); @@ -276,10 +279,7 @@ mod tests { > { if let Ok(content) = resp.get_string(1) { Response::builder() - .header( - hyper::header::SERVER, - PRODUCT_VERSION_HEADER, - ) + .header(hyper::header::SERVER, PRODUCT_VERSION_HEADER) .status(StatusCode::OK) .body(BoxBody::new(Full::new(Bytes::from_owner( content.into_owned(), @@ -287,10 +287,7 @@ mod tests { .map_err(|e| e.into()) } else { Response::builder() - .header( - hyper::header::SERVER, - PRODUCT_VERSION_HEADER, - ) + .header(hyper::header::SERVER, PRODUCT_VERSION_HEADER) .status(StatusCode::BAD_REQUEST) .body(BoxBody::new(Full::new(Bytes::from("Bad Request")))) .map_err(|e| e.into()) @@ -393,10 +390,7 @@ mod tests { Ok(body) => Request::builder() .method(Method::POST) .uri(socket_url.as_str()) - .header( - hyper::header::USER_AGENT, - PRODUCT_VERSION_HEADER, - ) + .header(hyper::header::USER_AGENT, PRODUCT_VERSION_HEADER) .body(BoxBody::new(Full::new(Bytes::from(body.into_owned())))) .map_err(|e| { ServiceError::ProtocolError(format!("Failed to build request: {}", e))