diff --git a/Cargo.lock b/Cargo.lock index e3f50d30..6bf26659 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1318,6 +1318,7 @@ dependencies = [ "chrono", "ddm-api", "ddm-types", + "ddm-types-versions", "dpd-client", "dropshot 0.17.0", "hostname 0.4.2", @@ -1349,6 +1350,7 @@ dependencies = [ name = "ddm-admin-client" version = "0.1.0" dependencies = [ + "mg-common", "oxnet", "progenitor 0.13.0", "reqwest 0.13.2", @@ -3792,15 +3794,18 @@ dependencies = [ "clap", "libc", "libnet", + "omicron-common", "oximeter", "oximeter-producer", "oxnet", "schemars 0.8.22", "serde", + "serde_json", "slog", "slog-async", "slog-bunyan", "smf 0.10.0 (git+https://github.com/illumos/smf-rs?branch=main)", + "thiserror 2.0.18", "uuid", ] @@ -3876,6 +3881,7 @@ version = "0.1.0" dependencies = [ "bfd", "bgp", + "mg-common", "rdb", "schemars 0.8.22", "serde", diff --git a/ddm-admin-client/Cargo.toml b/ddm-admin-client/Cargo.toml index 24d50602..88e58187 100644 --- a/ddm-admin-client/Cargo.toml +++ b/ddm-admin-client/Cargo.toml @@ -7,6 +7,7 @@ edition = "2024" ignored = ["oxnet", "serde", "uuid"] [dependencies] +mg-common.workspace = true oxnet.workspace = true progenitor.workspace = true reqwest.workspace = true diff --git a/ddm-admin-client/src/lib.rs b/ddm-admin-client/src/lib.rs index ea45cea2..bea6d028 100644 --- a/ddm-admin-client/src/lib.rs +++ b/ddm-admin-client/src/lib.rs @@ -39,3 +39,51 @@ impl std::hash::Hash for types::TunnelOrigin { self.metric.hash(state); } } + +impl std::cmp::PartialEq for types::Vni { + fn eq(&self, other: &Self) -> bool { + self.0.eq(&other.0) + } +} + +impl std::cmp::Eq for types::Vni {} + +impl std::hash::Hash for types::Vni { + fn hash(&self, state: &mut H) { + self.0.hash(state); + } +} + +impl std::cmp::PartialEq for types::MulticastOrigin { + fn eq(&self, other: &Self) -> bool { + self.overlay_group.eq(&other.overlay_group) + && self.underlay_group.eq(&other.underlay_group) + && self.vni.eq(&other.vni) + && self.source.eq(&other.source) + } +} + +impl std::cmp::Eq for types::MulticastOrigin {} + +/// Metric is excluded from identity so that metric changes update +/// an existing entry rather than creating a duplicate. +impl std::hash::Hash for types::MulticastOrigin { + fn hash(&self, state: &mut H) { + self.overlay_group.hash(state); + self.underlay_group.hash(state); + self.vni.hash(state); + self.source.hash(state); + } +} + +impl From for types::MulticastOrigin { + fn from(o: mg_common::net::MulticastOrigin) -> Self { + Self { + overlay_group: o.overlay_group, + underlay_group: o.underlay_group.ip(), + vni: types::Vni(o.vni.as_u32()), + metric: o.metric, + source: o.source, + } + } +} diff --git a/ddm-api/src/lib.rs b/ddm-api/src/lib.rs index 546797dc..6f3f8282 100644 --- a/ddm-api/src/lib.rs +++ b/ddm-api/src/lib.rs @@ -3,6 +3,7 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. use ddm_types_versions::latest; +use ddm_types_versions::v1; use dropshot::HttpError; use dropshot::HttpResponseOk; use dropshot::HttpResponseUpdatedNoContent; @@ -10,7 +11,7 @@ use dropshot::Path; use dropshot::RequestContext; use dropshot::TypedBody; use dropshot_api_manager_types::api_versions; -use mg_common::net::TunnelOrigin; +use mg_common::net::{MulticastOrigin, TunnelOrigin}; use oxnet::Ipv6Net; use std::collections::{HashMap, HashSet}; @@ -26,6 +27,7 @@ api_versions!([ // | example for the next person. // v // (next_int, IDENT), + (2, MULTICAST_SUPPORT), (1, INITIAL), ]); @@ -45,11 +47,25 @@ api_versions!([ pub trait DdmAdminApi { type Context; - #[endpoint { method = GET, path = "/peers" }] + #[endpoint { + method = GET, + path = "/peers", + versions = VERSION_MULTICAST_SUPPORT.. + }] async fn get_peers( ctx: RequestContext, ) -> Result>, HttpError>; + /// Returns peers without interface name information. + #[endpoint { + method = GET, + path = "/peers", + versions = ..VERSION_MULTICAST_SUPPORT + }] + async fn get_peers_v1( + ctx: RequestContext, + ) -> Result>, HttpError>; + #[endpoint { method = DELETE, path = "/peers/{addr}" }] async fn expire_peer( ctx: RequestContext, @@ -100,6 +116,44 @@ pub trait DdmAdminApi { request: TypedBody>, ) -> Result; + #[endpoint { + method = GET, + path = "/originated_multicast_groups", + versions = VERSION_MULTICAST_SUPPORT.. + }] + async fn get_originated_multicast_groups( + ctx: RequestContext, + ) -> Result>, HttpError>; + + #[endpoint { + method = GET, + path = "/multicast_groups", + versions = VERSION_MULTICAST_SUPPORT.. + }] + async fn get_multicast_groups( + ctx: RequestContext, + ) -> Result>, HttpError>; + + #[endpoint { + method = PUT, + path = "/multicast_group", + versions = VERSION_MULTICAST_SUPPORT.. + }] + async fn advertise_multicast_groups( + ctx: RequestContext, + request: TypedBody>, + ) -> Result; + + #[endpoint { + method = DELETE, + path = "/multicast_group", + versions = VERSION_MULTICAST_SUPPORT.. + }] + async fn withdraw_multicast_groups( + ctx: RequestContext, + request: TypedBody>, + ) -> Result; + #[endpoint { method = PUT, path = "/sync" }] async fn sync( ctx: RequestContext, diff --git a/ddm-types/versions/src/latest.rs b/ddm-types/versions/src/latest.rs index 08057601..721f8cbf 100644 --- a/ddm-types/versions/src/latest.rs +++ b/ddm-types/versions/src/latest.rs @@ -11,13 +11,16 @@ pub mod admin { } pub mod db { - pub use crate::v1::db::PeerInfo; pub use crate::v1::db::PeerStatus; pub use crate::v1::db::RouterKind; pub use crate::v1::db::TunnelRoute; + pub use crate::v2::db::MulticastRoute; + pub use crate::v2::db::PeerInfo; } pub mod exchange { pub use crate::v1::exchange::PathVector; pub use crate::v1::exchange::PathVectorV2; + pub use crate::v2::exchange::MulticastPathHop; + pub use crate::v2::exchange::MulticastPathVector; } diff --git a/ddm-types/versions/src/lib.rs b/ddm-types/versions/src/lib.rs index 9f8dcdc7..f723aaf0 100644 --- a/ddm-types/versions/src/lib.rs +++ b/ddm-types/versions/src/lib.rs @@ -32,3 +32,5 @@ pub mod latest; #[path = "initial/mod.rs"] pub mod v1; +#[path = "multicast_support/mod.rs"] +pub mod v2; diff --git a/ddm-types/versions/src/multicast_support/db.rs b/ddm-types/versions/src/multicast_support/db.rs new file mode 100644 index 00000000..8fedfe64 --- /dev/null +++ b/ddm-types/versions/src/multicast_support/db.rs @@ -0,0 +1,89 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use std::net::Ipv6Addr; + +use mg_common::net::MulticastOrigin; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use crate::v1::db::{PeerStatus, RouterKind}; +use crate::v2::exchange::MulticastPathHop; + +/// A multicast route learned via DDM. +/// +/// Carries a MulticastOrigin (overlay group + ff04::/64 underlay +/// mapping) and the path vector from the originating subscriber +/// through intermediate transit routers. +// The path enables loop detection and (in multi-rack topologies) +// replication optimizations (RFD 488) in the future. +// +// Equality and hashing consider only `origin` and `nexthop` so that +// a route update with a longer path replaces the existing entry in +// hash-based collections. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +pub struct MulticastRoute { + /// The multicast group origin information. + pub origin: MulticastOrigin, + + /// Underlay nexthop address (DDM peer that advertised this route). + /// Used to associate the route with a peer for expiration. + pub nexthop: Ipv6Addr, + + /// Path vector from the originating subscriber outward. + /// Each hop records the router that redistributed this + /// subscription announcement. Used for loop detection on pull + /// and for future replication optimization in multi-rack + /// topologies. + #[serde(default)] + pub path: Vec, +} + +impl PartialEq for MulticastRoute { + fn eq(&self, other: &Self) -> bool { + self.origin == other.origin && self.nexthop == other.nexthop + } +} + +impl Eq for MulticastRoute {} + +impl std::hash::Hash for MulticastRoute { + fn hash(&self, state: &mut H) { + self.origin.hash(state); + self.nexthop.hash(state); + } +} + +impl From for MulticastOrigin { + fn from(x: MulticastRoute) -> Self { + x.origin + } +} + +/// Peer information with an optional interface name. +/// +// Adds the `if_name` field to identify which underlay interface the peer +// was discovered on. +#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema, PartialEq)] +pub struct PeerInfo { + pub status: PeerStatus, + pub addr: Ipv6Addr, + pub host: String, + pub kind: RouterKind, + /// Interface name the peer was discovered on (e.g., "tfportrear0_0"). + #[serde(default)] + pub if_name: Option, +} + +/// Downconvert v2 PeerInfo to v1 PeerInfo by dropping `if_name`. +impl From for crate::v1::db::PeerInfo { + fn from(p: PeerInfo) -> Self { + Self { + status: p.status, + addr: p.addr, + host: p.host, + kind: p.kind, + } + } +} diff --git a/ddm-types/versions/src/multicast_support/exchange.rs b/ddm-types/versions/src/multicast_support/exchange.rs new file mode 100644 index 00000000..ca0cb161 --- /dev/null +++ b/ddm-types/versions/src/multicast_support/exchange.rs @@ -0,0 +1,74 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use std::net::Ipv6Addr; + +/// A single hop in the multicast path, carrying metadata needed for +/// replication optimization. +// Unlike unicast paths which only need hostnames, multicast hops carry +// additional information for computing optimal replication points +// (RFD 488). +#[derive( + Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize, JsonSchema, +)] +pub struct MulticastPathHop { + /// Router identifier (hostname). + pub router_id: String, + + /// The underlay address of this router (for replication targeting). + pub underlay_addr: Ipv6Addr, + + /// Number of downstream subscribers reachable via this hop. + /// Used for load-aware replication decisions in multi-rack + /// topologies. + #[serde(default)] + pub downstream_subscriber_count: u32, +} + +impl MulticastPathHop { + /// Create a hop with the given router identity and a zero subscriber + /// count. The count will be populated once transit routers track + /// downstream subscriber counts for load-aware replication (RFD 488). + pub fn new(router_id: String, underlay_addr: Ipv6Addr) -> Self { + Self { + router_id, + underlay_addr, + downstream_subscriber_count: 0, + } + } +} + +/// Multicast group subscription announcement propagating through DDM. +/// +/// Contains a MulticastOrigin (overlay group + ff04::/64 underlay +/// mapping) and the path from the original subscriber outward. +// Currently, this is used for loop detection: if our router_id appears in the +// path, the announcement has already traversed us and is dropped. The path +// structure also carries topology information for future replication +// optimizations (RFD 488). +#[derive( + Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize, JsonSchema, +)] +pub struct MulticastPathVector { + /// The multicast group origin information. + pub origin: mg_common::net::MulticastOrigin, + + /// The path from the original subscriber to the current router. + /// Ordered from subscriber outward (subscriber router first). + pub path: Vec, +} + +impl MulticastPathVector { + /// Append a hop to this path vector. + pub fn with_hop(&self, hop: MulticastPathHop) -> Self { + let mut path = self.path.clone(); + path.push(hop); + Self { + origin: self.origin.clone(), + path, + } + } +} diff --git a/ddm-types/versions/src/multicast_support/mod.rs b/ddm-types/versions/src/multicast_support/mod.rs new file mode 100644 index 00000000..066113e2 --- /dev/null +++ b/ddm-types/versions/src/multicast_support/mod.rs @@ -0,0 +1,9 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Types from API version 2 (MULTICAST_SUPPORT) that add multicast +//! group management to the DDM admin API. + +pub mod db; +pub mod exchange; diff --git a/ddm/Cargo.toml b/ddm/Cargo.toml index f505632d..7856f5a7 100644 --- a/ddm/Cargo.toml +++ b/ddm/Cargo.toml @@ -35,3 +35,4 @@ oxnet.workspace = true uuid.workspace = true ddm-api.workspace = true ddm-types.workspace = true +ddm-types-versions.workspace = true diff --git a/ddm/src/admin.rs b/ddm/src/admin.rs index 6d49a368..26fe15e3 100644 --- a/ddm/src/admin.rs +++ b/ddm/src/admin.rs @@ -7,7 +7,7 @@ use crate::sm::{AdminEvent, Event, PrefixSet, SmContext}; use ddm_api::DdmAdminApi; use ddm_api::ddm_admin_api_mod; use ddm_types::admin::{EnableStatsRequest, ExpirePathParams, PrefixMap}; -use ddm_types::db::{PeerInfo, TunnelRoute}; +use ddm_types::db::{MulticastRoute, PeerInfo, TunnelRoute}; use ddm_types::exchange::PathVector; use dropshot::ApiDescription; use dropshot::ApiDescriptionBuildErrors; @@ -21,7 +21,7 @@ use dropshot::Path; use dropshot::RequestContext; use dropshot::TypedBody; use mg_common::lock; -use mg_common::net::TunnelOrigin; +use mg_common::net::{MulticastOrigin, TunnelOrigin}; use oxnet::Ipv6Net; use slog::{Logger, error, info}; use std::collections::{HashMap, HashSet}; @@ -116,6 +116,22 @@ impl DdmAdminApi for DdmAdminApiImpl { Ok(HttpResponseOk(ctx.db.peers())) } + async fn get_peers_v1( + ctx: RequestContext, + ) -> Result< + HttpResponseOk>, + HttpError, + > { + let ctx = lock!(ctx.context()); + let peers = ctx + .db + .peers() + .into_iter() + .map(|(k, v)| (k, v.into())) + .collect(); + Ok(HttpResponseOk(peers)) + } + async fn expire_peer( ctx: RequestContext, params: Path, @@ -333,6 +349,71 @@ impl DdmAdminApi for DdmAdminApiImpl { Ok(HttpResponseUpdatedNoContent()) } + async fn get_originated_multicast_groups( + ctx: RequestContext, + ) -> Result>, HttpError> { + let ctx = lock!(ctx.context()); + let originated = ctx + .db + .originated_mcast() + .map_err(|e| HttpError::for_internal_error(e.to_string()))?; + Ok(HttpResponseOk(originated)) + } + + async fn get_multicast_groups( + ctx: RequestContext, + ) -> Result>, HttpError> { + let ctx = lock!(ctx.context()); + let imported = ctx.db.imported_mcast(); + Ok(HttpResponseOk(imported)) + } + + async fn advertise_multicast_groups( + ctx: RequestContext, + request: TypedBody>, + ) -> Result { + let ctx = lock!(ctx.context()); + let groups = request.into_inner(); + slog::info!(ctx.log, "advertise multicast groups: {groups:#?}"); + ctx.db + .originate_mcast(&groups) + .map_err(|e| HttpError::for_internal_error(e.to_string()))?; + + for e in &ctx.event_channels { + e.send(Event::Admin(AdminEvent::Announce(PrefixSet::Multicast( + groups.clone(), + )))) + .map_err(|e| { + HttpError::for_internal_error(format!("admin event send: {e}")) + })?; + } + + Ok(HttpResponseUpdatedNoContent()) + } + + async fn withdraw_multicast_groups( + ctx: RequestContext, + request: TypedBody>, + ) -> Result { + let ctx = lock!(ctx.context()); + let groups = request.into_inner(); + slog::info!(ctx.log, "withdraw multicast groups: {groups:#?}"); + ctx.db + .withdraw_mcast(&groups) + .map_err(|e| HttpError::for_internal_error(e.to_string()))?; + + for e in &ctx.event_channels { + e.send(Event::Admin(AdminEvent::Withdraw(PrefixSet::Multicast( + groups.clone(), + )))) + .map_err(|e| { + HttpError::for_internal_error(format!("admin event send: {e}")) + })?; + } + + Ok(HttpResponseUpdatedNoContent()) + } + async fn sync( ctx: RequestContext, ) -> Result { diff --git a/ddm/src/db.rs b/ddm/src/db.rs index 13338cc4..30eec234 100644 --- a/ddm/src/db.rs +++ b/ddm/src/db.rs @@ -2,9 +2,9 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -use ddm_types::db::{PeerInfo, TunnelRoute}; +use ddm_types::db::{MulticastRoute, PeerInfo, TunnelRoute}; use mg_common::lock; -use mg_common::net::TunnelOrigin; +use mg_common::net::{MulticastOrigin, TunnelOrigin}; use oxnet::{IpNet, Ipv6Net}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -21,6 +21,10 @@ const ORIGINATE: &str = "originate"; /// tunnel endpoints. const TUNNEL_ORIGINATE: &str = "tunnel_originate"; +/// The handle used to open a persistent key-value tree for originated +/// multicast groups. +const MCAST_ORIGINATE: &str = "mcast_originate"; + #[derive(thiserror::Error, Debug)] pub enum Error { #[error("datastore error {0}")] @@ -48,6 +52,7 @@ pub struct DbData { pub peers: HashMap, pub imported: HashSet, pub imported_tunnel: HashSet, + pub imported_mcast: HashSet, } unsafe impl Sync for Db {} @@ -85,6 +90,14 @@ impl Db { lock!(self.data).imported_tunnel.len() } + pub fn imported_mcast(&self) -> HashSet { + lock!(self.data).imported_mcast.clone() + } + + pub fn imported_mcast_count(&self) -> usize { + lock!(self.data).imported_mcast.len() + } + pub fn import(&self, r: &HashSet) { lock!(self.data).imported.extend(r.clone()); } @@ -93,6 +106,10 @@ impl Db { lock!(self.data).imported_tunnel.extend(r.clone()); } + pub fn import_mcast(&self, r: &HashSet) { + lock!(self.data).imported_mcast.extend(r.clone()); + } + pub fn delete_import(&self, r: &HashSet) { let imported = &mut lock!(self.data).imported; for x in r { @@ -107,6 +124,38 @@ impl Db { } } + pub fn delete_import_mcast(&self, r: &HashSet) { + let imported = &mut lock!(self.data).imported_mcast; + for x in r { + imported.remove(x); + } + } + + /// Atomically import and delete multicast routes under a single lock, + /// returning the effective difference (additions + removals) against the + /// state before mutation. + /// + /// This avoids a TOCTOU race where concurrent mutations between separate + /// lock acquisitions could produce an incorrect view difference. + pub fn update_imported_mcast( + &self, + import: &HashSet, + remove: &HashSet, + ) -> (HashSet, HashSet) { + let mut data = lock!(self.data); + + let before = data.imported_mcast.clone(); + data.imported_mcast.extend(import.iter().cloned()); + + for x in remove { + data.imported_mcast.remove(x); + } + + let to_add = data.imported_mcast.difference(&before).cloned().collect(); + let to_del = before.difference(&data.imported_mcast).cloned().collect(); + (to_add, to_del) + } + pub fn originate(&self, prefixes: &HashSet) -> Result<(), Error> { let tree = self.persistent_data.open_tree(ORIGINATE)?; for p in prefixes { @@ -129,6 +178,19 @@ impl Db { Ok(()) } + pub fn originate_mcast( + &self, + origins: &HashSet, + ) -> Result<(), Error> { + let tree = self.persistent_data.open_tree(MCAST_ORIGINATE)?; + for o in origins { + let entry = serde_json::to_string(o)?; + tree.insert(entry.as_str(), "")?; + } + tree.flush()?; + Ok(()) + } + pub fn originated(&self) -> Result, Error> { let tree = self.persistent_data.open_tree(ORIGINATE)?; let result = tree @@ -178,6 +240,7 @@ impl Db { return None; } }; + let value = String::from_utf8_lossy(&key); let value: TunnelOrigin = match serde_json::from_str(&value) { Ok(item) => item, @@ -199,6 +262,44 @@ impl Db { Ok(self.originated_tunnel()?.len()) } + pub fn originated_mcast(&self) -> Result, Error> { + let tree = self.persistent_data.open_tree(MCAST_ORIGINATE)?; + let result = tree + .scan_prefix(vec![]) + .filter_map(|item| { + let (key, _value) = match item { + Ok(item) => item, + Err(e) => { + error!( + self.log, + "db: error fetching ddm mcast origin entry: {e}" + ); + return None; + } + }; + + let value = String::from_utf8_lossy(&key); + let value: MulticastOrigin = match serde_json::from_str(&value) + { + Ok(item) => item, + Err(e) => { + error!( + self.log, + "db: error parsing ddm mcast origin: {e}" + ); + return None; + } + }; + Some(value) + }) + .collect(); + Ok(result) + } + + pub fn originated_mcast_count(&self) -> Result { + Ok(self.originated_mcast()?.len()) + } + pub fn withdraw(&self, prefixes: &HashSet) -> Result<(), Error> { let tree = self.persistent_data.open_tree(ORIGINATE)?; for p in prefixes { @@ -221,6 +322,19 @@ impl Db { Ok(()) } + pub fn withdraw_mcast( + &self, + origins: &HashSet, + ) -> Result<(), Error> { + let tree = self.persistent_data.open_tree(MCAST_ORIGINATE)?; + for o in origins { + let entry = serde_json::to_string(o)?; + tree.remove(entry.as_str())?; + } + tree.flush()?; + Ok(()) + } + /// Set peer info at the given index. Returns true if peer information was /// changed. pub fn set_peer(&self, index: u32, info: PeerInfo) -> bool { @@ -233,7 +347,11 @@ impl Db { pub fn remove_nexthop_routes( &self, nexthop: Ipv6Addr, - ) -> (HashSet, HashSet) { + ) -> ( + HashSet, + HashSet, + HashSet, + ) { let mut data = lock!(self.data); // Routes are generally held in sets to prevent duplication and provide // handy set-algebra operations. @@ -256,7 +374,18 @@ impl Db { for x in &tnl_removed { data.imported_tunnel.remove(x); } - (removed, tnl_removed) + + let mut mcast_removed = HashSet::new(); + for x in &data.imported_mcast { + if x.nexthop == nexthop { + mcast_removed.insert(x.clone()); + } + } + for x in &mcast_removed { + data.imported_mcast.remove(x); + } + + (removed, tnl_removed, mcast_removed) } pub fn remove_peer(&self, index: u32) { diff --git a/ddm/src/discovery.rs b/ddm/src/discovery.rs index fc4a84e8..2c94a08f 100644 --- a/ddm/src/discovery.rs +++ b/ddm/src/discovery.rs @@ -113,6 +113,7 @@ const ADVERTISE: u8 = 1 << 1; pub enum Version { V2 = 2, V3 = 3, + V4 = 4, } #[derive(Error, Debug)] @@ -136,7 +137,7 @@ pub struct DiscoveryPacket { impl DiscoveryPacket { pub fn new_solicitation(hostname: String, kind: RouterKind) -> Self { Self { - version: Version::V2 as u8, + version: Version::V4 as u8, flags: SOLICIT, hostname, kind, @@ -144,7 +145,7 @@ impl DiscoveryPacket { } pub fn new_advertisement(hostname: String, kind: RouterKind) -> Self { Self { - version: Version::V2 as u8, + version: Version::V4 as u8, flags: ADVERTISE, hostname, kind, @@ -461,12 +462,12 @@ fn handle_advertisement( let version = match version { 2 => Version::V2, 3 => Version::V3, + 4 => Version::V4, x => { err!( ctx.log, ctx.config.if_name, - "unknown protocol version {}, known versions are: 1, 2", - x + "unknown protocol version {x}, known versions are: 2, 3, 4" ); return; } @@ -526,6 +527,7 @@ fn handle_advertisement( addr: *sender, host: hostname, kind, + if_name: Some(ctx.config.if_name.clone()), }, ); if updated { diff --git a/ddm/src/exchange.rs b/ddm/src/exchange.rs index 2c1cc876..9e696e37 100644 --- a/ddm/src/exchange.rs +++ b/ddm/src/exchange.rs @@ -19,8 +19,10 @@ use crate::db::{Route, effective_route_set}; use crate::discovery::Version; use crate::sm::{Config, Event, PeerEvent, SmContext}; use crate::{dbg, err, inf, wrn}; -use ddm_types::db::{RouterKind, TunnelRoute}; -use ddm_types::exchange::{PathVector, PathVectorV2}; +use ddm_types::db::{MulticastRoute, RouterKind, TunnelRoute}; +use ddm_types::exchange::{ + MulticastPathHop, MulticastPathVector, PathVector, PathVectorV2, +}; use dropshot::ApiDescription; use dropshot::ConfigDropshot; use dropshot::ConfigLogging; @@ -74,10 +76,20 @@ pub struct UpdateV2 { pub tunnel: Option, } +/// THIS TYPE IS FOR DDM PROTOCOL VERSION 3. IT SHALL NEVER CHANGE. THIS TYPE +/// CAN BE REMOVED WHEN DDMV3 CLIENTS AND SERVERS NO LONGER EXIST BUT ITS +/// DEFINITION SHALL NEVER CHANGE. +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Default)] +pub struct UpdateV3 { + pub underlay: Option, + pub tunnel: Option, +} + #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Default)] pub struct Update { pub underlay: Option, pub tunnel: Option, + pub multicast: Option, } impl From for Update { @@ -88,6 +100,7 @@ impl From for Update { announce: value.announce, withdraw: value.withdraw, }), + multicast: None, } } } @@ -97,6 +110,8 @@ impl From for Update { Update { tunnel: value.tunnel.map(TunnelUpdate::from), underlay: value.underlay.map(UnderlayUpdate::from), + // V2 protocol doesn't support multicast + multicast: None, } } } @@ -120,11 +135,31 @@ impl From for UpdateV2 { } } +impl From for Update { + fn from(value: UpdateV3) -> Self { + Update { + underlay: value.underlay, + tunnel: value.tunnel, + multicast: None, + } + } +} + +impl From for UpdateV3 { + fn from(value: Update) -> Self { + UpdateV3 { + underlay: value.underlay, + tunnel: value.tunnel, + } + } +} + impl From for Update { fn from(u: UnderlayUpdate) -> Self { Update { underlay: Some(u), tunnel: None, + multicast: None, } } } @@ -134,6 +169,7 @@ impl From for Update { Update { underlay: None, tunnel: Some(t), + multicast: None, } } } @@ -143,14 +179,35 @@ impl Update { Self { underlay: pr.underlay.map(UnderlayUpdate::announce), tunnel: pr.tunnel.map(TunnelUpdate::announce), + multicast: pr.multicast.map(MulticastUpdate::announce), } } } +impl From for Update { + fn from(m: MulticastUpdate) -> Self { + Update { + underlay: None, + tunnel: None, + multicast: Some(m), + } + } +} + +/// THIS TYPE IS FOR DDM PROTOCOL VERSION 3. IT SHALL NEVER CHANGE. THIS TYPE +/// CAN BE REMOVED WHEN DDMV3 CLIENTS AND SERVERS NO LONGER EXIST BUT ITS +/// DEFINITION SHALL NEVER CHANGE. +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Default)] +pub struct PullResponseV3 { + pub underlay: Option>, + pub tunnel: Option>, +} + #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Default)] pub struct PullResponse { pub underlay: Option>, pub tunnel: Option>, + pub multicast: Option>, } /// THIS TYPE IS FOR DDM PROTOCOL VERSION 2. IT SHALL NEVER CHANGE. THIS TYPE @@ -171,6 +228,18 @@ impl From for PullResponse { tunnel: value .tunnel .map(|x| x.into_iter().map(TunnelOrigin::from).collect()), + // V2 protocol doesn't support multicast + multicast: None, + } + } +} + +impl From for PullResponse { + fn from(value: PullResponseV3) -> Self { + PullResponse { + underlay: value.underlay, + tunnel: value.tunnel, + multicast: None, } } } @@ -180,6 +249,7 @@ impl From> for PullResponse { PullResponse { underlay: Some(value), tunnel: None, + multicast: None, } } } @@ -334,6 +404,50 @@ impl TunnelUpdate { } } +/// Multicast group subscription updates. +/// +/// Each entry carries a [`MulticastPathVector`] containing a +/// [`MulticastOrigin`] (overlay group + ff04::/64 underlay mapping) +/// and the path vector for loop detection. +/// +/// [`MulticastOrigin`]: mg_common::net::MulticastOrigin +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Default)] +pub struct MulticastUpdate { + pub announce: HashSet, + pub withdraw: HashSet, +} + +impl MulticastUpdate { + pub fn announce(groups: HashSet) -> Self { + Self { + announce: groups, + ..Default::default() + } + } + pub fn withdraw(groups: HashSet) -> Self { + Self { + withdraw: groups, + ..Default::default() + } + } + + /// Add a hop to all path vectors in this update. + pub fn with_hop(&self, hop: MulticastPathHop) -> Self { + Self { + announce: self + .announce + .iter() + .map(|pv| pv.with_hop(hop.clone())) + .collect(), + withdraw: self + .withdraw + .iter() + .map(|pv| pv.with_hop(hop.clone())) + .collect(), + } + } +} + #[derive(Error, Debug)] pub enum ExchangeError { #[error("io error: {0}")] @@ -404,15 +518,52 @@ pub(crate) fn withdraw_tunnel( send_update(ctx, config, update.into(), addr, version, rt, log) } -pub(crate) fn do_pull( +pub(crate) fn announce_multicast( + ctx: &SmContext, + config: Config, + groups: HashSet, + addr: Ipv6Addr, + version: Version, + rt: Arc, + log: Logger, +) -> Result<(), ExchangeError> { + let update = MulticastUpdate::announce(groups); + send_update(ctx, config, update.into(), addr, version, rt, log) +} + +pub(crate) fn withdraw_multicast( + ctx: &SmContext, + config: Config, + groups: HashSet, + addr: Ipv6Addr, + version: Version, + rt: Arc, + log: Logger, +) -> Result<(), ExchangeError> { + let update = MulticastUpdate::withdraw(groups); + send_update(ctx, config, update.into(), addr, version, rt, log) +} + +pub(crate) fn do_pull_v4( ctx: &SmContext, addr: &Ipv6Addr, rt: &Arc, ) -> Result { - let uri = format!( - "http://[{}%{}]:{}/v3/pull", - addr, ctx.config.if_index, ctx.config.exchange_port, - ); + let if_index = ctx.config.if_index; + let port = ctx.config.exchange_port; + let uri = format!("http://[{addr}%{if_index}]:{port}/v4/pull"); + let body = do_pull_common(uri, rt)?; + Ok(serde_json::from_slice(&body)?) +} + +pub(crate) fn do_pull_v3( + ctx: &SmContext, + addr: &Ipv6Addr, + rt: &Arc, +) -> Result { + let if_index = ctx.config.if_index; + let port = ctx.config.exchange_port; + let uri = format!("http://[{addr}%{if_index}]:{port}/v3/pull"); let body = do_pull_common(uri, rt)?; Ok(serde_json::from_slice(&body)?) } @@ -464,7 +615,8 @@ pub(crate) fn pull( ) -> Result<(), ExchangeError> { let pr: PullResponse = match version { Version::V2 => do_pull_v2(&ctx, &addr, &rt)?.into(), - Version::V3 => do_pull(&ctx, &addr, &rt)?, + Version::V3 => do_pull_v3(&ctx, &addr, &rt)?.into(), + Version::V4 => do_pull_v4(&ctx, &addr, &rt)?, }; let update = Update::announce(pr); @@ -489,43 +641,14 @@ fn send_update( log: Logger, ) -> Result<(), ExchangeError> { ctx.stats.updates_sent.fetch_add(1, Ordering::Relaxed); - match version { - Version::V2 => { - send_update_v2(ctx, config, update.into(), addr, rt, log) - } - Version::V3 => send_update_v3(ctx, config, update, addr, rt, log), - } -} - -fn send_update_v2( - ctx: &SmContext, - config: Config, - update: UpdateV2, - addr: Ipv6Addr, - rt: Arc, - log: Logger, -) -> Result<(), ExchangeError> { - let payload = serde_json::to_string(&update)?; - let uri = format!( - "http://[{}%{}]:{}/v2/push", - addr, config.if_index, config.exchange_port, - ); - send_update_common(ctx, uri, payload, config, rt, log) -} - -fn send_update_v3( - ctx: &SmContext, - config: Config, - update: Update, - addr: Ipv6Addr, - rt: Arc, - log: Logger, -) -> Result<(), ExchangeError> { - let payload = serde_json::to_string(&update)?; - let uri = format!( - "http://[{}%{}]:{}/v3/push", - addr, config.if_index, config.exchange_port, - ); + let (payload, path) = match version { + Version::V2 => (serde_json::to_string(&UpdateV2::from(update))?, "v2"), + Version::V3 => (serde_json::to_string(&UpdateV3::from(update))?, "v3"), + Version::V4 => (serde_json::to_string(&update)?, "v4"), + }; + let if_index = config.if_index; + let port = config.exchange_port; + let uri = format!("http://[{addr}%{if_index}]:{port}/{path}/push"); send_update_common(ctx, uri, payload, config, rt, log) } @@ -630,9 +753,11 @@ pub fn api_description() -> Result< > { let mut api = ApiDescription::new(); api.register(push_handler_v2)?; - api.register(push_handler)?; + api.register(push_handler_v3)?; + api.register(push_handler_v4)?; api.register(pull_handler_v2)?; - api.register(pull_handler)?; + api.register(pull_handler_v3)?; + api.register(pull_handler_v4)?; Ok(api) } @@ -647,7 +772,16 @@ async fn push_handler_v2( } #[endpoint { method = PUT, path = "/v3/push" }] -async fn push_handler( +async fn push_handler_v3( + ctx: RequestContext>>, + request: TypedBody, +) -> Result { + let update = Update::from(request.into_inner()); + push_handler_common(ctx, update).await +} + +#[endpoint { method = PUT, path = "/v4/push" }] +async fn push_handler_v4( ctx: RequestContext>>, request: TypedBody, ) -> Result { @@ -744,19 +878,15 @@ async fn pull_handler_v2( })) } -#[endpoint { method = GET, path = "/v3/pull" }] -async fn pull_handler( - ctx: RequestContext>>, -) -> Result, HttpError> { - let ctx = ctx.context().lock().await.clone(); - +/// Collect underlay and tunnel routes for pull responses (shared by V3/V4). +fn collect_underlay_tunnel( + ctx: &HandlerContext, +) -> Result<(HashSet, HashSet), HttpError> { let mut underlay = HashSet::new(); let mut tunnel = HashSet::new(); - // Only transit routers redistribute prefixes if ctx.ctx.config.kind == RouterKind::Transit { for route in &ctx.ctx.db.imported() { - // don't redistribute prefixes to their originators if route.nexthop == ctx.peer { continue; } @@ -771,21 +901,20 @@ async fn pull_handler( if route.nexthop == ctx.peer { continue; } - let tv = route.origin; - tunnel.insert(tv); + tunnel.insert(route.origin); } } + let originated = ctx .ctx .db .originated() .map_err(|e| HttpError::for_internal_error(e.to_string()))?; for prefix in &originated { - let pv = PathVector { + underlay.insert(PathVector { destination: *prefix, path: vec![ctx.ctx.hostname.clone()], - }; - underlay.insert(pv); + }); } let originated_tunnel = ctx @@ -794,26 +923,83 @@ async fn pull_handler( .originated_tunnel() .map_err(|e| HttpError::for_internal_error(e.to_string()))?; for prefix in &originated_tunnel { - let tv = TunnelOrigin { + tunnel.insert(TunnelOrigin { overlay_prefix: prefix.overlay_prefix, boundary_addr: prefix.boundary_addr, vni: prefix.vni, metric: prefix.metric, - }; - tunnel.insert(tv); + }); } + Ok((underlay, tunnel)) +} + +/// Collect multicast routes for V4 pull responses. +fn collect_multicast( + ctx: &HandlerContext, +) -> Result, HttpError> { + let mut multicast = HashSet::new(); + + if ctx.ctx.config.kind == RouterKind::Transit { + for route in &ctx.ctx.db.imported_mcast() { + if route.nexthop == ctx.peer { + continue; + } + let hop = MulticastPathHop::new( + ctx.ctx.hostname.clone(), + ctx.ctx.config.addr, + ); + let mut path = route.path.clone(); + path.push(hop); + multicast.insert(MulticastPathVector { + origin: route.origin.clone(), + path, + }); + } + } + + let originated_mcast = ctx + .ctx + .db + .originated_mcast() + .map_err(|e| HttpError::for_internal_error(e.to_string()))?; + for origin in &originated_mcast { + let hop = MulticastPathHop::new( + ctx.ctx.hostname.clone(), + ctx.ctx.config.addr, + ); + multicast.insert(MulticastPathVector { + origin: origin.clone(), + path: vec![hop], + }); + } + + Ok(multicast) +} + +#[endpoint { method = GET, path = "/v3/pull" }] +async fn pull_handler_v3( + ctx: RequestContext>>, +) -> Result, HttpError> { + let ctx = ctx.context().lock().await.clone(); + let (underlay, tunnel) = collect_underlay_tunnel(&ctx)?; + Ok(HttpResponseOk(PullResponseV3 { + underlay: crate::non_empty(underlay), + tunnel: crate::non_empty(tunnel), + })) +} + +#[endpoint { method = GET, path = "/v4/pull" }] +async fn pull_handler_v4( + ctx: RequestContext>>, +) -> Result, HttpError> { + let ctx = ctx.context().lock().await.clone(); + let (underlay, tunnel) = collect_underlay_tunnel(&ctx)?; + let multicast = collect_multicast(&ctx)?; Ok(HttpResponseOk(PullResponse { - underlay: if underlay.is_empty() { - None - } else { - Some(underlay) - }, - tunnel: if tunnel.is_empty() { - None - } else { - Some(tunnel) - }, + underlay: crate::non_empty(underlay), + tunnel: crate::non_empty(tunnel), + multicast: crate::non_empty(multicast), })) } @@ -831,6 +1017,10 @@ fn handle_update(update: &Update, ctx: &HandlerContext) { handle_tunnel_update(tunnel_update, ctx); } + if let Some(multicast_update) = &update.multicast { + handle_multicast_update(multicast_update, ctx); + } + // distribute updates if ctx.ctx.config.kind == RouterKind::Transit { @@ -846,13 +1036,24 @@ fn handle_update(update: &Update, ctx: &HandlerContext) { .as_ref() .map(|update| update.with_path_element(ctx.ctx.hostname.clone())); - let push = Update { + // Add our hop info to multicast path vectors before redistribution + let multicast = update.multicast.as_ref().map(|update| { + let hop = MulticastPathHop::new( + ctx.ctx.hostname.clone(), + ctx.ctx.config.addr, + ); + update.with_hop(hop) + }); + + let push = Arc::new(Update { underlay, tunnel: update.tunnel.clone(), - }; + multicast, + }); for ec in &ctx.ctx.event_channels { - ec.send(Event::Peer(PeerEvent::Push(push.clone()))).unwrap(); + ec.send(Event::Peer(PeerEvent::Push(Arc::clone(&push)))) + .unwrap(); } } } @@ -999,3 +1200,205 @@ fn handle_underlay_update(update: &UnderlayUpdate, ctx: &HandlerContext) { .imported_underlay_prefixes .store(ctx.ctx.db.imported_count() as u64, Ordering::Relaxed); } + +/// Handle multicast group subscription updates from a peer. +/// +/// Validation uses path-vector-based RPF rather than unicast-RIB RPF. +/// DDM operates on the underlay while multicast sources are overlay +/// addresses, so traditional (S,G) RPF against the unicast RIB does not +/// apply at this layer. The MRIB RPF module in rdb handles that check +/// before routes are originated into DDM. At the DDM exchange level, +/// the path vector provides loop detection and carries topology +/// information for replication optimization (RFD 488). +fn handle_multicast_update(update: &MulticastUpdate, ctx: &HandlerContext) { + let db = &ctx.ctx.db; + let hostname = &ctx.ctx.hostname; + + let mut import = HashSet::new(); + for pv in &update.announce { + // Path-vector RPF: drop if our router_id appears in the path, + // indicating the announcement has already traversed us. + if pv.path.iter().any(|hop| &hop.router_id == hostname) { + dbg!( + ctx.log, + ctx.ctx.config.if_name, + "dropping multicast announce for {:?} - loop detected \ + (path length {})", + pv.origin.overlay_group, + pv.path.len(), + ); + continue; + } + + import.insert(MulticastRoute { + origin: pv.origin.clone(), + nexthop: ctx.peer, + path: pv.path.clone(), + }); + } + + let mut remove = HashSet::new(); + for pv in &update.withdraw { + // Empty path is safe: MulticastRoute's PartialEq/Hash exclude + // the path field, so this matches by (origin, nexthop) only. + remove.insert(MulticastRoute { + origin: pv.origin.clone(), + nexthop: ctx.peer, + path: Vec::new(), + }); + } + + // Atomic import + delete + diff under a single lock. + // + // DDM stores learned multicast state, which feeds back into Omicron, as + // the latter owns OPTE M2P programming via sled-agent (the M2P table is + // global to xde). + // Learned state is queryable via the DDM admin API (get_multicast_groups). + db.update_imported_mcast(&import, &remove); +} + +#[cfg(test)] +mod test { + use super::*; + use ddm_types::exchange::MulticastPathHop; + use mg_common::net::{MulticastOrigin, UnderlayMulticastIpv6, Vni}; + use std::net::Ipv6Addr; + + fn sample_multicast_update() -> MulticastUpdate { + let origin = MulticastOrigin { + overlay_group: "233.252.0.1".parse().unwrap(), + underlay_group: UnderlayMulticastIpv6::new( + "ff04::1".parse().unwrap(), + ) + .unwrap(), + vni: Vni::try_from(77u32).unwrap(), + metric: 0, + source: None, + }; + let pv = MulticastPathVector { + origin, + path: vec![MulticastPathHop::new( + "router-1".into(), + Ipv6Addr::LOCALHOST, + )], + }; + MulticastUpdate::announce([pv].into_iter().collect()) + } + + #[test] + fn v4_update_round_trips() { + let update = Update { + underlay: None, + tunnel: None, + multicast: Some(sample_multicast_update()), + }; + let json = serde_json::to_string(&update).unwrap(); + let back: Update = serde_json::from_str(&json).unwrap(); + assert!(back.multicast.is_some()); + assert_eq!(back.multicast.unwrap().announce.len(), 1,); + } + + #[test] + fn v4_update_deserializes_as_v3_drops_multicast() { + let update = Update { + underlay: None, + tunnel: None, + multicast: Some(sample_multicast_update()), + }; + let json = serde_json::to_string(&update).unwrap(); + // A V3 peer would deserialize this as UpdateV3, silently + // dropping the unknown multicast field. + let v3: UpdateV3 = serde_json::from_str(&json).unwrap(); + assert!(v3.underlay.is_none()); + assert!(v3.tunnel.is_none()); + } + + #[test] + fn v3_update_deserializes_as_v4_multicast_none() { + let v3 = UpdateV3 { + underlay: None, + tunnel: None, + }; + let json = serde_json::to_string(&v3).unwrap(); + // A V4 peer receiving a V3 update gets multicast: None. + let update: Update = serde_json::from_str(&json).unwrap(); + assert!(update.multicast.is_none()); + } + + #[test] + fn v4_pull_response_round_trips() { + let origin = MulticastOrigin { + overlay_group: "ff0e::1".parse().unwrap(), + underlay_group: UnderlayMulticastIpv6::new( + "ff04::2".parse().unwrap(), + ) + .unwrap(), + vni: Vni::try_from(77u32).unwrap(), + metric: 0, + source: None, + }; + let pv = MulticastPathVector { + origin, + path: vec![], + }; + let resp = PullResponse { + underlay: None, + tunnel: None, + multicast: Some([pv].into_iter().collect()), + }; + let json = serde_json::to_string(&resp).unwrap(); + let back: PullResponse = serde_json::from_str(&json).unwrap(); + assert!(back.multicast.is_some()); + } + + #[test] + fn v4_pull_response_deserializes_as_v3() { + let origin = MulticastOrigin { + overlay_group: "233.252.0.1".parse().unwrap(), + underlay_group: UnderlayMulticastIpv6::new( + "ff04::1".parse().unwrap(), + ) + .unwrap(), + vni: Vni::try_from(77u32).unwrap(), + metric: 0, + source: None, + }; + let pv = MulticastPathVector { + origin, + path: vec![], + }; + let resp = PullResponse { + underlay: None, + tunnel: None, + multicast: Some([pv].into_iter().collect()), + }; + let json = serde_json::to_string(&resp).unwrap(); + // V3 peer drops the multicast field. + let v3: PullResponseV3 = serde_json::from_str(&json).unwrap(); + assert!(v3.underlay.is_none()); + assert!(v3.tunnel.is_none()); + } + + #[test] + fn v3_pull_response_deserializes_as_v4() { + let v3 = PullResponseV3 { + underlay: None, + tunnel: None, + }; + let json = serde_json::to_string(&v3).unwrap(); + let resp: PullResponse = serde_json::from_str(&json).unwrap(); + assert!(resp.multicast.is_none()); + } + + #[test] + fn from_conversions_strip_multicast() { + let update = Update { + underlay: None, + tunnel: None, + multicast: Some(sample_multicast_update()), + }; + let v3 = UpdateV3::from(update); + let back = Update::from(v3); + assert!(back.multicast.is_none()); + } +} diff --git a/ddm/src/lib.rs b/ddm/src/lib.rs index 447109ba..7699ce43 100644 --- a/ddm/src/lib.rs +++ b/ddm/src/lib.rs @@ -11,6 +11,13 @@ pub mod sm; pub mod sys; mod util; +/// Returns `None` if the set is empty, otherwise `Some(s)`. +pub(crate) fn non_empty( + set: std::collections::HashSet, +) -> Option> { + (!set.is_empty()).then_some(set) +} + #[macro_export] macro_rules! err { ($log:expr, $index:expr, $($args:tt)+) => { diff --git a/ddm/src/sm.rs b/ddm/src/sm.rs index 24215795..604b9080 100644 --- a/ddm/src/sm.rs +++ b/ddm/src/sm.rs @@ -4,12 +4,13 @@ use crate::db::Db; use crate::discovery::Version; -use crate::exchange::{TunnelUpdate, UnderlayUpdate, Update}; +use crate::exchange::{MulticastUpdate, TunnelUpdate, UnderlayUpdate, Update}; use crate::{dbg, discovery, err, exchange, inf, wrn}; use ddm_types::db::RouterKind; +use ddm_types::exchange::MulticastPathHop; use ddm_types::exchange::PathVector; use libnet::get_ipaddr_info; -use mg_common::net::TunnelOrigin; +use mg_common::net::{MulticastOrigin, TunnelOrigin}; use oxnet::Ipv6Net; use slog::Logger; use std::collections::HashSet; @@ -41,11 +42,12 @@ pub enum AdminEvent { pub enum PrefixSet { Underlay(HashSet), Tunnel(HashSet), + Multicast(HashSet), } #[derive(Debug)] pub enum PeerEvent { - Push(Update), + Push(Arc), } #[derive(Debug)] @@ -425,7 +427,7 @@ impl Exchange { ); let interval = 250; // TODO as parameter loop { - match exchange::do_pull( + match exchange::do_pull_v4( &self.ctx, &self.ctx.config.addr, &self.ctx.rt, @@ -455,7 +457,7 @@ impl Exchange { ) { exchange_thread.abort(); self.ctx.db.remove_peer(self.ctx.config.if_index); - let (to_remove, to_remove_tnl) = + let (to_remove, to_remove_tnl, to_remove_mcast) = self.ctx.db.remove_nexthop_routes(self.peer); let mut routes: Vec = Vec::new(); for x in &to_remove { @@ -492,12 +494,9 @@ impl Exchange { self.ctx.event_channels.len() ); - let underlay = if to_remove.is_empty() { - None - } else { - Some(UnderlayUpdate::withdraw( - to_remove - .iter() + let underlay = crate::non_empty(to_remove).map(|set| { + UnderlayUpdate::withdraw( + set.iter() .map(|x| PathVector { destination: x.destination, path: { @@ -507,20 +506,39 @@ impl Exchange { }, }) .collect(), - )) - }; + ) + }); - let tunnel = if to_remove_tnl.is_empty() { - None - } else { - Some(TunnelUpdate::withdraw( - to_remove_tnl.iter().cloned().map(Into::into).collect(), - )) - }; + let tunnel = crate::non_empty(to_remove_tnl).map(|set| { + TunnelUpdate::withdraw( + set.iter().cloned().map(Into::into).collect(), + ) + }); - let push = Update { underlay, tunnel }; + // Build multicast withdrawal with our hop info. + let multicast = crate::non_empty(to_remove_mcast).map(|set| { + let hop = MulticastPathHop::new( + self.ctx.hostname.clone(), + self.ctx.config.addr, + ); + MulticastUpdate::withdraw( + set.iter() + .map(|route| ddm_types::exchange::MulticastPathVector { + origin: route.origin.clone(), + path: vec![hop.clone()], + }) + .collect(), + ) + }); + + let push = Arc::new(Update { + underlay, + tunnel, + multicast, + }); for ec in &self.ctx.event_channels { - ec.send(Event::Peer(PeerEvent::Push(push.clone()))).unwrap(); + ec.send(Event::Peer(PeerEvent::Push(Arc::clone(&push)))) + .unwrap(); } } pull_stop.store(true, Ordering::Relaxed); @@ -601,6 +619,7 @@ impl State for Exchange { "announce: {}", e, ); + wrn!( self.log, self.ctx.config.if_name, @@ -728,6 +747,104 @@ impl State for Exchange { ); } } + Event::Admin(AdminEvent::Announce(PrefixSet::Multicast( + groups, + ))) => { + // Convert `MulticastOrigin` to `MulticastPathVector` with + // our hop info + let hop = MulticastPathHop::new( + self.ctx.hostname.clone(), + self.ctx.config.addr, + ); + let pvs: HashSet<_> = groups + .iter() + .map(|origin| { + ddm_types::exchange::MulticastPathVector { + origin: origin.clone(), + path: vec![hop.clone()], + } + }) + .collect(); + + if let Err(e) = crate::exchange::announce_multicast( + &self.ctx, + self.ctx.config.clone(), + pvs, + self.peer, + self.version, + self.ctx.rt.clone(), + self.log.clone(), + ) { + err!( + self.log, + self.ctx.config.if_name, + "announce multicast: {}", + e, + ); + wrn!( + self.log, + self.ctx.config.if_name, + "expiring peer {} due to failed multicast announce", + self.peer, + ); + self.expire_peer(&exchange_thread, &pull_stop); + return ( + Box::new(Solicit::new( + self.ctx.clone(), + self.log.clone(), + )), + event, + ); + } + } + Event::Admin(AdminEvent::Withdraw(PrefixSet::Multicast( + groups, + ))) => { + // Convert MulticastOrigin to MulticastPathVector for withdrawal + let hop = MulticastPathHop::new( + self.ctx.hostname.clone(), + self.ctx.config.addr, + ); + let pvs: HashSet<_> = groups + .iter() + .map(|origin| { + ddm_types::exchange::MulticastPathVector { + origin: origin.clone(), + path: vec![hop.clone()], + } + }) + .collect(); + + if let Err(e) = crate::exchange::withdraw_multicast( + &self.ctx, + self.ctx.config.clone(), + pvs, + self.peer, + self.version, + self.ctx.rt.clone(), + self.log.clone(), + ) { + err!( + self.log, + self.ctx.config.if_name, + "withdraw multicast: {e}", + ); + wrn!( + self.log, + self.ctx.config.if_name, + "expiring peer {} due to failed multicast withdraw", + self.peer, + ); + self.expire_peer(&exchange_thread, &pull_stop); + return ( + Box::new(Solicit::new( + self.ctx.clone(), + self.log.clone(), + )), + event, + ); + } + } Event::Admin(AdminEvent::Expire(peer)) => { if self.peer == peer { inf!( @@ -770,6 +887,8 @@ impl State for Exchange { self.peer, update, ); + let update = Arc::try_unwrap(update) + .unwrap_or_else(|arc| (*arc).clone()); if let Some(push) = update.underlay { if !push.announce.is_empty() && let Err(e) = crate::exchange::announce_underlay( @@ -817,8 +936,7 @@ impl State for Exchange { err!( self.log, self.ctx.config.if_name, - "withdraw: {}", - e, + "withdraw: {e}", ); wrn!( self.log, @@ -836,6 +954,71 @@ impl State for Exchange { ); } } + // Handle multicast redistribution + if let Some(push) = update.multicast { + if !push.announce.is_empty() + && let Err(e) = crate::exchange::announce_multicast( + &self.ctx, + self.ctx.config.clone(), + push.announce, + self.peer, + self.version, + self.ctx.rt.clone(), + self.log.clone(), + ) + { + err!( + self.log, + self.ctx.config.if_name, + "announce multicast: {e}", + ); + wrn!( + self.log, + self.ctx.config.if_name, + "expiring peer {} due to failed multicast announce", + self.peer, + ); + self.expire_peer(&exchange_thread, &pull_stop); + return ( + Box::new(Solicit::new( + self.ctx.clone(), + self.log.clone(), + )), + event, + ); + } + if !push.withdraw.is_empty() + && let Err(e) = crate::exchange::withdraw_multicast( + &self.ctx, + self.ctx.config.clone(), + push.withdraw, + self.peer, + self.version, + self.ctx.rt.clone(), + self.log.clone(), + ) + { + err!( + self.log, + self.ctx.config.if_name, + "withdraw multicast: {e}", + ); + wrn!( + self.log, + self.ctx.config.if_name, + "expiring peer {} due to failed multicast withdraw", + self.peer, + ); + self.expire_peer(&exchange_thread, &pull_stop); + return ( + Box::new(Solicit::new( + self.ctx.clone(), + self.log.clone(), + )), + event, + ); + } + } } Event::Neighbor(NeighborEvent::Expire) => { wrn!( diff --git a/ddmadm/src/main.rs b/ddmadm/src/main.rs index 800315d8..08ffe515 100644 --- a/ddmadm/src/main.rs +++ b/ddmadm/src/main.rs @@ -60,6 +60,18 @@ enum SubCommand { /// Withdraw prefixes from a DDM router. TunnelWithdraw(TunnelEndpoint), + /// Get multicast groups imported from DDM peers. + MulticastImported, + + /// Get locally originated multicast groups. + MulticastOriginated, + + /// Advertise multicast groups from this router. + MulticastAdvertise(MulticastGroup), + + /// Withdraw multicast groups from this router. + MulticastWithdraw(MulticastGroup), + /// Sync prefix information from peers. Sync, } @@ -84,6 +96,29 @@ struct TunnelEndpoint { pub metric: u64, } +#[derive(Debug, Parser)] +struct MulticastGroup { + /// Overlay multicast group address (e.g. 233.252.0.1 or ff0e::1). + #[arg(short = 'g', long)] + pub overlay_group: IpAddr, + + /// Underlay multicast address (ff04::/64 admin-local scope). + #[arg(short = 'u', long)] + pub underlay_group: Ipv6Addr, + + /// Virtual Network Identifier. + #[arg(short, long)] + pub vni: u32, + + /// Path metric. + #[arg(short, long, default_value_t = 0)] + pub metric: u64, + + /// Source address for (S,G) routes (omit for (*,G)). + #[arg(short, long)] + pub source: Option, +} + #[derive(Debug, Parser)] struct Peer { addr: Ipv6Addr, @@ -242,6 +277,107 @@ async fn run() -> Result<()> { }]) .await?; } + SubCommand::MulticastImported => { + let msg = client.get_multicast_groups().await?; + let mut routes: Vec<_> = msg.into_inner().into_iter().collect(); + routes.sort_by(|a, b| { + a.origin + .overlay_group + .cmp(&b.origin.overlay_group) + .then_with(|| a.origin.source.cmp(&b.origin.source)) + }); + let mut tw = TabWriter::new(stdout()); + writeln!( + &mut tw, + "{}\t{}\t{}\t{}\t{}\t{}", + "Overlay Group".dimmed(), + "Underlay Group".dimmed(), + "VNI".dimmed(), + "Metric".dimmed(), + "Source".dimmed(), + "Path".dimmed(), + )?; + for route in &routes { + let source = match &route.origin.source { + Some(s) => s.to_string(), + None => "(*,G)".to_string(), + }; + let path: Vec<_> = route + .path + .iter() + .rev() + .map(|h| h.router_id.clone()) + .collect(); + writeln!( + &mut tw, + "{}\t{}\t{}\t{}\t{}\t{}", + route.origin.overlay_group, + route.origin.underlay_group, + route.origin.vni, + route.origin.metric, + source, + path.join(" "), + )?; + } + tw.flush()?; + } + SubCommand::MulticastOriginated => { + let msg = client.get_originated_multicast_groups().await?; + let mut origins: Vec<_> = msg.into_inner().into_iter().collect(); + origins.sort_by(|a, b| { + a.overlay_group + .cmp(&b.overlay_group) + .then_with(|| a.source.cmp(&b.source)) + }); + let mut tw = TabWriter::new(stdout()); + writeln!( + &mut tw, + "{}\t{}\t{}\t{}\t{}", + "Overlay Group".dimmed(), + "Underlay Group".dimmed(), + "VNI".dimmed(), + "Metric".dimmed(), + "Source".dimmed(), + )?; + for origin in &origins { + let source = match &origin.source { + Some(s) => s.to_string(), + None => "(*,G)".to_string(), + }; + writeln!( + &mut tw, + "{}\t{}\t{}\t{}\t{}", + origin.overlay_group, + origin.underlay_group, + origin.vni, + origin.metric, + source, + )?; + } + tw.flush()?; + } + SubCommand::MulticastAdvertise(mg) => { + client + .advertise_multicast_groups(&vec![types::MulticastOrigin { + overlay_group: mg.overlay_group, + underlay_group: mg.underlay_group, + vni: types::Vni(mg.vni), + metric: mg.metric, + source: mg.source, + }]) + .await?; + } + SubCommand::MulticastWithdraw(mg) => { + client + .withdraw_multicast_groups(&vec![types::MulticastOrigin { + overlay_group: mg.overlay_group, + underlay_group: mg.underlay_group, + vni: types::Vni(mg.vni), + metric: mg.metric, + source: mg.source, + }]) + .await?; + } SubCommand::Sync => { client.sync().await?; } diff --git a/mg-common/Cargo.toml b/mg-common/Cargo.toml index 87a30308..657d5556 100644 --- a/mg-common/Cargo.toml +++ b/mg-common/Cargo.toml @@ -12,6 +12,7 @@ schemars.workspace = true slog.workspace = true slog-bunyan.workspace = true slog-async.workspace = true +thiserror.workspace = true oximeter-producer.workspace = true oximeter.workspace = true oxnet.workspace = true @@ -19,12 +20,16 @@ backoff.workspace = true smf.workspace = true uuid.workspace = true libc.workspace = true +omicron-common.workspace = true # We need this on illumos, but must omit it on other platforms [target.'cfg(target_os = "illumos")'.dependencies.libnet] workspace = true optional = true +[dev-dependencies] +serde_json.workspace = true + [features] default = ["libnet"] libnet = ["dep:libnet"] diff --git a/mg-common/src/net.rs b/mg-common/src/net.rs index f1784afe..8aaa69b7 100644 --- a/mg-common/src/net.rs +++ b/mg-common/src/net.rs @@ -2,10 +2,114 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. +// Re-export so consumers of MulticastOrigin.vni don't need a direct +// omicron_common dependency. +pub use omicron_common::api::external::Vni; + +use omicron_common::address::UNDERLAY_MULTICAST_SUBNET; use oxnet::{IpNet, Ipv4Net, Ipv6Net}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use std::net::{Ipv4Addr, Ipv6Addr}; +use std::fmt; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +use std::str::FromStr; +use thiserror::Error; + +fn default_multicast_vni() -> Vni { + Vni::DEFAULT_MULTICAST_VNI +} + +/// Error constructing an [`UnderlayMulticastIpv6`] address. +#[derive(Debug, Clone, Error)] +pub enum UnderlayMulticastError { + /// The address is not within the underlay multicast subnet (ff04::/64). + #[error( + "underlay address {addr} is not within {UNDERLAY_MULTICAST_SUBNET}" + )] + NotInSubnet { addr: Ipv6Addr }, + + /// The string could not be parsed as an IPv6 address. + #[error("invalid IPv6 address: {0}")] + InvalidIpv6(#[from] std::net::AddrParseError), +} + +/// A validated underlay multicast IPv6 address within ff04::/64. +/// +/// The Oxide rack maps overlay multicast groups 1:1 to admin-local scoped +/// IPv6 multicast addresses in `UNDERLAY_MULTICAST_SUBNET` (ff04::/64). +/// This type enforces that invariant at construction time. +#[derive( + Debug, + Copy, + Clone, + Eq, + PartialEq, + PartialOrd, + Ord, + Hash, + Serialize, + Deserialize, + JsonSchema, +)] +#[serde(try_from = "Ipv6Addr", into = "Ipv6Addr")] +#[schemars(transparent)] +pub struct UnderlayMulticastIpv6(Ipv6Addr); + +impl UnderlayMulticastIpv6 { + /// Create a new validated underlay multicast address. + /// + /// # Errors + /// + /// Returns [`UnderlayMulticastError::NotInSubnet`] if the address is + /// not within ff04::/64. + pub fn new(value: Ipv6Addr) -> Result { + if !UNDERLAY_MULTICAST_SUBNET.contains(value) { + return Err(UnderlayMulticastError::NotInSubnet { addr: value }); + } + Ok(Self(value)) + } + + /// Returns the underlying IPv6 address. + #[inline] + pub const fn ip(&self) -> Ipv6Addr { + self.0 + } +} + +impl fmt::Display for UnderlayMulticastIpv6 { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +impl TryFrom for UnderlayMulticastIpv6 { + type Error = UnderlayMulticastError; + + fn try_from(value: Ipv6Addr) -> Result { + Self::new(value) + } +} + +impl From for Ipv6Addr { + fn from(addr: UnderlayMulticastIpv6) -> Self { + addr.0 + } +} + +impl From for IpAddr { + fn from(addr: UnderlayMulticastIpv6) -> Self { + IpAddr::V6(addr.0) + } +} + +impl FromStr for UnderlayMulticastIpv6 { + type Err = UnderlayMulticastError; + + fn from_str(s: &str) -> Result { + let addr: Ipv6Addr = s.parse()?; + Self::new(addr) + } +} #[derive( Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema, @@ -93,3 +197,134 @@ pub enum IpPrefix { V4(Ipv4Prefix), V6(Ipv6Prefix), } + +/// Origin information for a multicast group announcement. +/// +/// This is analogous to TunnelOrigin but for multicast groups. +/// +/// This represents a subscription to a multicast group that should be +/// advertised via DDM. The overlay_group is the application-visible multicast +/// address (e.g., 233.252.0.1 or ff0e::1), while underlay_group is the mapped +/// admin-local scoped IPv6 address (ff04::X) used in the underlay network. +#[derive(Debug, Clone, Eq, Serialize, Deserialize, JsonSchema)] +pub struct MulticastOrigin { + /// The overlay multicast group address (IPv4 or IPv6). + /// This is the group address visible to applications. + pub overlay_group: IpAddr, + + /// The underlay multicast group address (ff04::X). + /// Validated at construction to be within ff04::/64. + pub underlay_group: UnderlayMulticastIpv6, + + /// VNI for this multicast group (identifies the VPC/network context). + #[serde(default = "default_multicast_vni")] + pub vni: Vni, + + /// Metric for path selection (lower is better). + /// + /// Used for multi-rack replication optimization. + /// Excluded from identity (Hash/Eq) so that metric changes update + /// an existing entry rather than creating a duplicate. + #[serde(default)] + pub metric: u64, + + /// Optional source address for Source-Specific Multicast (S,G) routes. + /// None for Any-Source Multicast (*,G) routes. + #[serde(default)] + pub source: Option, +} + +// Equality and hashing consider only the identity fields (overlay_group, +// underlay_group, vni, source), not metric. This allows metric updates to +// replace existing entries in HashSet-based collections without creating +// duplicates. This type is not used in ordered collections (BTreeSet). +// See #649 for why adding Ord here would require more care. +impl PartialEq for MulticastOrigin { + fn eq(&self, other: &Self) -> bool { + self.overlay_group == other.overlay_group + && self.underlay_group == other.underlay_group + && self.vni == other.vni + && self.source == other.source + } +} + +impl std::hash::Hash for MulticastOrigin { + fn hash(&self, state: &mut H) { + self.overlay_group.hash(state); + self.underlay_group.hash(state); + self.vni.hash(state); + self.source.hash(state); + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn underlay_valid_ff04() { + let addr = Ipv6Addr::new(0xff04, 0, 0, 0, 0, 0, 0, 1); + assert!(UnderlayMulticastIpv6::new(addr).is_ok()); + } + + #[test] + fn underlay_rejects_non_admin_local() { + // ff0e:: is global scope, not admin-local + let addr = Ipv6Addr::new(0xff0e, 0, 0, 0, 0, 0, 0, 1); + assert!(UnderlayMulticastIpv6::new(addr).is_err()); + } + + #[test] + fn underlay_rejects_unicast() { + let addr = Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1); + assert!(UnderlayMulticastIpv6::new(addr).is_err()); + } + + #[test] + fn underlay_serde_round_trip() { + let addr = UnderlayMulticastIpv6::new(Ipv6Addr::new( + 0xff04, 0, 0, 0, 0, 0, 0, 42, + )) + .unwrap(); + let json = serde_json::to_string(&addr).unwrap(); + let back: UnderlayMulticastIpv6 = serde_json::from_str(&json).unwrap(); + assert_eq!(addr, back); + } + + #[test] + fn underlay_serde_rejects_invalid() { + // ff0e::1 serialized as an Ipv6Addr, then deserialized as + // UnderlayMulticastIpv6 should fail via try_from. + let json = + serde_json::to_string(&Ipv6Addr::new(0xff0e, 0, 0, 0, 0, 0, 0, 1)) + .unwrap(); + let result: Result = + serde_json::from_str(&json); + assert!(result.is_err()); + } + + #[test] + fn multicast_origin_rejects_bad_underlay() { + let json = serde_json::json!({ + "overlay_group": "233.252.0.1", + "underlay_group": "ff0e::1", + "vni": 77 + }); + let result: Result = serde_json::from_value(json); + assert!(result.is_err()); + } + + #[test] + fn multicast_origin_accepts_valid() { + let json = serde_json::json!({ + "overlay_group": "233.252.0.1", + "underlay_group": "ff04::1", + "vni": 77 + }); + let origin: MulticastOrigin = serde_json::from_value(json).unwrap(); + assert_eq!( + origin.underlay_group.ip(), + Ipv6Addr::new(0xff04, 0, 0, 0, 0, 0, 0, 1), + ); + } +} diff --git a/mg-lower/src/ddm.rs b/mg-lower/src/ddm.rs index ac7d9708..2f987477 100644 --- a/mg-lower/src/ddm.rs +++ b/mg-lower/src/ddm.rs @@ -5,7 +5,7 @@ use crate::log::ddm_log; #[cfg(target_os = "illumos")] use ddm_admin_client::Client; -use ddm_admin_client::types::TunnelOrigin; +use ddm_admin_client::types::{MulticastOrigin, TunnelOrigin}; use oxnet::Ipv6Net; use slog::Logger; use std::{net::Ipv6Addr, sync::Arc}; @@ -111,3 +111,57 @@ pub(crate) fn remove_tunnel_routes<'a, I: Iterator>( pub fn new_ddm_client(log: &Logger) -> Client { Client::new("http://localhost:8000", log.clone()) } + +pub(crate) fn add_multicast_routes< + 'a, + I: Iterator, +>( + client: &impl Ddm, + routes: I, + rt: &Arc, + log: &Logger, +) { + let routes: Vec = routes.cloned().collect(); + if routes.is_empty() { + return; + } + let resp = + rt.block_on(async { client.advertise_multicast_groups(&routes).await }); + if let Err(e) = resp { + ddm_log!(log, + error, + "advertise multicast groups error: {e}"; + "error" => format!("{e}"), + "groups" => format!("{routes:#?}") + ); + } +} + +pub(crate) fn remove_multicast_routes< + 'a, + I: Iterator, +>( + client: &impl Ddm, + routes: I, + rt: &Arc, + log: &Logger, +) { + let routes: Vec = routes.cloned().collect(); + if routes.is_empty() { + return; + } + let resp = + rt.block_on(async { client.withdraw_multicast_groups(&routes).await }); + match resp { + Err(e) => ddm_log!(log, + error, + "withdraw multicast groups error: {e}"; + "groups" => format!("{routes:#?}") + ), + Ok(_) => ddm_log!(log, + debug, + "withdrew multicast groups"; + "groups" => format!("{routes:#?}") + ), + } +} diff --git a/mg-lower/src/lib.rs b/mg-lower/src/lib.rs index bcbf5b57..c1b84595 100644 --- a/mg-lower/src/lib.rs +++ b/mg-lower/src/lib.rs @@ -39,6 +39,7 @@ mod ddm; mod dendrite; mod error; mod log; +pub mod mrib; mod platform; #[cfg(test)] diff --git a/mg-lower/src/mrib.rs b/mg-lower/src/mrib.rs new file mode 100644 index 00000000..8645bc8e --- /dev/null +++ b/mg-lower/src/mrib.rs @@ -0,0 +1,208 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! MRIB (Multicast Routing Information Base) synchronization to DDM. +//! +//! This module watches for MRIB changes and propagates multicast group +//! subscriptions to DDM for distribution across the underlay network. +//! +//! ## Data Flow +//! +//! ```text +//! MRIB (loc_mrib changes) +//! | +//! v [MribChangeNotification] +//! mg-lower/mrib.rs +//! | +//! v [MulticastOrigin] +//! DDM admin API +//! | +//! v [DDM exchange protocol] +//! Other sleds/racks +//! ``` + +use crate::ddm::{ + add_multicast_routes, new_ddm_client, remove_multicast_routes, +}; +use crate::platform::{Ddm, ProductionDdm}; +use ddm_admin_client::types::MulticastOrigin; +use mg_common::net::Vni; +use rdb::Mrib; +use rdb::types::{MribChangeNotification, MulticastAddr, MulticastRoute}; +use slog::{Logger, debug, error, info}; +use std::collections::HashSet; +use std::sync::Arc; +use std::sync::mpsc::{RecvTimeoutError, channel}; +use std::thread::sleep; +use std::time::Duration; + +const MG_LOWER_MRIB_TAG: &str = "mg-lower-mrib"; + +/// Convert an MRIB [`MulticastRoute`] to a DDM [`MulticastOrigin`]. +/// +/// [`MulticastOrigin`]: ddm_admin_client::types::MulticastOrigin +fn ddm_origin(route: &MulticastRoute) -> MulticastOrigin { + mg_common::net::MulticastOrigin::from(route).into() +} + +/// Run the MRIB synchronization loop. +/// +/// This function loops forever, watching for MRIB changes and synchronizing +/// them to DDM. It runs on the calling thread. +pub fn run(mrib: Mrib, log: Logger, rt: Arc) { + loop { + let (tx, rx) = channel(); + + // Register as MRIB watcher + mrib.watch(MG_LOWER_MRIB_TAG.into(), tx); + + let ddm = ProductionDdm { + client: new_ddm_client(&log), + }; + + // Initial full sync + if let Err(e) = full_sync(&mrib, &ddm, &log, &rt) { + error!(log, "MRIB full sync failed: {e}"); + info!(log, "restarting MRIB sync loop in one second"); + sleep(Duration::from_secs(1)); + continue; + } + + // Handle incremental changes + loop { + match rx.recv_timeout(Duration::from_secs(10)) { + Ok(notification) => { + if let Err(e) = + handle_change(&mrib, notification, &ddm, &log, &rt) + { + error!(log, "MRIB change handling failed: {e}"); + } + } + Err(RecvTimeoutError::Timeout) => { + // Periodic full sync to catch any missed changes + if let Err(e) = full_sync(&mrib, &ddm, &log, &rt) { + error!(log, "MRIB periodic sync failed: {e}"); + } + } + Err(RecvTimeoutError::Disconnected) => { + error!(log, "MRIB watcher disconnected"); + break; + } + } + } + } +} + +/// Perform a full synchronization of MRIB to DDM. +/// +/// This compares the current MRIB loc_mrib with what DDM has advertised +/// and reconciles any differences. +pub(crate) fn full_sync( + mrib: &Mrib, + ddm: &D, + log: &Logger, + rt: &Arc, +) -> Result<(), String> { + // Get current MRIB state (installed/selected routes) + let mrib_routes = mrib.loc_mrib(); + + // Convert to DDM MulticastOrigin set + let mrib_origins: HashSet = + mrib_routes.values().map(ddm_origin).collect(); + + // Get current DDM advertised state + let ddm_current: HashSet = rt + .block_on(async { ddm.get_originated_multicast_groups().await }) + .map_err(|e| format!("failed to get DDM multicast groups: {e}"))? + .into_inner() + .into_iter() + .collect(); + + // Compute diff + let to_add: Vec<_> = mrib_origins.difference(&ddm_current).collect(); + let to_remove: Vec<_> = ddm_current.difference(&mrib_origins).collect(); + + if !to_add.is_empty() { + info!( + log, + "MRIB sync: adding {} multicast groups to DDM", + to_add.len() + ); + add_multicast_routes(ddm, to_add.into_iter(), rt, log); + } + + if !to_remove.is_empty() { + info!( + log, + "MRIB sync: removing {} multicast groups from DDM", + to_remove.len() + ); + remove_multicast_routes(ddm, to_remove.into_iter(), rt, log); + } + + Ok(()) +} + +/// Handle an incremental MRIB change notification. +fn handle_change( + mrib: &Mrib, + notification: MribChangeNotification, + ddm: &D, + log: &Logger, + rt: &Arc, +) -> Result<(), String> { + // Get current DDM state for comparison + let ddm_current: HashSet = rt + .block_on(async { ddm.get_originated_multicast_groups().await }) + .map_err(|e| format!("failed to get DDM multicast groups: {e}"))? + .into_inner() + .into_iter() + .collect(); + + let mut to_add = Vec::new(); + let mut to_remove = Vec::new(); + + for key in notification.changed { + // Check if route exists in loc_mrib (installed) + if let Some(route) = mrib.get_selected_route(&key) { + let origin = ddm_origin(&route); + if !ddm_current.contains(&origin) { + to_add.push(origin); + } + } else { + // Route was removed from loc_mrib, so we need to find matching DDM + // origin. We check all DDM origins to find any that match this key + for ddm_origin in &ddm_current { + // Reconstruct the key from the DDM origin to compare + if let Ok(overlay_group) = + MulticastAddr::try_from(ddm_origin.overlay_group) + && let Ok(ddm_key) = rdb::types::MulticastRouteKey::new( + ddm_origin.source, + overlay_group, + Vni::DEFAULT_MULTICAST_VNI, + ) + && ddm_key == key + { + to_remove.push(ddm_origin.clone()); + } + } + } + } + + if !to_add.is_empty() { + debug!(log, "MRIB change: adding {} multicast groups", to_add.len()); + add_multicast_routes(ddm, to_add.iter(), rt, log); + } + + if !to_remove.is_empty() { + debug!( + log, + "MRIB change: removing {} multicast groups", + to_remove.len() + ); + remove_multicast_routes(ddm, to_remove.iter(), rt, log); + } + + Ok(()) +} diff --git a/mg-lower/src/platform.rs b/mg-lower/src/platform.rs index a05143b9..1d453a1e 100644 --- a/mg-lower/src/platform.rs +++ b/mg-lower/src/platform.rs @@ -216,6 +216,46 @@ pub trait Ddm { ddm_admin_client::ResponseValue<()>, ddm_admin_client::Error, >; + + /// Get multicast group subscriptions originated by this router. + /// + /// Each `MulticastOrigin` pairs an overlay group address with its + /// underlay mapping (ff04::/64) and optional source for (S,G) routes. + /// + /// Method names follow the DDM admin API convention + /// (`originated_multicast_groups`, not `originated_multicast_origins`). + async fn get_originated_multicast_groups( + &self, + ) -> Result< + ddm_admin_client::ResponseValue>, + ddm_admin_client::Error, + >; + + /// Advertise multicast group subscriptions to DDM peers. + /// + /// Each entry is a `MulticastOrigin` pairing an overlay group + /// with its ff04::/64 underlay mapping. + #[allow(clippy::ptr_arg)] + async fn advertise_multicast_groups<'a>( + &'a self, + body: &'a Vec, + ) -> Result< + ddm_admin_client::ResponseValue<()>, + ddm_admin_client::Error, + >; + + /// Withdraw multicast group subscriptions from DDM peers. + /// + /// Each entry is a `MulticastOrigin` pairing an overlay group + /// with its ff04::/64 underlay mapping. + #[allow(clippy::ptr_arg)] + async fn withdraw_multicast_groups<'a>( + &'a self, + body: &'a Vec, + ) -> Result< + ddm_admin_client::ResponseValue<()>, + ddm_admin_client::Error, + >; } /// This trait wraps the methods that have expectations about switch zone @@ -405,6 +445,35 @@ impl Ddm for ProductionDdm { > { self.client.withdraw_tunnel_endpoints(body).await } + + async fn get_originated_multicast_groups( + &self, + ) -> Result< + ddm_admin_client::ResponseValue>, + ddm_admin_client::Error, + > { + self.client.get_originated_multicast_groups().await + } + + async fn advertise_multicast_groups<'a>( + &'a self, + body: &'a Vec, + ) -> Result< + ddm_admin_client::ResponseValue<()>, + ddm_admin_client::Error, + > { + self.client.advertise_multicast_groups(body).await + } + + async fn withdraw_multicast_groups<'a>( + &'a self, + body: &'a Vec, + ) -> Result< + ddm_admin_client::ResponseValue<()>, + ddm_admin_client::Error, + > { + self.client.withdraw_multicast_groups(body).await + } } /// Production switch zone that uses libnet for route lookups (illumos only). @@ -430,6 +499,7 @@ pub(crate) mod test { use crate::MG_LOWER_TAG; use super::*; + use mg_common::lock; use std::sync::Mutex; use std::{collections::HashMap, net::IpAddr}; @@ -484,7 +554,7 @@ pub(crate) mod test { link_id: &LinkId, ) -> Result, DpdClientError> { - let links = self.links.lock().unwrap(); + let links = lock!(self.links); let link = links .iter() .find(|x| &x.port_id == port_id && &x.link_id == link_id); @@ -502,10 +572,7 @@ pub(crate) mod test { dpd_client::ResponseValue>, DpdClientError, > { - let result = self - .v4_routes - .lock() - .unwrap() + let result = lock!(self.v4_routes) .get(cidr) .cloned() .unwrap_or(Vec::default()); @@ -519,10 +586,7 @@ pub(crate) mod test { dpd_client::ResponseValue>, DpdClientError, > { - let result = self - .v6_routes - .lock() - .unwrap() + let result = lock!(self.v6_routes) .get(cidr) .cloned() .unwrap_or(Vec::default()); @@ -534,7 +598,7 @@ pub(crate) mod test { addr: &Ipv6Entry, ) -> Result, DpdClientError> { - self.loopback.lock().unwrap().replace(addr.clone()); + lock!(self.loopback).replace(addr.clone()); Ok(dpd_response_ok!(())) } @@ -545,7 +609,7 @@ pub(crate) mod test { dpd_client::ResponseValue>, DpdClientError, > { - let links = self.links.lock().unwrap(); + let links = lock!(self.links); let result = links .iter() .filter(|x| match filter { @@ -612,7 +676,7 @@ pub(crate) mod test { RouteTarget::V4(v4) => Route::V4(v4.clone()), RouteTarget::V6(v6) => Route::V6(v6.clone()), }; - let mut routes = self.v4_routes.lock().unwrap(); + let mut routes = lock!(self.v4_routes); match routes.get_mut(&body.cidr) { Some(targets) => { targets.push(route); @@ -629,7 +693,7 @@ pub(crate) mod test { body: &'a Ipv6RouteUpdate, ) -> Result, DpdClientError> { - let mut routes = self.v6_routes.lock().unwrap(); + let mut routes = lock!(self.v6_routes); match routes.get_mut(&body.cidr) { Some(targets) => { targets.push(body.target.clone()); @@ -649,7 +713,7 @@ pub(crate) mod test { tgt_ip: &'a IpAddr, ) -> Result, DpdClientError> { - let mut routes = self.v4_routes.lock().unwrap(); + let mut routes = lock!(self.v4_routes); if let Some(targets) = routes.get_mut(cidr) { targets.retain(|x| match (x, tgt_ip) { (Route::V4(x), IpAddr::V4(ip)) => { @@ -677,7 +741,7 @@ pub(crate) mod test { tgt_ip: &'a std::net::Ipv6Addr, ) -> Result, DpdClientError> { - let mut routes = self.v6_routes.lock().unwrap(); + let mut routes = lock!(self.v6_routes); if let Some(targets) = routes.get_mut(cidr) { targets.retain(|x| { !(x.tgt_ip == *tgt_ip @@ -699,6 +763,7 @@ pub(crate) mod test { pub(crate) struct TestDdm { pub(crate) tunnel_originated: Mutex>, pub(crate) originated: Mutex>, + pub(crate) multicast_originated: Mutex>, } impl Default for TestDdm { @@ -706,6 +771,7 @@ pub(crate) mod test { Self { tunnel_originated: Mutex::new(Vec::default()), originated: Mutex::new(Vec::default()), + multicast_originated: Mutex::new(Vec::default()), } } } @@ -717,9 +783,7 @@ pub(crate) mod test { ddm_admin_client::ResponseValue>, ddm_admin_client::Error, > { - Ok(ddm_response_ok!( - self.tunnel_originated.lock().unwrap().clone() - )) + Ok(ddm_response_ok!(lock!(self.tunnel_originated).clone())) } async fn get_originated( @@ -728,7 +792,7 @@ pub(crate) mod test { ddm_admin_client::ResponseValue>, ddm_admin_client::Error, > { - Ok(ddm_response_ok!(self.originated.lock().unwrap().clone())) + Ok(ddm_response_ok!(lock!(self.originated).clone())) } async fn advertise_prefixes<'a>( @@ -738,7 +802,7 @@ pub(crate) mod test { ddm_admin_client::ResponseValue<()>, ddm_admin_client::Error, > { - self.originated.lock().unwrap().extend(body); + lock!(self.originated).extend(body); Ok(ddm_response_ok!(())) } @@ -749,7 +813,7 @@ pub(crate) mod test { ddm_admin_client::ResponseValue<()>, ddm_admin_client::Error, > { - self.tunnel_originated.lock().unwrap().extend(body.clone()); + lock!(self.tunnel_originated).extend(body.clone()); Ok(ddm_response_ok!(())) } @@ -760,10 +824,38 @@ pub(crate) mod test { ddm_admin_client::ResponseValue<()>, ddm_admin_client::Error, > { - self.tunnel_originated - .lock() - .unwrap() - .retain(|x| !body.contains(x)); + lock!(self.tunnel_originated).retain(|x| !body.contains(x)); + Ok(ddm_response_ok!(())) + } + + async fn get_originated_multicast_groups( + &self, + ) -> Result< + ddm_admin_client::ResponseValue>, + ddm_admin_client::Error, + > { + Ok(ddm_response_ok!(lock!(self.multicast_originated).clone())) + } + + async fn advertise_multicast_groups<'a>( + &'a self, + body: &'a Vec, + ) -> Result< + ddm_admin_client::ResponseValue<()>, + ddm_admin_client::Error, + > { + lock!(self.multicast_originated).extend(body.clone()); + Ok(ddm_response_ok!(())) + } + + async fn withdraw_multicast_groups<'a>( + &'a self, + body: &'a Vec, + ) -> Result< + ddm_admin_client::ResponseValue<()>, + ddm_admin_client::Error, + > { + lock!(self.multicast_originated).retain(|x| !body.contains(x)); Ok(ddm_response_ok!(())) } } diff --git a/mg-types/versions/Cargo.toml b/mg-types/versions/Cargo.toml index 854e12c1..254b82e9 100644 --- a/mg-types/versions/Cargo.toml +++ b/mg-types/versions/Cargo.toml @@ -6,6 +6,7 @@ edition = "2024" [dependencies] bfd.workspace = true bgp.workspace = true +mg-common.workspace = true rdb.workspace = true schemars.workspace = true serde.workspace = true diff --git a/mg-types/versions/src/multicast_support/mrib.rs b/mg-types/versions/src/multicast_support/mrib.rs index e6779b3a..7e4e4f88 100644 --- a/mg-types/versions/src/multicast_support/mrib.rs +++ b/mg-types/versions/src/multicast_support/mrib.rs @@ -8,9 +8,8 @@ use std::net::IpAddr; -use rdb::types::{ - AddressFamily, MulticastRouteKey, UnderlayMulticastIpv6, Vni, -}; +use mg_common::net::UnderlayMulticastIpv6; +use rdb::types::{AddressFamily, MulticastRouteKey, Vni}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; diff --git a/mgadm/src/mrib.rs b/mgadm/src/mrib.rs index 402421d2..ab4548c5 100644 --- a/mgadm/src/mrib.rs +++ b/mgadm/src/mrib.rs @@ -21,9 +21,9 @@ use mg_admin_client::types::{ MribRpfRebuildIntervalRequest, MulticastRoute, MulticastRouteKey, RouteOriginFilter, Vni, }; -use rdb::types::{AddressFamily, DEFAULT_MULTICAST_VNI}; +use rdb::types::AddressFamily; -const DEFAULT_VNI: u32 = DEFAULT_MULTICAST_VNI.as_u32(); +const DEFAULT_VNI: u32 = rdb::Vni::DEFAULT_MULTICAST_VNI.as_u32(); fn parse_route_origin(s: &str) -> Result { match s.to_lowercase().as_str() { diff --git a/openapi/ddm-admin/ddm-admin-1.0.0-b6eac7.json.gitstub b/openapi/ddm-admin/ddm-admin-1.0.0-b6eac7.json.gitstub new file mode 100644 index 00000000..0d935c8b --- /dev/null +++ b/openapi/ddm-admin/ddm-admin-1.0.0-b6eac7.json.gitstub @@ -0,0 +1 @@ +76204d2907209bd8b963fb2da976ea688282d990:openapi/ddm-admin/ddm-admin-1.0.0-b6eac7.json diff --git a/openapi/ddm-admin/ddm-admin-1.0.0-b6eac7.json b/openapi/ddm-admin/ddm-admin-2.0.0-8aeda2.json similarity index 64% rename from openapi/ddm-admin/ddm-admin-1.0.0-b6eac7.json rename to openapi/ddm-admin/ddm-admin-2.0.0-8aeda2.json index fe80efd3..312fadd4 100644 --- a/openapi/ddm-admin/ddm-admin-1.0.0-b6eac7.json +++ b/openapi/ddm-admin/ddm-admin-2.0.0-8aeda2.json @@ -6,7 +6,7 @@ "url": "https://oxide.computer", "email": "api@oxide.computer" }, - "version": "1.0.0" + "version": "2.0.0" }, "paths": { "/disable-stats": { @@ -51,6 +51,94 @@ } } }, + "/multicast_group": { + "put": { + "operationId": "advertise_multicast_groups", + "requestBody": { + "content": { + "application/json": { + "schema": { + "title": "Set_of_MulticastOrigin", + "type": "array", + "items": { + "$ref": "#/components/schemas/MulticastOrigin" + }, + "uniqueItems": true + } + } + }, + "required": true + }, + "responses": { + "204": { + "description": "resource updated" + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + }, + "delete": { + "operationId": "withdraw_multicast_groups", + "requestBody": { + "content": { + "application/json": { + "schema": { + "title": "Set_of_MulticastOrigin", + "type": "array", + "items": { + "$ref": "#/components/schemas/MulticastOrigin" + }, + "uniqueItems": true + } + } + }, + "required": true + }, + "responses": { + "204": { + "description": "resource updated" + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, + "/multicast_groups": { + "get": { + "operationId": "get_multicast_groups", + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "title": "Set_of_MulticastRoute", + "type": "array", + "items": { + "$ref": "#/components/schemas/MulticastRoute" + }, + "uniqueItems": true + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, "/originated": { "get": { "operationId": "get_originated", @@ -79,6 +167,34 @@ } } }, + "/originated_multicast_groups": { + "get": { + "operationId": "get_originated_multicast_groups", + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "title": "Set_of_MulticastOrigin", + "type": "array", + "items": { + "$ref": "#/components/schemas/MulticastOrigin" + }, + "uniqueItems": true + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, "/originated_tunnel_endpoints": { "get": { "operationId": "get_originated_tunnel_endpoints", @@ -444,6 +560,106 @@ "type": "string", "pattern": "^(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))\\/([0-9]|[1-9][0-9]|1[0-1][0-9]|12[0-8])$" }, + "MulticastOrigin": { + "description": "Origin information for a multicast group announcement.\n\nThis is analogous to TunnelOrigin but for multicast groups.\n\nThis represents a subscription to a multicast group that should be advertised via DDM. The overlay_group is the application-visible multicast address (e.g., 233.252.0.1 or ff0e::1), while underlay_group is the mapped admin-local scoped IPv6 address (ff04::X) used in the underlay network.", + "type": "object", + "properties": { + "metric": { + "description": "Metric for path selection (lower is better).\n\nUsed for multi-rack replication optimization. Excluded from identity (Hash/Eq) so that metric changes update an existing entry rather than creating a duplicate.", + "default": 0, + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "overlay_group": { + "description": "The overlay multicast group address (IPv4 or IPv6). This is the group address visible to applications.", + "type": "string", + "format": "ip" + }, + "source": { + "nullable": true, + "description": "Optional source address for Source-Specific Multicast (S,G) routes. None for Any-Source Multicast (*,G) routes.", + "default": null, + "type": "string", + "format": "ip" + }, + "underlay_group": { + "description": "The underlay multicast group address (ff04::X). Validated at construction to be within ff04::/64.", + "type": "string", + "format": "ipv6" + }, + "vni": { + "description": "VNI for this multicast group (identifies the VPC/network context).", + "default": 77, + "allOf": [ + { + "$ref": "#/components/schemas/Vni" + } + ] + } + }, + "required": [ + "overlay_group", + "underlay_group" + ] + }, + "MulticastPathHop": { + "description": "A single hop in the multicast path, carrying metadata needed for replication optimization.", + "type": "object", + "properties": { + "downstream_subscriber_count": { + "description": "Number of downstream subscribers reachable via this hop. Used for load-aware replication decisions in multi-rack topologies.", + "default": 0, + "type": "integer", + "format": "uint32", + "minimum": 0 + }, + "router_id": { + "description": "Router identifier (hostname).", + "type": "string" + }, + "underlay_addr": { + "description": "The underlay address of this router (for replication targeting).", + "type": "string", + "format": "ipv6" + } + }, + "required": [ + "router_id", + "underlay_addr" + ] + }, + "MulticastRoute": { + "description": "A multicast route learned via DDM.\n\nCarries a MulticastOrigin (overlay group + ff04::/64 underlay mapping) and the path vector from the originating subscriber through intermediate transit routers.", + "type": "object", + "properties": { + "nexthop": { + "description": "Underlay nexthop address (DDM peer that advertised this route). Used to associate the route with a peer for expiration.", + "type": "string", + "format": "ipv6" + }, + "origin": { + "description": "The multicast group origin information.", + "allOf": [ + { + "$ref": "#/components/schemas/MulticastOrigin" + } + ] + }, + "path": { + "description": "Path vector from the originating subscriber outward. Each hop records the router that redistributed this subscription announcement. Used for loop detection on pull and for future replication optimization in multi-rack topologies.", + "default": [], + "type": "array", + "items": { + "$ref": "#/components/schemas/MulticastPathHop" + } + } + }, + "required": [ + "nexthop", + "origin" + ] + }, "PathVector": { "type": "object", "properties": { @@ -463,6 +679,7 @@ ] }, "PeerInfo": { + "description": "Peer information with an optional interface name.", "type": "object", "properties": { "addr": { @@ -472,6 +689,12 @@ "host": { "type": "string" }, + "if_name": { + "nullable": true, + "description": "Interface name the peer was discovered on (e.g., \"tfportrear0_0\").", + "default": null, + "type": "string" + }, "kind": { "$ref": "#/components/schemas/RouterKind" }, @@ -544,6 +767,12 @@ "nexthop", "origin" ] + }, + "Vni": { + "description": "A Geneve Virtual Network Identifier", + "type": "integer", + "format": "uint32", + "minimum": 0 } }, "responses": { diff --git a/openapi/ddm-admin/ddm-admin-latest.json b/openapi/ddm-admin/ddm-admin-latest.json index 45446659..aaa8691d 120000 --- a/openapi/ddm-admin/ddm-admin-latest.json +++ b/openapi/ddm-admin/ddm-admin-latest.json @@ -1 +1 @@ -ddm-admin-1.0.0-b6eac7.json \ No newline at end of file +ddm-admin-2.0.0-8aeda2.json \ No newline at end of file diff --git a/openapi/mg-admin/mg-admin-8.0.0-fc8d9c.json b/openapi/mg-admin/mg-admin-8.0.0-082a16.json similarity index 99% rename from openapi/mg-admin/mg-admin-8.0.0-fc8d9c.json rename to openapi/mg-admin/mg-admin-8.0.0-082a16.json index b264b109..77f5d427 100644 --- a/openapi/mg-admin/mg-admin-8.0.0-fc8d9c.json +++ b/openapi/mg-admin/mg-admin-8.0.0-082a16.json @@ -5440,7 +5440,7 @@ ] }, "PeerId": { - "description": "Identifies a BGP peer for session management and route tracking.\n\nBGP peers can be identified in two ways: - **Numbered**: Traditional BGP peering using explicit IP addresses - **Unnumbered**: Modern peering using interface names with link-local addresses\n\n# Unnumbered Peering\n\nUnnumbered BGP uses interface names as stable identifiers instead of IP addresses. This is important because: - Link-local IPv6 addresses are discovered dynamically via NDP - Multiple interfaces may have peers with the same link-local address (e.g., fe80::1 on eth0 and fe80::1 on eth1) - Scope ID (interface index) disambiguates link-local addresses, but is not stable across reboots - Interface names provide stable, unambiguous peer identification\n\n# Route Tracking\n\nThis type is used in [`BgpPathProperties`](crate::BgpPathProperties) to track which peer advertised a route. Using `PeerId` instead of `IpAddr` ensures: - Unnumbered peers are properly distinguished even if they share link-local IPs - Route cleanup correctly removes only the routes from the intended peer - No cross-contamination when multiple unnumbered sessions exist\n\n# Examples\n\n``` use rdb_types::PeerId; use std::net::IpAddr;\n\n// Numbered peer let numbered = PeerId::Ip(\"192.0.2.1\".parse::().unwrap());\n\n// Unnumbered peer let unnumbered = PeerId::Interface(\"eth0\".to_string()); ```", + "description": "Identifies a BGP peer for session management and route tracking.\n\nBGP peers can be identified in two ways: - **Numbered**: Traditional BGP peering using explicit IP addresses - **Unnumbered**: Modern peering using interface names with link-local addresses\n\n# Unnumbered Peering\n\nUnnumbered BGP uses interface names as stable identifiers instead of IP addresses. This is important because: - Link-local IPv6 addresses are discovered dynamically via NDP - Multiple interfaces may have peers with the same link-local address (e.g., fe80::1 on eth0 and fe80::1 on eth1) - Scope ID (interface index) disambiguates link-local addresses, but is not stable across reboots - Interface names provide stable, unambiguous peer identification\n\n# Route Tracking\n\nThis type is used in `BgpPathProperties` to track which peer advertised a route. Using `PeerId` instead of `IpAddr` ensures: - Unnumbered peers are properly distinguished even if they share link-local IPs - Route cleanup correctly removes only the routes from the intended peer - No cross-contamination when multiple unnumbered sessions exist\n\n# Examples\n\n``` use rdb_types::PeerId; use std::net::IpAddr;\n\n// Numbered peer let numbered = PeerId::Ip(\"192.0.2.1\".parse::().unwrap());\n\n// Unnumbered peer let unnumbered = PeerId::Interface(\"eth0\".to_string()); ```", "oneOf": [ { "description": "Numbered peer identified by IP address\n\nUsed for traditional BGP sessions where peers are configured with explicit IP addresses (either IPv4 or IPv6 global unicast).", diff --git a/openapi/mg-admin/mg-admin-latest.json b/openapi/mg-admin/mg-admin-latest.json index 329966e4..38b6e356 120000 --- a/openapi/mg-admin/mg-admin-latest.json +++ b/openapi/mg-admin/mg-admin-latest.json @@ -1 +1 @@ -mg-admin-8.0.0-fc8d9c.json \ No newline at end of file +mg-admin-8.0.0-082a16.json \ No newline at end of file diff --git a/rdb-types/src/lib.rs b/rdb-types/src/lib.rs index af365e51..44a3777c 100644 --- a/rdb-types/src/lib.rs +++ b/rdb-types/src/lib.rs @@ -459,8 +459,8 @@ pub enum ProtocolFilter { /// /// # Route Tracking /// -/// This type is used in [`BgpPathProperties`](crate::BgpPathProperties) to track -/// which peer advertised a route. Using `PeerId` instead of `IpAddr` ensures: +/// This type is used in `BgpPathProperties` to track which peer advertised a +/// route. Using `PeerId` instead of `IpAddr` ensures: /// - Unnumbered peers are properly distinguished even if they share link-local IPs /// - Route cleanup correctly removes only the routes from the intended peer /// - No cross-contamination when multiple unnumbered sessions exist diff --git a/rdb/src/db.rs b/rdb/src/db.rs index e4e52f03..15dfa39e 100644 --- a/rdb/src/db.rs +++ b/rdb/src/db.rs @@ -2020,14 +2020,14 @@ impl Reaper { #[cfg(test)] mod test { use crate::{ - AddressFamily, DEFAULT_MULTICAST_VNI, DEFAULT_RIB_PRIORITY_STATIC, - Path, Prefix, Prefix4, Prefix6, StaticRouteKey, + AddressFamily, DEFAULT_RIB_PRIORITY_STATIC, Path, Prefix, Prefix4, + Prefix6, StaticRouteKey, db::Db, test::{TEST_WAIT_ITERATIONS, TestDb}, types::{ MulticastAddr, MulticastAddrV4, MulticastAddrV6, MulticastRoute, MulticastRouteKey, MulticastSourceProtocol, PrefixDbKey, - UnderlayMulticastIpv6, UnicastAddrV4, UnicastAddrV6, + UnderlayMulticastIpv6, UnicastAddrV4, UnicastAddrV6, Vni, test_helpers::path_vecs_equal, }, }; @@ -2385,7 +2385,7 @@ mod test { let key = MulticastRouteKey::new( Some(s_ip), group, - DEFAULT_MULTICAST_VNI, + Vni::DEFAULT_MULTICAST_VNI, ) .expect("AF match"); let route = MulticastRoute::new( diff --git a/rdb/src/types.rs b/rdb/src/types.rs index 3a68b562..c7255935 100644 --- a/rdb/src/types.rs +++ b/rdb/src/types.rs @@ -794,9 +794,6 @@ impl Display for PrefixChangeNotification { // MRIB (Multicast RIB) Types // ============================================================================ -/// Default VNI for fleet-wide multicast routing. -pub const DEFAULT_MULTICAST_VNI: Vni = Vni::DEFAULT_MULTICAST_VNI; - /// A validated IPv4 unicast address suitable for multicast source fields. /// /// This rejects addresses that cannot appear as a forwarded unicast source: @@ -1112,92 +1109,7 @@ impl From for Ipv6Addr { } } -/// A validated underlay multicast IPv6 address within ff04::/64. -/// -/// The Oxide rack maps overlay multicast groups 1:1 to admin-local scoped -/// IPv6 multicast addresses in `UNDERLAY_MULTICAST_SUBNET` (ff04::/64). -/// This type enforces that invariant at construction time. -/// -// TODO: This duplicates `dpd_types::mcast::UnderlayMulticastIpv6` in dendrite. -// Both should be consolidated into `omicron_common` so maghemite, dendrite, -// and omicron share a single definition. -#[derive( - Debug, - Copy, - Clone, - Eq, - PartialEq, - PartialOrd, - Ord, - Hash, - Serialize, - Deserialize, - JsonSchema, -)] -#[serde(try_from = "Ipv6Addr", into = "Ipv6Addr")] -#[schemars(transparent)] -pub struct UnderlayMulticastIpv6(Ipv6Addr); - -impl UnderlayMulticastIpv6 { - /// Create a new validated underlay multicast address. - /// - /// # Errors - /// - /// Returns an error if the address is not within `UNDERLAY_MULTICAST_SUBNET` - /// (ff04::/64). - pub fn new(value: Ipv6Addr) -> Result { - if !UNDERLAY_MULTICAST_SUBNET.contains(value) { - return Err(Error::Validation(format!( - "underlay address {value} is not within \ - {UNDERLAY_MULTICAST_SUBNET}" - ))); - } - Ok(Self(value)) - } - - /// Returns the underlying IPv6 address. - #[inline] - pub const fn ip(&self) -> Ipv6Addr { - self.0 - } -} - -impl fmt::Display for UnderlayMulticastIpv6 { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.0) - } -} - -impl TryFrom for UnderlayMulticastIpv6 { - type Error = Error; - - fn try_from(value: Ipv6Addr) -> Result { - Self::new(value) - } -} - -impl From for Ipv6Addr { - fn from(addr: UnderlayMulticastIpv6) -> Self { - addr.0 - } -} - -impl From for IpAddr { - fn from(addr: UnderlayMulticastIpv6) -> Self { - IpAddr::V6(addr.0) - } -} - -impl FromStr for UnderlayMulticastIpv6 { - type Err = Error; - - fn from_str(s: &str) -> Result { - let addr: Ipv6Addr = s.parse().map_err(|_| { - Error::Validation(format!("invalid IPv6 address: {s}")) - })?; - Self::new(addr) - } -} +pub use mg_common::net::UnderlayMulticastIpv6; /// A validated multicast group address (IPv4 or IPv6). /// @@ -1694,6 +1606,18 @@ impl MulticastRoute { } } +impl From<&MulticastRoute> for mg_common::net::MulticastOrigin { + fn from(route: &MulticastRoute) -> Self { + Self { + overlay_group: route.key.group().ip(), + underlay_group: route.underlay_group, + vni: route.key.vni(), + metric: 0, + source: route.key.source(), + } + } +} + /// Source of a multicast route entry. #[derive( Debug, Copy, Clone, Serialize, Deserialize, JsonSchema, Eq, PartialEq, @@ -2034,7 +1958,7 @@ mod test { let result = MulticastRouteKey::new( Some(IpAddr::V4(src.ip())), group.into(), - DEFAULT_MULTICAST_VNI, + Vni::DEFAULT_MULTICAST_VNI, ); assert!( result.is_err(), @@ -2049,7 +1973,7 @@ mod test { let result = MulticastRouteKey::new( Some(IpAddr::V6(src)), group.into(), - DEFAULT_MULTICAST_VNI, + Vni::DEFAULT_MULTICAST_VNI, ); assert!( result.is_err(),