diff --git a/Cargo.lock b/Cargo.lock index 216b4aa0b..cc81f1336 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2918,6 +2918,7 @@ dependencies = [ "openssl", "predicates", "rand 0.8.5", + "redis", "regex", "reqsign 0.18.0", "reqwest", @@ -3810,9 +3811,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" -version = "0.1.40" +version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ "pin-project-lite", "tracing-attributes", @@ -3821,9 +3822,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.27" +version = "0.1.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" dependencies = [ "proc-macro2", "quote", @@ -3832,9 +3833,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.32" +version = "0.1.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", ] diff --git a/Cargo.toml b/Cargo.toml index 8005fb895..676edcf75 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -119,6 +119,7 @@ nix = { version = "0.30.0", optional = true, features = [ "signal", "process", ] } +redis = { version = "0.32", optional = true } rouille = { version = "3.6", optional = true, default-features = false, features = [ "ssl", ] } @@ -176,7 +177,7 @@ gha = ["opendal/services-ghac", "reqwest"] memcached = ["opendal/services-memcached"] native-zlib = [] oss = ["opendal/services-oss", "reqsign", "reqwest"] -redis = ["url", "opendal/services-redis"] +redis = ["dep:redis", "url", "opendal/services-redis"] s3 = ["opendal/services-s3", "reqsign", "reqwest"] webdav = ["opendal/services-webdav", "reqwest"] # Enable features that will build a vendored version of openssl and diff --git a/src/cache/cache.rs b/src/cache/cache.rs index 906c69f5f..0ba09e459 100644 --- a/src/cache/cache.rs +++ b/src/cache/cache.rs @@ -377,16 +377,24 @@ pub fn build_single_cache( key_prefix, }) => { let storage = match (endpoint, cluster_endpoints, url) { - (Some(url), None, None) => { - debug!("Init redis single-node cache with url {url}"); - RedisCache::build_single( - url, - username.as_deref(), - password.as_deref(), - *db, - key_prefix, - *ttl, - ) + (Some(url_str), None, None) => { + if url_str.starts_with("redis-sentinel://") { + debug!("Init redis sentinel cache with url {url_str}"); + if username.is_some() || password.is_some() || *db != crate::config::DEFAULT_REDIS_DB { + warn!("`username`, `password` and `db` have no effect when using a `redis-sentinel://` URL. Embed credentials in the URL instead."); + } + RedisCache::build_sentinel(url_str, key_prefix, *ttl) + } else { + debug!("Init redis single-node cache with url {url_str}"); + RedisCache::build_single( + url_str, + username.as_deref(), + password.as_deref(), + *db, + key_prefix, + *ttl, + ) + } } (None, Some(urls), None) => { debug!("Init redis cluster cache with urls {urls}"); diff --git a/src/cache/redis.rs b/src/cache/redis.rs index b3f0ab53b..e375a7189 100644 --- a/src/cache/redis.rs +++ b/src/cache/redis.rs @@ -14,6 +14,7 @@ // limitations under the License. use crate::errors::*; +use anyhow::anyhow; use opendal::Operator; use opendal::layers::LoggingLayer; use opendal::services::Redis; @@ -53,6 +54,151 @@ impl RedisCache { Ok(op) } + /// Create a new `RedisCache` by discovering the master via Redis Sentinel. + /// + /// Parses a `redis-sentinel://` URL, queries each sentinel node for the + /// master address, then connects to the discovered master. + /// + /// # URL Format + /// + /// ```text + /// redis-sentinel://[:password@]host1[:port1][,host2[:port2],...]/master_name[/db] + /// ``` + /// + /// - Multiple sentinel nodes are comma-separated + /// - Password (if present) applies to the Redis master, not the sentinels + /// - The `db` segment is optional and defaults to 0 + pub fn build_sentinel(url: &str, key_prefix: &str, ttl: u64) -> Result { + use std::net::ToSocketAddrs; + + debug!("Building Redis Sentinel cache from URL: {}", url); + + let parsed = parse_sentinel_url(url)?; + + debug!( + "Sentinel nodes: {:?}, master_name: {}", + parsed.nodes, parsed.master_name + ); + + let nodes_raw = parsed.nodes; + let mut master_addr = None; + let mut last_error: Option = None; + + let master_name = &parsed.master_name; + + debug!( + "Attempting to discover master '{}' from {} sentinel node(s)", + master_name, + nodes_raw.len() + ); + + for node in &nodes_raw { + debug!("Trying sentinel node: {}", node); + + // Resolve hostname to IP address(es) + let resolved_addr = match node.to_socket_addrs() { + Ok(mut addrs) => { + if let Some(addr) = addrs.next() { + debug!("Resolved {} to {}", node, addr); + addr.to_string() + } else { + debug!("DNS resolved {} but returned no addresses", node); + node.to_string() + } + } + Err(e) => { + debug!( + "DNS resolution failed for {}: {}, using hostname directly", + node, e + ); + node.to_string() + } + }; + + let redis_url = format!("redis://{}", resolved_addr); + debug!("Connecting to sentinel at: {}", redis_url); + + match redis::Client::open(redis_url.as_str()) { + Ok(client) => match client.get_connection() { + Ok(mut conn) => { + let res: redis::RedisResult> = redis::cmd("SENTINEL") + .arg("get-master-addr-by-name") + .arg(master_name.as_str()) + .query(&mut conn); + + match res { + Ok(addr_parts) if addr_parts.len() >= 2 => { + let discovered = + format!("redis://{}:{}", addr_parts[0], addr_parts[1]); + debug!("Discovered master '{}' at: {}", master_name, discovered); + master_addr = Some(discovered); + break; + } + Ok(addr_parts) => { + let msg = format!( + "Sentinel returned incomplete response: {:?}", + addr_parts + ); + debug!("{}", msg); + last_error = Some(msg); + } + Err(e) => { + let msg = format!("Sentinel query failed: {}", e); + debug!("{}", msg); + last_error = Some(msg); + } + } + } + Err(e) => { + let msg = format!("Connection failed: {}", e); + debug!("{}", msg); + last_error = Some(msg); + } + }, + Err(e) => { + let msg = format!("Client creation failed: {}", e); + debug!("{}", msg); + last_error = Some(msg); + } + } + } + + let final_endpoint = match master_addr { + Some(addr) => addr, + None => { + let err_detail = last_error.unwrap_or_else(|| "no sentinels responded".to_string()); + return Err(anyhow!( + "Could not discover master '{}' from any sentinel. Last error: {}", + master_name, + err_detail + )); + } + }; + + debug!("Using Redis master endpoint: {}", final_endpoint); + + let mut builder = Redis::default().endpoint(&final_endpoint).root(key_prefix); + + if let Some(ref pass) = parsed.password { + builder = builder.password(pass); + } + + if ttl != 0 { + builder = builder.default_ttl(Duration::from_secs(ttl)); + } + + if let Some(db) = parsed.db { + builder = builder.db(db); + } + + let op = Operator::new(builder)? + .layer(LoggingLayer::default()) + .finish(); + + debug!("Redis Sentinel cache initialized successfully"); + Ok(op) + } + /// Create a new `RedisCache` for the given single instance. pub fn build_single( endpoint: &str, @@ -104,3 +250,134 @@ impl RedisCache { Ok(op) } } + +/// Parsed components of a `redis-sentinel://` URL. +#[derive(Debug, PartialEq)] +struct SentinelUrl { + /// Sentinel node addresses (host:port). + nodes: Vec, + /// Sentinel master name. + master_name: String, + /// Optional password for the Redis master. + password: Option, + /// Optional database number. + db: Option, +} + +/// Parse a `redis-sentinel://` URL into its components. +/// +/// Format: `redis-sentinel://[:password@]host1[:port1][,host2[:port2],...]/master_name[/db]` +fn parse_sentinel_url(url: &str) -> Result { + let clean_url = url.trim_start_matches("redis-sentinel://"); + let parts: Vec<&str> = clean_url.splitn(3, '/').collect(); + if parts.len() < 2 || parts[1].is_empty() { + return Err(anyhow!( + "Invalid sentinel URL format: expected redis-sentinel://host:port/master_name" + )); + } + + let nodes_part = parts[0]; + let master_name = parts[1].to_string(); + + // Handle password: rsplit_once so passwords containing '@' work correctly + let (password, nodes_str) = if let Some((cred_part, nodes)) = nodes_part.rsplit_once('@') { + let pass = cred_part.trim_start_matches(':'); + (Some(pass.to_string()), nodes) + } else { + (None, nodes_part) + }; + + let nodes: Vec = nodes_str.split(',').map(|s| s.to_string()).collect(); + if nodes.is_empty() || nodes.iter().all(|n| n.is_empty()) { + return Err(anyhow!("Invalid sentinel URL: no sentinel nodes specified")); + } + + let db = if parts.len() > 2 && !parts[2].is_empty() { + Some( + parts[2] + .parse::() + .map_err(|_| anyhow!("Invalid db number in sentinel URL: '{}'", parts[2]))?, + ) + } else { + None + }; + + Ok(SentinelUrl { + nodes, + master_name, + password, + db, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_simple_sentinel_url() { + let parsed = parse_sentinel_url("redis-sentinel://host1:26379/mymaster").unwrap(); + assert_eq!(parsed.nodes, vec!["host1:26379"]); + assert_eq!(parsed.master_name, "mymaster"); + assert_eq!(parsed.password, None); + assert_eq!(parsed.db, None); + } + + #[test] + fn parse_multiple_nodes() { + let parsed = + parse_sentinel_url("redis-sentinel://h1:26379,h2:26379,h3:26379/mymaster").unwrap(); + assert_eq!(parsed.nodes, vec!["h1:26379", "h2:26379", "h3:26379"]); + assert_eq!(parsed.master_name, "mymaster"); + } + + #[test] + fn parse_with_password() { + let parsed = + parse_sentinel_url("redis-sentinel://:secretpass@host1:26379/mymaster").unwrap(); + assert_eq!(parsed.password, Some("secretpass".to_string())); + assert_eq!(parsed.nodes, vec!["host1:26379"]); + } + + #[test] + fn parse_password_containing_at() { + let parsed = + parse_sentinel_url("redis-sentinel://:p@ss@word@host1:26379/mymaster").unwrap(); + assert_eq!(parsed.password, Some("p@ss@word".to_string())); + assert_eq!(parsed.nodes, vec!["host1:26379"]); + } + + #[test] + fn parse_with_db() { + let parsed = parse_sentinel_url("redis-sentinel://host1:26379/mymaster/3").unwrap(); + assert_eq!(parsed.db, Some(3)); + } + + #[test] + fn parse_full_url() { + let parsed = + parse_sentinel_url("redis-sentinel://:hunter2@s1:26379,s2:26380/prod-master/5") + .unwrap(); + assert_eq!(parsed.nodes, vec!["s1:26379", "s2:26380"]); + assert_eq!(parsed.master_name, "prod-master"); + assert_eq!(parsed.password, Some("hunter2".to_string())); + assert_eq!(parsed.db, Some(5)); + } + + #[test] + fn parse_missing_master_name() { + assert!(parse_sentinel_url("redis-sentinel://host1:26379").is_err()); + assert!(parse_sentinel_url("redis-sentinel://host1:26379/").is_err()); + } + + #[test] + fn parse_invalid_db() { + assert!(parse_sentinel_url("redis-sentinel://host:26379/master/notanumber").is_err()); + } + + #[test] + fn parse_no_port() { + let parsed = parse_sentinel_url("redis-sentinel://myhost/mymaster").unwrap(); + assert_eq!(parsed.nodes, vec!["myhost"]); + } +}