diff --git a/Cargo.toml b/Cargo.toml index eb99ff8..e417741 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,4 @@ [workspace] members = ["server", "cache", "core"] -default-members = ["server"] \ No newline at end of file +default-members = ["server"] +resolver = "2" \ No newline at end of file diff --git a/README.md b/README.md index 3ad07ac..cebac91 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ OPTIONS: ``` ## Proxy Configuration -``` +```yaml # service configuration service: # ip address @@ -31,6 +31,12 @@ service: # dev mode - will enable only terminal logger dev_mode: true +# ssl configuration (optional) +ssl: + # verify SSL certificates for backend HTTPS connections + # set to false to allow self-signed certificates (not recommended for production) + verify_ssl: true + # inbound paths inbound: # match path to group @@ -47,14 +53,24 @@ outbound: timeout: 60 # backend servers for this group # round robin balancing to all servers + # supports both HTTP and HTTPS backends servers: - http://localhost:8080/push + - https://secure.backend.example.com - group: group_2 servers: - http://test:8082 - http://test2:8181/test ``` +## Features + +- **Live Configuration Updates**: Modify `proxy.yaml` and changes are applied automatically without restart +- **Load Balancing**: Round-robin distribution across multiple backend servers +- **HTTPS Support**: Proxy to HTTPS backends with configurable SSL verification +- **Response Caching**: Caches responses based on Cache-Control headers +- **Request Forwarding**: Preserves and adds X-Forwarded-For headers + ## Build from source ### Install Rust ```bash diff --git a/cache/Cargo.toml b/cache/Cargo.toml index a406b2f..6801243 100644 --- a/cache/Cargo.toml +++ b/cache/Cargo.toml @@ -5,7 +5,7 @@ edition = "2021" [dependencies] -crossbeam = "0.8.0" +crossbeam = "0.8" anyhow = "1" -actix-web = "3" +actix-web = "4" blocking-delay-queue = { git = "https://github.com/dejankos/blocking-delay-queue" } \ No newline at end of file diff --git a/cache/src/expiring_cache.rs b/cache/src/expiring_cache.rs index e32ddc8..807150b 100644 --- a/cache/src/expiring_cache.rs +++ b/cache/src/expiring_cache.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; -use actix_web::http::{HeaderMap, StatusCode}; +use actix_web::http::{StatusCode, header::HeaderMap}; use actix_web::web::Bytes; use blocking_delay_queue::{BlockingDelayQueue, DelayItem}; use crossbeam::sync::{ShardedLock, ShardedLockWriteGuard}; @@ -75,7 +75,7 @@ impl ResponseCache { #[cfg(test)] mod tests { - use actix_web::http::{HeaderMap, StatusCode}; + use actix_web::http::{StatusCode, header::HeaderMap}; use std::sync::Arc; use std::thread; use std::time::{Duration, Instant}; diff --git a/config/proxy.yaml b/config/proxy.yaml index 5727f74..5165b97 100644 --- a/config/proxy.yaml +++ b/config/proxy.yaml @@ -9,6 +9,12 @@ service: # dev mode - will enable only terminal logger dev_mode: true +# ssl configuration (optional) +ssl: + # verify SSL certificates for backend HTTPS connections + # set to false to allow self-signed certificates (not recommended for production) + verify_ssl: true + # inbound paths inbound: # match path to group diff --git a/core/Cargo.toml b/core/Cargo.toml index 15b64b4..eb964c4 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -6,16 +6,17 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -simplelog = "0.7.6" -log = "0.4.8" +simplelog = "0.12" +log = "0.4" anyhow = "1" -crossbeam = "0.8.0" +crossbeam = "0.8" serde = { version = "1.0", features = ["derive"] } -serde_yaml = "0.8" -url = "2.2.1" -notify = "4.0.15" -regex = "1.4.4" -actix-web = { version = "3", features = ["openssl"] } +serde_yaml = "0.9" +url = "2" +notify = "6.1" +regex = "1" +actix-web = { version = "4", features = ["openssl"] } +awc = { version = "3", features = ["openssl"] } openssl = "0.10" cache = { path = "../cache" } \ No newline at end of file diff --git a/core/src/config.rs b/core/src/config.rs index 7e8c787..751ec06 100644 --- a/core/src/config.rs +++ b/core/src/config.rs @@ -1,4 +1,4 @@ -use std::path::{Path, PathBuf}; +use std::path::Path; use std::sync::Arc; use std::time::Duration; @@ -24,6 +24,16 @@ pub struct Service { pub dev_mode: bool, } +#[derive(Debug, Deserialize, Clone)] +pub struct SslConfig { + #[serde(default = "default_ssl_verify")] + pub verify_ssl: bool, +} + +fn default_ssl_verify() -> bool { + true +} + impl Default for Service { fn default() -> Self { Service { @@ -39,6 +49,8 @@ impl Default for Service { #[derive(Debug, Deserialize)] pub struct ProxyProperties { pub service: Service, + #[serde(default)] + pub ssl: Option, pub inbound: Vec, pub outbound: Vec, } @@ -73,18 +85,6 @@ pub struct Group { pub timeout: Duration, } -trait FileName { - fn file_name_to_str(&self) -> &str; -} - -impl FileName for &PathBuf { - fn file_name_to_str(&self) -> &str { - self.file_name() - .as_ref() - .and_then(|os_str| os_str.to_str()) - .unwrap_or("") - } -} impl Configuration { pub fn new

(path: P) -> Result @@ -119,6 +119,15 @@ impl Configuration { .clone() } + pub fn ssl_config(&self) -> Option { + self.proxy_config + .read() + .expect("proxy config read lock poisoned!") + .props + .ssl + .clone() + } + fn interested(&self, file_name: &str) -> bool { CONFIG_FILE == file_name } diff --git a/core/src/file_watcher.rs b/core/src/file_watcher.rs index 3dfdf88..7e529dd 100644 --- a/core/src/file_watcher.rs +++ b/core/src/file_watcher.rs @@ -6,7 +6,7 @@ use std::time::Duration; use anyhow::Result; use crossbeam::sync::ShardedLock; use log::error; -use notify::{watcher, DebouncedEvent, RecursiveMode, Watcher}; +use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; use crate::task::spawn; @@ -57,24 +57,35 @@ impl FileWatcher { fn run_event_loop(path: &Path, listeners: Listeners) -> Result<()> { let (tx, rx) = channel(); - let mut watcher = watcher(tx, Duration::from_secs(5))?; + + let mut watcher = RecommendedWatcher::new( + move |res: Result| { + if let Ok(event) = res { + let _ = tx.send(event); + } + }, + Config::default().with_poll_interval(Duration::from_secs(5)), + )?; + watcher.watch(path, RecursiveMode::NonRecursive)?; + // Keep watcher alive in the loop loop { match rx.recv() { - Ok(event) => match event { - DebouncedEvent::Write(ref p) => listeners - .read() - .expect("listener mutex poisoned!") - .iter() - .for_each(|l| l.notify_file_changed(p)), - DebouncedEvent::Error(e, o) => { - error!("Path {:?} watch error {}.", o, e); + Ok(event) => { + if matches!(event.kind, EventKind::Modify(_)) { + for path in event.paths { + listeners + .read() + .expect("listener mutex poisoned!") + .iter() + .for_each(|l| l.notify_file_changed(&path)); + } } - _ => {} - }, + } Err(e) => { error!("Error receiving file events - stopping file watch! {}", e); + drop(watcher); // Explicitly drop watcher before breaking break; } } diff --git a/core/src/lib.rs b/core/src/lib.rs index 3a41eaa..701fde0 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -9,7 +9,7 @@ mod yaml_utils; mod task; pub use self::balancer::Balancer; -pub use self::config::Configuration; +pub use self::config::{Configuration, SslConfig}; pub use self::file_watcher::FileWatcher; pub use self::log::init_logger; pub use self::proxy::Proxy; diff --git a/core/src/log.rs b/core/src/log.rs index 318ef2a..0d70914 100644 --- a/core/src/log.rs +++ b/core/src/log.rs @@ -1,5 +1,5 @@ use log::LevelFilter; -use simplelog::{ConfigBuilder, TermLogger, TerminalMode, ThreadLogMode, WriteLogger}; +use simplelog::{ColorChoice, ConfigBuilder, TermLogger, TerminalMode, ThreadLogMode, WriteLogger}; use std::fs::File; pub fn init_logger(log_path: Option, dev_mode: bool) { @@ -8,7 +8,7 @@ pub fn init_logger(log_path: Option, dev_mode: bool) { .build(); if log_path.is_none() || dev_mode { - TermLogger::init(LevelFilter::Info, cfg, TerminalMode::Mixed) + TermLogger::init(LevelFilter::Info, cfg, TerminalMode::Mixed, ColorChoice::Auto) .expect("Failed to init term logger"); } else { let log_file = diff --git a/core/src/proxy.rs b/core/src/proxy.rs index 0fffbc2..405e399 100644 --- a/core/src/proxy.rs +++ b/core/src/proxy.rs @@ -3,27 +3,29 @@ use std::sync::Arc; use std::time::{Duration, Instant}; -use actix_web::client::{Client, ClientRequest}; -use actix_web::dev::{HttpResponseBuilder, RequestHead}; +use actix_web::dev::RequestHead; +use actix_web::HttpResponseBuilder; use actix_web::http::{StatusCode, Uri}; use actix_web::web::Bytes; use actix_web::{HttpRequest, HttpResponse}; use anyhow::anyhow; use anyhow::Result; +use awc::{Client, ClientRequest, Connector}; use log::debug; +use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; use url::Url; use cache::{CachedResponse, ResponseCache}; use crate::balancer::{Balancer, Instance}; +use crate::config::SslConfig; use crate::http_utils::{get_host, Cacheable, Headers, XFF_HEADER_NAME}; use crate::task::spawn; -// const HTTPS_SCHEME: &str = "https"; - pub struct Proxy { balancer: Balancer, res_cache: Arc, + ssl_config: Option, } trait ProxyHeaders { @@ -43,7 +45,7 @@ impl ProxyHeaders for ClientRequest { } xff_value.push_str(get_host(req_from).as_str()); - self.set_header(XFF_HEADER_NAME, xff_value) + self.insert_header((XFF_HEADER_NAME, xff_value)) } fn clear_headers(mut self) -> Self { @@ -53,11 +55,12 @@ impl ProxyHeaders for ClientRequest { } impl Proxy { - pub fn new(balancer: Balancer) -> Result { + pub fn new(balancer: Balancer, ssl_config: Option) -> Result { let res_cache = Arc::new(ResponseCache::with_capacity(10_000)); let proxy = Proxy { balancer, res_cache, + ssl_config, }; proxy.run_expire(); Ok(proxy) @@ -73,7 +76,7 @@ impl Proxy { } let instance = self.balancer.balance(&req).await?; - let (mut resp_builder, bytes) = Self::send(instance, req.head(), &req, body).await?; + let (mut resp_builder, bytes) = self.send(instance, req.head(), &req, body).await?; let res = resp_builder.body(bytes.clone()); if req_cacheable { self.cache_write(key, &res, bytes); @@ -119,6 +122,7 @@ impl Proxy { } async fn send( + &self, instance: Instance, req_head: &RequestHead, req: &HttpRequest, @@ -127,7 +131,7 @@ impl Proxy { let proxy_uri = Self::create_proxy_uri(instance.url, req.path(), req.query_string())?; debug!("proxying to {}", &proxy_uri); - let mut response = Self::create_http_client(instance.timeout) + let mut response = self.create_http_client(instance.timeout)? .request_from(proxy_uri, req_head) .append_proxy_headers(req) .clear_headers() @@ -162,28 +166,28 @@ impl Proxy { Ok(Uri::try_from(url.as_str())?) } - fn create_http_client(timeout: Duration) -> Client { - Client::builder().timeout(timeout).finish() + fn create_http_client(&self, timeout: Duration) -> Result { + let mut ssl_connector_builder = SslConnector::builder(SslMethod::tls())?; + + // Configure SSL verification based on config + if let Some(ssl_config) = &self.ssl_config { + if !ssl_config.verify_ssl { + ssl_connector_builder.set_verify(SslVerifyMode::NONE); + debug!("SSL verification disabled"); + } + } + + let connector = Connector::new() + .openssl(ssl_connector_builder.build()) + .timeout(timeout); + + Ok(Client::builder() + .connector(connector) + .timeout(timeout) + .finish()) } fn build_cache_key(req: &HttpRequest) -> Arc { Arc::from(req.uri().to_string().as_str()) } - - // fn create_http_client(scheme: &str, timeout: Duration) -> Result { - // if scheme == HTTPS_SCHEME { - // let ssl_connector = SslConnector::builder(SslMethod::tls())?.build(); - // let connector = Connector::new() - // .ssl(ssl_connector) - // .timeout(timeout) - // .finish(); - // - // Ok(Client::builder() - // .connector(connector) - // .timeout(timeout) - // .finish()) - // } else { - // Ok(Client::builder().timeout(timeout).finish()) - // } - // } } diff --git a/project.md b/project.md new file mode 100644 index 0000000..32f102e --- /dev/null +++ b/project.md @@ -0,0 +1,110 @@ +# Roxy - Reverse Proxy + +## Overview +Roxy is a reverse proxy server written in Rust that provides load balancing, response caching, and live configuration updates without service restarts. The project is currently in a rewrite phase (WIP). + +## What It Does +- Routes incoming HTTP requests to backend server groups based on path patterns +- Distributes traffic across multiple backend servers using round-robin load balancing +- Supports both HTTP and HTTPS backend servers with configurable SSL verification +- Caches HTTP responses with configurable TTL to reduce backend load +- Monitors configuration file changes and applies updates in real-time +- Adds X-Forwarded-For headers for request tracing + +## Architecture + +### Module Structure +The project uses a Rust workspace with three crates: + +**server/** +- Entry point using Actix-web framework +- Binds to configured IP/port and spawns worker threads +- Delegates all requests to the proxy handler + +**core/** +- `proxy.rs`: Main proxy logic, handles request forwarding and caching +- `balancer.rs`: Round-robin load balancing across server groups +- `config.rs`: Configuration management with live reload support +- `file_watcher.rs`: Monitors proxy.yaml for changes using notify crate +- `matcher.rs`: Regex-based path matching to route requests to server groups +- `http_utils.rs`: HTTP header manipulation utilities +- `log.rs`: Logging initialization + +**cache/** +- `expiring_cache.rs`: Thread-safe response cache with TTL expiration +- Uses BlockingDelayQueue for efficient expiration management +- Capacity-limited to prevent memory exhaustion + +### Data Flow +1. Request arrives at server (server/main.rs:57) +2. Matcher finds server group based on path (core/matcher.rs:37) +3. Balancer selects backend server via round-robin (core/balancer.rs:33) +4. Proxy checks cache, forwards request if miss (core/proxy.rs:66) +5. Response cached if cacheable (Cache-Control header present) +6. Response returned to client + +### Key Design Patterns +- **Observer Pattern**: FileWatcher notifies Configuration on file changes +- **Strategy Pattern**: Pluggable balancing (currently only round-robin) +- **Sharded Locking**: Uses crossbeam ShardedLock for concurrent read access +- **Thread-Safe Caching**: Separate expiration thread prevents blocking + +## Configuration +YAML-based configuration with four sections: +- `service`: Proxy server settings (IP, port, workers) +- `ssl` (optional): SSL/TLS configuration for HTTPS backends +- `inbound`: Path patterns mapped to server groups +- `outbound`: Backend server groups with timeouts + +Example: +```yaml +service: + ip: localhost + port: 8080 + workers: 6 + +ssl: + verify_ssl: true # Set to false for self-signed certs + +inbound: + - path: /api/* + group: api_servers + +outbound: + - group: api_servers + timeout: 60 + servers: + - http://backend1:8080 + - https://backend2:8443 # HTTPS backends supported +``` + +## Potential Improvements + +### High Priority +1. ~~**Update Dependencies**~~: ✅ **COMPLETED** - Updated to Actix-web 4, notify 6, awc 3, clap 4, and all other dependencies +2. ~~**Enable HTTPS Support**~~: ✅ **COMPLETED** - Full HTTPS backend support with configurable SSL verification +3. **Health Checks**: Detect and remove unhealthy backends from rotation +4. **Better Error Handling**: Currently returns generic 500 errors +5. **Comprehensive Tests**: Minimal test coverage currently + +### Medium Priority +6. **Additional Balancing Strategies**: Least connections, weighted round-robin, IP hash +7. **Circuit Breaker**: Prevent cascading failures from unhealthy backends +8. **Request Retry Logic**: Automatic retry on backend failures +9. **Metrics & Monitoring**: Prometheus/Grafana integration +10. **Rate Limiting**: Per-client or per-path rate limits + +### Nice to Have +11. **Request/Response Transformation**: Header manipulation, path rewriting +12. **WebSocket Support**: Currently only HTTP +13. **Compression**: Gzip/Brotli support +14. **Access Logs**: Structured logging for requests +15. **Admin API**: Runtime configuration inspection without file edits +16. **Docker Support**: Containerization for easier deployment +17. **Graceful Shutdown**: Drain connections before stopping +18. **Connection Pooling**: Reuse backend connections for better performance + +## Technical Debt +- Error messages could be more descriptive +- Configuration validation is minimal +- No integration tests for end-to-end flows diff --git a/server/Cargo.toml b/server/Cargo.toml index fe91b01..4d4b6e3 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -8,18 +8,19 @@ description = "Reverse proxy with support for live configuration updates, balanc exclude = [".gitignore"] [dependencies] -actix-web = { version = "3", features = ["openssl"] } +actix-web = { version = "4", features = ["openssl"] } +awc = { version = "3", features = ["openssl"] } openssl = "0.10" -simplelog = "0.7.6" -log = "0.4.8" +simplelog = "0.12" +log = "0.4" anyhow = "1" serde = { version = "1.0", features = ["derive"] } -serde_yaml = "0.8" -crossbeam = "0.8.0" -notify = "4.0.15" -regex = "1.4.4" -url = "2.2.1" -structopt = "0.3.21" +serde_yaml = "0.9" +crossbeam = "0.8" +notify = "6.1" +regex = "1" +url = "2" +clap = { version = "4", features = ["derive"] } blocking-delay-queue = { git = "https://github.com/dejankos/blocking-delay-queue" } cache = { path = "../cache" } core = { path = "../core" } \ No newline at end of file diff --git a/server/src/main.rs b/server/src/main.rs index 9353289..12d4841 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -9,7 +9,7 @@ use anyhow::anyhow; use serde::Serialize; -use structopt::StructOpt; +use clap::Parser; use core::init_logger; use core::Balancer; @@ -19,9 +19,11 @@ use core::Proxy; type Response = Result; -#[derive(StructOpt, Debug)] +#[derive(Parser, Debug)] +#[command(name = "roxy")] +#[command(about = "Reverse proxy with live configuration updates")] pub struct CliCfg { - #[structopt( + #[arg( short, long, help = "Proxy configuration file", @@ -67,7 +69,7 @@ async fn main() -> anyhow::Result<()> { std::env::set_var("RUST_LOG", "actix_web=debug"); std::env::set_var("RUST_BACKTRACE", "1"); - let cli_cfg = CliCfg::from_args(); + let cli_cfg = CliCfg::parse(); let configuration = Arc::new(Configuration::new(&cli_cfg.proxy_config_path)?); let service_config = configuration.service_config(); init_logger(service_config.log_path, service_config.dev_mode); @@ -76,13 +78,14 @@ async fn main() -> anyhow::Result<()> { watcher.register_listener(Box::new(configuration.clone())); watcher.watch_file_changes()?; - let proxy = Proxy::new(Balancer::new(configuration.clone()))?; + let ssl_config = configuration.ssl_config(); + let proxy = Proxy::new(Balancer::new(configuration.clone()), ssl_config)?; let data = web::Data::new(proxy); HttpServer::new(move || { App::new() .wrap(Logger::default()) .app_data(data.clone()) - .service(web::resource("/*").to(proxy_request)) + .service(web::resource("/{tail:.*}").to(proxy_request)) }) .bind(format!("{}:{}", service_config.ip, service_config.port))? .workers(service_config.workers)