From 4677b46e4179d44100ce3c46d7ce27f21021af6c Mon Sep 17 00:00:00 2001 From: Zeeshan Lakhani Date: Wed, 1 Apr 2026 08:13:40 +0000 Subject: [PATCH 1/4] [multicast] DDM multicast exchange: V4 protocol, MRIB sync, M2P hooks Adds multicast group subscription distribution to the DDM exchange protocol with a V4 version bump (frozen V3 types for wire compat). Key changes: - V4 exchange protocol with multicast support (V3 peers are unaffected) - UnderlayMulticastIpv6 validated newtype moved to mg-common (ff04::/64) (moved from rdb types) - MRIB->DDM sync in mg-lower/mrib.rs - OPTE M2P hooks for learned multicast routes (requires OPTE #924) - Atomic update_imported_mcast on Db (single lock for import/delete/diff, which is a bit different from the tunnel work) - Collapsed send_update dispatch - Shared pull handler helpers (collect_underlay_tunnel, collect_multicast) - MulticastPathHop constructor - Some serde round-trip and validation tests, including for version handling Stacked on zl/mrib (MRIB: Multicast RIB implementation [#675](https://github.com/oxidecomputer/maghemite/pull/675)). --- Cargo.lock | 2 + ddm-admin-client/src/lib.rs | 22 + ddm-api/src/lib.rs | 41 +- ddm-types/versions/src/latest.rs | 3 + ddm-types/versions/src/lib.rs | 2 + .../versions/src/multicast_support/db.rs | 62 ++ .../src/multicast_support/exchange.rs | 76 +++ .../versions/src/multicast_support/mod.rs | 9 + ddm/src/admin.rs | 69 ++- ddm/src/db.rs | 137 ++++- ddm/src/discovery.rs | 9 +- ddm/src/exchange.rs | 579 +++++++++++++++--- ddm/src/sm.rs | 209 ++++++- ddm/src/sys.rs | 95 ++- ddmadm/src/main.rs | 123 ++++ mg-common/Cargo.toml | 4 + mg-common/src/net.rs | 222 ++++++- mg-lower/src/ddm.rs | 56 +- mg-lower/src/lib.rs | 1 + mg-lower/src/mrib.rs | 234 +++++++ mg-lower/src/platform.rs | 95 +++ .../ddm-admin-1.0.0-b6eac7.json.gitstub | 1 + ...6eac7.json => ddm-admin-2.0.0-3dc476.json} | 216 ++++++- openapi/ddm-admin/ddm-admin-latest.json | 2 +- 24 files changed, 2167 insertions(+), 102 deletions(-) create mode 100644 ddm-types/versions/src/multicast_support/db.rs create mode 100644 ddm-types/versions/src/multicast_support/exchange.rs create mode 100644 ddm-types/versions/src/multicast_support/mod.rs create mode 100644 mg-lower/src/mrib.rs create mode 100644 openapi/ddm-admin/ddm-admin-1.0.0-b6eac7.json.gitstub rename openapi/ddm-admin/{ddm-admin-1.0.0-b6eac7.json => ddm-admin-2.0.0-3dc476.json} (64%) diff --git a/Cargo.lock b/Cargo.lock index e3f50d30..f8c9e9e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3792,11 +3792,13 @@ dependencies = [ "clap", "libc", "libnet", + "omicron-common", "oximeter", "oximeter-producer", "oxnet", "schemars 0.8.22", "serde", + "serde_json", "slog", "slog-async", "slog-bunyan", diff --git a/ddm-admin-client/src/lib.rs b/ddm-admin-client/src/lib.rs index ea45cea2..0d22065f 100644 --- a/ddm-admin-client/src/lib.rs +++ b/ddm-admin-client/src/lib.rs @@ -39,3 +39,25 @@ impl std::hash::Hash for types::TunnelOrigin { self.metric.hash(state); } } + +impl std::cmp::PartialEq for types::MulticastOrigin { + fn eq(&self, other: &Self) -> bool { + self.overlay_group.eq(&other.overlay_group) + && self.underlay_group.eq(&other.underlay_group) + && self.vni.eq(&other.vni) + && self.source.eq(&other.source) + } +} + +impl std::cmp::Eq for types::MulticastOrigin {} + +/// Metric is excluded from identity so that metric changes update +/// an existing entry rather than creating a duplicate. +impl std::hash::Hash for types::MulticastOrigin { + fn hash(&self, state: &mut H) { + self.overlay_group.hash(state); + self.underlay_group.hash(state); + self.vni.hash(state); + self.source.hash(state); + } +} diff --git a/ddm-api/src/lib.rs b/ddm-api/src/lib.rs index 546797dc..905942b5 100644 --- a/ddm-api/src/lib.rs +++ b/ddm-api/src/lib.rs @@ -10,7 +10,7 @@ use dropshot::Path; use dropshot::RequestContext; use dropshot::TypedBody; use dropshot_api_manager_types::api_versions; -use mg_common::net::TunnelOrigin; +use mg_common::net::{MulticastOrigin, TunnelOrigin}; use oxnet::Ipv6Net; use std::collections::{HashMap, HashSet}; @@ -26,6 +26,7 @@ api_versions!([ // | example for the next person. // v // (next_int, IDENT), + (2, MULTICAST_SUPPORT), (1, INITIAL), ]); @@ -100,6 +101,44 @@ pub trait DdmAdminApi { request: TypedBody>, ) -> Result; + #[endpoint { + method = GET, + path = "/originated_multicast_groups", + versions = VERSION_MULTICAST_SUPPORT.. + }] + async fn get_originated_multicast_groups( + ctx: RequestContext, + ) -> Result>, HttpError>; + + #[endpoint { + method = GET, + path = "/multicast_groups", + versions = VERSION_MULTICAST_SUPPORT.. + }] + async fn get_multicast_groups( + ctx: RequestContext, + ) -> Result>, HttpError>; + + #[endpoint { + method = PUT, + path = "/multicast_group", + versions = VERSION_MULTICAST_SUPPORT.. + }] + async fn advertise_multicast_groups( + ctx: RequestContext, + request: TypedBody>, + ) -> Result; + + #[endpoint { + method = DELETE, + path = "/multicast_group", + versions = VERSION_MULTICAST_SUPPORT.. + }] + async fn withdraw_multicast_groups( + ctx: RequestContext, + request: TypedBody>, + ) -> Result; + #[endpoint { method = PUT, path = "/sync" }] async fn sync( ctx: RequestContext, diff --git a/ddm-types/versions/src/latest.rs b/ddm-types/versions/src/latest.rs index 08057601..c2d29e3e 100644 --- a/ddm-types/versions/src/latest.rs +++ b/ddm-types/versions/src/latest.rs @@ -15,9 +15,12 @@ pub mod db { pub use crate::v1::db::PeerStatus; pub use crate::v1::db::RouterKind; pub use crate::v1::db::TunnelRoute; + pub use crate::v2::db::MulticastRoute; } pub mod exchange { pub use crate::v1::exchange::PathVector; pub use crate::v1::exchange::PathVectorV2; + pub use crate::v2::exchange::MulticastPathHop; + pub use crate::v2::exchange::MulticastPathVector; } diff --git a/ddm-types/versions/src/lib.rs b/ddm-types/versions/src/lib.rs index 9f8dcdc7..f723aaf0 100644 --- a/ddm-types/versions/src/lib.rs +++ b/ddm-types/versions/src/lib.rs @@ -32,3 +32,5 @@ pub mod latest; #[path = "initial/mod.rs"] pub mod v1; +#[path = "multicast_support/mod.rs"] +pub mod v2; diff --git a/ddm-types/versions/src/multicast_support/db.rs b/ddm-types/versions/src/multicast_support/db.rs new file mode 100644 index 00000000..f6cdf4d0 --- /dev/null +++ b/ddm-types/versions/src/multicast_support/db.rs @@ -0,0 +1,62 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use std::net::Ipv6Addr; + +use mg_common::net::MulticastOrigin; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use crate::v2::exchange::MulticastPathHop; + +/// A multicast route learned via DDM. +/// +/// Carries both the group origin and the path vector from the +/// originating subscriber through intermediate transit routers. +/// The path enables loop detection and (in multi-rack topologies) +/// replication optimizations per [RFD 488] in the future. +/// +/// Equality and hashing consider only `origin` and `nexthop` so that +/// a route update with a longer path replaces the existing entry in +/// hash-based collections. +/// +/// [RFD 488]: https://rfd.shared.oxide.computer/rfd/0488 +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +pub struct MulticastRoute { + /// The multicast group origin information. + pub origin: MulticastOrigin, + + /// Underlay nexthop address (DDM peer that advertised this route). + /// Used to associate the route with a peer for expiration. + pub nexthop: Ipv6Addr, + + /// Path vector from the originating subscriber outward. + /// Each hop records the router that redistributed this + /// subscription announcement. Used for loop detection on pull + /// and for future replication optimization in multi-rack + /// topologies. + #[serde(default)] + pub path: Vec, +} + +impl PartialEq for MulticastRoute { + fn eq(&self, other: &Self) -> bool { + self.origin == other.origin && self.nexthop == other.nexthop + } +} + +impl Eq for MulticastRoute {} + +impl std::hash::Hash for MulticastRoute { + fn hash(&self, state: &mut H) { + self.origin.hash(state); + self.nexthop.hash(state); + } +} + +impl From for MulticastOrigin { + fn from(x: MulticastRoute) -> Self { + x.origin + } +} diff --git a/ddm-types/versions/src/multicast_support/exchange.rs b/ddm-types/versions/src/multicast_support/exchange.rs new file mode 100644 index 00000000..4aa528ed --- /dev/null +++ b/ddm-types/versions/src/multicast_support/exchange.rs @@ -0,0 +1,76 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use std::net::Ipv6Addr; + +/// A single hop in the multicast path, carrying metadata needed for +/// replication optimization. +/// +/// Unlike unicast paths which only need hostnames, multicast hops carry +/// additional information for computing optimal replication points per +/// [RFD 488]. +/// +/// [RFD 488]: https://rfd.shared.oxide.computer/rfd/0488 +#[derive( + Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize, JsonSchema, +)] +pub struct MulticastPathHop { + /// Router identifier (hostname). + pub router_id: String, + + /// The underlay address of this router (for replication targeting). + pub underlay_addr: Ipv6Addr, + + /// Number of downstream subscribers reachable via this hop. + /// Used for load-aware replication decisions in multi-rack + /// topologies. + #[serde(default)] + pub downstream_subscriber_count: u32, +} + +impl MulticastPathHop { + /// Create a hop with the given router identity and a zero subscriber + /// count. The count will be populated once transit routers track + /// downstream subscriber counts for load-aware replication (RFD 488). + pub fn new(router_id: String, underlay_addr: Ipv6Addr) -> Self { + Self { + router_id, + underlay_addr, + downstream_subscriber_count: 0, + } + } +} + +/// Multicast group subscription announcement propagating through DDM. +/// +/// The path records the sequence of routers from the original subscriber +/// toward the current receiving router. Currently, this is used for loop +/// detection: if our router_id appears in the path, the announcement has +/// already traversed us and is dropped. The path structure also carries +/// topology information for future replication optimizations (RFD 488). +#[derive( + Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize, JsonSchema, +)] +pub struct MulticastPathVector { + /// The multicast group origin information. + pub origin: mg_common::net::MulticastOrigin, + + /// The path from the original subscriber to the current router. + /// Ordered from subscriber outward (subscriber router first). + pub path: Vec, +} + +impl MulticastPathVector { + /// Append a hop to this path vector. + pub fn with_hop(&self, hop: MulticastPathHop) -> Self { + let mut path = self.path.clone(); + path.push(hop); + Self { + origin: self.origin.clone(), + path, + } + } +} diff --git a/ddm-types/versions/src/multicast_support/mod.rs b/ddm-types/versions/src/multicast_support/mod.rs new file mode 100644 index 00000000..066113e2 --- /dev/null +++ b/ddm-types/versions/src/multicast_support/mod.rs @@ -0,0 +1,9 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Types from API version 2 (MULTICAST_SUPPORT) that add multicast +//! group management to the DDM admin API. + +pub mod db; +pub mod exchange; diff --git a/ddm/src/admin.rs b/ddm/src/admin.rs index 6d49a368..51a50677 100644 --- a/ddm/src/admin.rs +++ b/ddm/src/admin.rs @@ -7,7 +7,7 @@ use crate::sm::{AdminEvent, Event, PrefixSet, SmContext}; use ddm_api::DdmAdminApi; use ddm_api::ddm_admin_api_mod; use ddm_types::admin::{EnableStatsRequest, ExpirePathParams, PrefixMap}; -use ddm_types::db::{PeerInfo, TunnelRoute}; +use ddm_types::db::{MulticastRoute, PeerInfo, TunnelRoute}; use ddm_types::exchange::PathVector; use dropshot::ApiDescription; use dropshot::ApiDescriptionBuildErrors; @@ -21,7 +21,7 @@ use dropshot::Path; use dropshot::RequestContext; use dropshot::TypedBody; use mg_common::lock; -use mg_common::net::TunnelOrigin; +use mg_common::net::{MulticastOrigin, TunnelOrigin}; use oxnet::Ipv6Net; use slog::{Logger, error, info}; use std::collections::{HashMap, HashSet}; @@ -333,6 +333,71 @@ impl DdmAdminApi for DdmAdminApiImpl { Ok(HttpResponseUpdatedNoContent()) } + async fn get_originated_multicast_groups( + ctx: RequestContext, + ) -> Result>, HttpError> { + let ctx = lock!(ctx.context()); + let originated = ctx + .db + .originated_mcast() + .map_err(|e| HttpError::for_internal_error(e.to_string()))?; + Ok(HttpResponseOk(originated)) + } + + async fn get_multicast_groups( + ctx: RequestContext, + ) -> Result>, HttpError> { + let ctx = lock!(ctx.context()); + let imported = ctx.db.imported_mcast(); + Ok(HttpResponseOk(imported)) + } + + async fn advertise_multicast_groups( + ctx: RequestContext, + request: TypedBody>, + ) -> Result { + let ctx = lock!(ctx.context()); + let groups = request.into_inner(); + slog::info!(ctx.log, "advertise multicast groups: {groups:#?}"); + ctx.db + .originate_mcast(&groups) + .map_err(|e| HttpError::for_internal_error(e.to_string()))?; + + for e in &ctx.event_channels { + e.send(Event::Admin(AdminEvent::Announce(PrefixSet::Multicast( + groups.clone(), + )))) + .map_err(|e| { + HttpError::for_internal_error(format!("admin event send: {e}")) + })?; + } + + Ok(HttpResponseUpdatedNoContent()) + } + + async fn withdraw_multicast_groups( + ctx: RequestContext, + request: TypedBody>, + ) -> Result { + let ctx = lock!(ctx.context()); + let groups = request.into_inner(); + slog::info!(ctx.log, "withdraw multicast groups: {groups:#?}"); + ctx.db + .withdraw_mcast(&groups) + .map_err(|e| HttpError::for_internal_error(e.to_string()))?; + + for e in &ctx.event_channels { + e.send(Event::Admin(AdminEvent::Withdraw(PrefixSet::Multicast( + groups.clone(), + )))) + .map_err(|e| { + HttpError::for_internal_error(format!("admin event send: {e}")) + })?; + } + + Ok(HttpResponseUpdatedNoContent()) + } + async fn sync( ctx: RequestContext, ) -> Result { diff --git a/ddm/src/db.rs b/ddm/src/db.rs index 13338cc4..30eec234 100644 --- a/ddm/src/db.rs +++ b/ddm/src/db.rs @@ -2,9 +2,9 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -use ddm_types::db::{PeerInfo, TunnelRoute}; +use ddm_types::db::{MulticastRoute, PeerInfo, TunnelRoute}; use mg_common::lock; -use mg_common::net::TunnelOrigin; +use mg_common::net::{MulticastOrigin, TunnelOrigin}; use oxnet::{IpNet, Ipv6Net}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -21,6 +21,10 @@ const ORIGINATE: &str = "originate"; /// tunnel endpoints. const TUNNEL_ORIGINATE: &str = "tunnel_originate"; +/// The handle used to open a persistent key-value tree for originated +/// multicast groups. +const MCAST_ORIGINATE: &str = "mcast_originate"; + #[derive(thiserror::Error, Debug)] pub enum Error { #[error("datastore error {0}")] @@ -48,6 +52,7 @@ pub struct DbData { pub peers: HashMap, pub imported: HashSet, pub imported_tunnel: HashSet, + pub imported_mcast: HashSet, } unsafe impl Sync for Db {} @@ -85,6 +90,14 @@ impl Db { lock!(self.data).imported_tunnel.len() } + pub fn imported_mcast(&self) -> HashSet { + lock!(self.data).imported_mcast.clone() + } + + pub fn imported_mcast_count(&self) -> usize { + lock!(self.data).imported_mcast.len() + } + pub fn import(&self, r: &HashSet) { lock!(self.data).imported.extend(r.clone()); } @@ -93,6 +106,10 @@ impl Db { lock!(self.data).imported_tunnel.extend(r.clone()); } + pub fn import_mcast(&self, r: &HashSet) { + lock!(self.data).imported_mcast.extend(r.clone()); + } + pub fn delete_import(&self, r: &HashSet) { let imported = &mut lock!(self.data).imported; for x in r { @@ -107,6 +124,38 @@ impl Db { } } + pub fn delete_import_mcast(&self, r: &HashSet) { + let imported = &mut lock!(self.data).imported_mcast; + for x in r { + imported.remove(x); + } + } + + /// Atomically import and delete multicast routes under a single lock, + /// returning the effective difference (additions + removals) against the + /// state before mutation. + /// + /// This avoids a TOCTOU race where concurrent mutations between separate + /// lock acquisitions could produce an incorrect view difference. + pub fn update_imported_mcast( + &self, + import: &HashSet, + remove: &HashSet, + ) -> (HashSet, HashSet) { + let mut data = lock!(self.data); + + let before = data.imported_mcast.clone(); + data.imported_mcast.extend(import.iter().cloned()); + + for x in remove { + data.imported_mcast.remove(x); + } + + let to_add = data.imported_mcast.difference(&before).cloned().collect(); + let to_del = before.difference(&data.imported_mcast).cloned().collect(); + (to_add, to_del) + } + pub fn originate(&self, prefixes: &HashSet) -> Result<(), Error> { let tree = self.persistent_data.open_tree(ORIGINATE)?; for p in prefixes { @@ -129,6 +178,19 @@ impl Db { Ok(()) } + pub fn originate_mcast( + &self, + origins: &HashSet, + ) -> Result<(), Error> { + let tree = self.persistent_data.open_tree(MCAST_ORIGINATE)?; + for o in origins { + let entry = serde_json::to_string(o)?; + tree.insert(entry.as_str(), "")?; + } + tree.flush()?; + Ok(()) + } + pub fn originated(&self) -> Result, Error> { let tree = self.persistent_data.open_tree(ORIGINATE)?; let result = tree @@ -178,6 +240,7 @@ impl Db { return None; } }; + let value = String::from_utf8_lossy(&key); let value: TunnelOrigin = match serde_json::from_str(&value) { Ok(item) => item, @@ -199,6 +262,44 @@ impl Db { Ok(self.originated_tunnel()?.len()) } + pub fn originated_mcast(&self) -> Result, Error> { + let tree = self.persistent_data.open_tree(MCAST_ORIGINATE)?; + let result = tree + .scan_prefix(vec![]) + .filter_map(|item| { + let (key, _value) = match item { + Ok(item) => item, + Err(e) => { + error!( + self.log, + "db: error fetching ddm mcast origin entry: {e}" + ); + return None; + } + }; + + let value = String::from_utf8_lossy(&key); + let value: MulticastOrigin = match serde_json::from_str(&value) + { + Ok(item) => item, + Err(e) => { + error!( + self.log, + "db: error parsing ddm mcast origin: {e}" + ); + return None; + } + }; + Some(value) + }) + .collect(); + Ok(result) + } + + pub fn originated_mcast_count(&self) -> Result { + Ok(self.originated_mcast()?.len()) + } + pub fn withdraw(&self, prefixes: &HashSet) -> Result<(), Error> { let tree = self.persistent_data.open_tree(ORIGINATE)?; for p in prefixes { @@ -221,6 +322,19 @@ impl Db { Ok(()) } + pub fn withdraw_mcast( + &self, + origins: &HashSet, + ) -> Result<(), Error> { + let tree = self.persistent_data.open_tree(MCAST_ORIGINATE)?; + for o in origins { + let entry = serde_json::to_string(o)?; + tree.remove(entry.as_str())?; + } + tree.flush()?; + Ok(()) + } + /// Set peer info at the given index. Returns true if peer information was /// changed. pub fn set_peer(&self, index: u32, info: PeerInfo) -> bool { @@ -233,7 +347,11 @@ impl Db { pub fn remove_nexthop_routes( &self, nexthop: Ipv6Addr, - ) -> (HashSet, HashSet) { + ) -> ( + HashSet, + HashSet, + HashSet, + ) { let mut data = lock!(self.data); // Routes are generally held in sets to prevent duplication and provide // handy set-algebra operations. @@ -256,7 +374,18 @@ impl Db { for x in &tnl_removed { data.imported_tunnel.remove(x); } - (removed, tnl_removed) + + let mut mcast_removed = HashSet::new(); + for x in &data.imported_mcast { + if x.nexthop == nexthop { + mcast_removed.insert(x.clone()); + } + } + for x in &mcast_removed { + data.imported_mcast.remove(x); + } + + (removed, tnl_removed, mcast_removed) } pub fn remove_peer(&self, index: u32) { diff --git a/ddm/src/discovery.rs b/ddm/src/discovery.rs index fc4a84e8..dd6da934 100644 --- a/ddm/src/discovery.rs +++ b/ddm/src/discovery.rs @@ -113,6 +113,7 @@ const ADVERTISE: u8 = 1 << 1; pub enum Version { V2 = 2, V3 = 3, + V4 = 4, } #[derive(Error, Debug)] @@ -136,7 +137,7 @@ pub struct DiscoveryPacket { impl DiscoveryPacket { pub fn new_solicitation(hostname: String, kind: RouterKind) -> Self { Self { - version: Version::V2 as u8, + version: Version::V4 as u8, flags: SOLICIT, hostname, kind, @@ -144,7 +145,7 @@ impl DiscoveryPacket { } pub fn new_advertisement(hostname: String, kind: RouterKind) -> Self { Self { - version: Version::V2 as u8, + version: Version::V4 as u8, flags: ADVERTISE, hostname, kind, @@ -461,12 +462,12 @@ fn handle_advertisement( let version = match version { 2 => Version::V2, 3 => Version::V3, + 4 => Version::V4, x => { err!( ctx.log, ctx.config.if_name, - "unknown protocol version {}, known versions are: 1, 2", - x + "unknown protocol version {x}, known versions are: 2, 3, 4" ); return; } diff --git a/ddm/src/exchange.rs b/ddm/src/exchange.rs index 2c1cc876..57204cc8 100644 --- a/ddm/src/exchange.rs +++ b/ddm/src/exchange.rs @@ -19,8 +19,10 @@ use crate::db::{Route, effective_route_set}; use crate::discovery::Version; use crate::sm::{Config, Event, PeerEvent, SmContext}; use crate::{dbg, err, inf, wrn}; -use ddm_types::db::{RouterKind, TunnelRoute}; -use ddm_types::exchange::{PathVector, PathVectorV2}; +use ddm_types::db::{MulticastRoute, RouterKind, TunnelRoute}; +use ddm_types::exchange::{ + MulticastPathHop, MulticastPathVector, PathVector, PathVectorV2, +}; use dropshot::ApiDescription; use dropshot::ConfigDropshot; use dropshot::ConfigLogging; @@ -74,10 +76,20 @@ pub struct UpdateV2 { pub tunnel: Option, } +/// THIS TYPE IS FOR DDM PROTOCOL VERSION 3. IT SHALL NEVER CHANGE. THIS TYPE +/// CAN BE REMOVED WHEN DDMV3 CLIENTS AND SERVERS NO LONGER EXIST BUT ITS +/// DEFINITION SHALL NEVER CHANGE. +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Default)] +pub struct UpdateV3 { + pub underlay: Option, + pub tunnel: Option, +} + #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Default)] pub struct Update { pub underlay: Option, pub tunnel: Option, + pub multicast: Option, } impl From for Update { @@ -88,6 +100,7 @@ impl From for Update { announce: value.announce, withdraw: value.withdraw, }), + multicast: None, } } } @@ -97,6 +110,8 @@ impl From for Update { Update { tunnel: value.tunnel.map(TunnelUpdate::from), underlay: value.underlay.map(UnderlayUpdate::from), + // V2 protocol doesn't support multicast + multicast: None, } } } @@ -120,11 +135,31 @@ impl From for UpdateV2 { } } +impl From for Update { + fn from(value: UpdateV3) -> Self { + Update { + underlay: value.underlay, + tunnel: value.tunnel, + multicast: None, + } + } +} + +impl From for UpdateV3 { + fn from(value: Update) -> Self { + UpdateV3 { + underlay: value.underlay, + tunnel: value.tunnel, + } + } +} + impl From for Update { fn from(u: UnderlayUpdate) -> Self { Update { underlay: Some(u), tunnel: None, + multicast: None, } } } @@ -134,6 +169,7 @@ impl From for Update { Update { underlay: None, tunnel: Some(t), + multicast: None, } } } @@ -143,14 +179,35 @@ impl Update { Self { underlay: pr.underlay.map(UnderlayUpdate::announce), tunnel: pr.tunnel.map(TunnelUpdate::announce), + multicast: pr.multicast.map(MulticastUpdate::announce), } } } +impl From for Update { + fn from(m: MulticastUpdate) -> Self { + Update { + underlay: None, + tunnel: None, + multicast: Some(m), + } + } +} + +/// THIS TYPE IS FOR DDM PROTOCOL VERSION 3. IT SHALL NEVER CHANGE. THIS TYPE +/// CAN BE REMOVED WHEN DDMV3 CLIENTS AND SERVERS NO LONGER EXIST BUT ITS +/// DEFINITION SHALL NEVER CHANGE. +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Default)] +pub struct PullResponseV3 { + pub underlay: Option>, + pub tunnel: Option>, +} + #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Default)] pub struct PullResponse { pub underlay: Option>, pub tunnel: Option>, + pub multicast: Option>, } /// THIS TYPE IS FOR DDM PROTOCOL VERSION 2. IT SHALL NEVER CHANGE. THIS TYPE @@ -171,6 +228,18 @@ impl From for PullResponse { tunnel: value .tunnel .map(|x| x.into_iter().map(TunnelOrigin::from).collect()), + // V2 protocol doesn't support multicast + multicast: None, + } + } +} + +impl From for PullResponse { + fn from(value: PullResponseV3) -> Self { + PullResponse { + underlay: value.underlay, + tunnel: value.tunnel, + multicast: None, } } } @@ -180,6 +249,7 @@ impl From> for PullResponse { PullResponse { underlay: Some(value), tunnel: None, + multicast: None, } } } @@ -334,6 +404,47 @@ impl TunnelUpdate { } } +/// Multicast group subscription updates. +/// +/// Carries path-vector information for multicast group subscriptions, +/// enabling loop detection and optimal replication point computation. +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Default)] +pub struct MulticastUpdate { + pub announce: HashSet, + pub withdraw: HashSet, +} + +impl MulticastUpdate { + pub fn announce(groups: HashSet) -> Self { + Self { + announce: groups, + ..Default::default() + } + } + pub fn withdraw(groups: HashSet) -> Self { + Self { + withdraw: groups, + ..Default::default() + } + } + + /// Add a hop to all path vectors in this update. + pub fn with_hop(&self, hop: MulticastPathHop) -> Self { + Self { + announce: self + .announce + .iter() + .map(|pv| pv.with_hop(hop.clone())) + .collect(), + withdraw: self + .withdraw + .iter() + .map(|pv| pv.with_hop(hop.clone())) + .collect(), + } + } +} + #[derive(Error, Debug)] pub enum ExchangeError { #[error("io error: {0}")] @@ -404,15 +515,52 @@ pub(crate) fn withdraw_tunnel( send_update(ctx, config, update.into(), addr, version, rt, log) } -pub(crate) fn do_pull( +pub(crate) fn announce_multicast( + ctx: &SmContext, + config: Config, + groups: HashSet, + addr: Ipv6Addr, + version: Version, + rt: Arc, + log: Logger, +) -> Result<(), ExchangeError> { + let update = MulticastUpdate::announce(groups); + send_update(ctx, config, update.into(), addr, version, rt, log) +} + +pub(crate) fn withdraw_multicast( + ctx: &SmContext, + config: Config, + groups: HashSet, + addr: Ipv6Addr, + version: Version, + rt: Arc, + log: Logger, +) -> Result<(), ExchangeError> { + let update = MulticastUpdate::withdraw(groups); + send_update(ctx, config, update.into(), addr, version, rt, log) +} + +pub(crate) fn do_pull_v4( ctx: &SmContext, addr: &Ipv6Addr, rt: &Arc, ) -> Result { - let uri = format!( - "http://[{}%{}]:{}/v3/pull", - addr, ctx.config.if_index, ctx.config.exchange_port, - ); + let if_index = ctx.config.if_index; + let port = ctx.config.exchange_port; + let uri = format!("http://[{addr}%{if_index}]:{port}/v4/pull"); + let body = do_pull_common(uri, rt)?; + Ok(serde_json::from_slice(&body)?) +} + +pub(crate) fn do_pull_v3( + ctx: &SmContext, + addr: &Ipv6Addr, + rt: &Arc, +) -> Result { + let if_index = ctx.config.if_index; + let port = ctx.config.exchange_port; + let uri = format!("http://[{addr}%{if_index}]:{port}/v3/pull"); let body = do_pull_common(uri, rt)?; Ok(serde_json::from_slice(&body)?) } @@ -464,7 +612,8 @@ pub(crate) fn pull( ) -> Result<(), ExchangeError> { let pr: PullResponse = match version { Version::V2 => do_pull_v2(&ctx, &addr, &rt)?.into(), - Version::V3 => do_pull(&ctx, &addr, &rt)?, + Version::V3 => do_pull_v3(&ctx, &addr, &rt)?.into(), + Version::V4 => do_pull_v4(&ctx, &addr, &rt)?, }; let update = Update::announce(pr); @@ -489,43 +638,14 @@ fn send_update( log: Logger, ) -> Result<(), ExchangeError> { ctx.stats.updates_sent.fetch_add(1, Ordering::Relaxed); - match version { - Version::V2 => { - send_update_v2(ctx, config, update.into(), addr, rt, log) - } - Version::V3 => send_update_v3(ctx, config, update, addr, rt, log), - } -} - -fn send_update_v2( - ctx: &SmContext, - config: Config, - update: UpdateV2, - addr: Ipv6Addr, - rt: Arc, - log: Logger, -) -> Result<(), ExchangeError> { - let payload = serde_json::to_string(&update)?; - let uri = format!( - "http://[{}%{}]:{}/v2/push", - addr, config.if_index, config.exchange_port, - ); - send_update_common(ctx, uri, payload, config, rt, log) -} - -fn send_update_v3( - ctx: &SmContext, - config: Config, - update: Update, - addr: Ipv6Addr, - rt: Arc, - log: Logger, -) -> Result<(), ExchangeError> { - let payload = serde_json::to_string(&update)?; - let uri = format!( - "http://[{}%{}]:{}/v3/push", - addr, config.if_index, config.exchange_port, - ); + let (payload, path) = match version { + Version::V2 => (serde_json::to_string(&UpdateV2::from(update))?, "v2"), + Version::V3 => (serde_json::to_string(&UpdateV3::from(update))?, "v3"), + Version::V4 => (serde_json::to_string(&update)?, "v4"), + }; + let if_index = config.if_index; + let port = config.exchange_port; + let uri = format!("http://[{addr}%{if_index}]:{port}/{path}/push"); send_update_common(ctx, uri, payload, config, rt, log) } @@ -630,9 +750,11 @@ pub fn api_description() -> Result< > { let mut api = ApiDescription::new(); api.register(push_handler_v2)?; - api.register(push_handler)?; + api.register(push_handler_v3)?; + api.register(push_handler_v4)?; api.register(pull_handler_v2)?; - api.register(pull_handler)?; + api.register(pull_handler_v3)?; + api.register(pull_handler_v4)?; Ok(api) } @@ -647,7 +769,16 @@ async fn push_handler_v2( } #[endpoint { method = PUT, path = "/v3/push" }] -async fn push_handler( +async fn push_handler_v3( + ctx: RequestContext>>, + request: TypedBody, +) -> Result { + let update = Update::from(request.into_inner()); + push_handler_common(ctx, update).await +} + +#[endpoint { method = PUT, path = "/v4/push" }] +async fn push_handler_v4( ctx: RequestContext>>, request: TypedBody, ) -> Result { @@ -744,19 +875,15 @@ async fn pull_handler_v2( })) } -#[endpoint { method = GET, path = "/v3/pull" }] -async fn pull_handler( - ctx: RequestContext>>, -) -> Result, HttpError> { - let ctx = ctx.context().lock().await.clone(); - +/// Collect underlay and tunnel routes for pull responses (shared by V3/V4). +fn collect_underlay_tunnel( + ctx: &HandlerContext, +) -> Result<(HashSet, HashSet), HttpError> { let mut underlay = HashSet::new(); let mut tunnel = HashSet::new(); - // Only transit routers redistribute prefixes if ctx.ctx.config.kind == RouterKind::Transit { for route in &ctx.ctx.db.imported() { - // don't redistribute prefixes to their originators if route.nexthop == ctx.peer { continue; } @@ -771,21 +898,20 @@ async fn pull_handler( if route.nexthop == ctx.peer { continue; } - let tv = route.origin; - tunnel.insert(tv); + tunnel.insert(route.origin); } } + let originated = ctx .ctx .db .originated() .map_err(|e| HttpError::for_internal_error(e.to_string()))?; for prefix in &originated { - let pv = PathVector { + underlay.insert(PathVector { destination: *prefix, path: vec![ctx.ctx.hostname.clone()], - }; - underlay.insert(pv); + }); } let originated_tunnel = ctx @@ -794,26 +920,87 @@ async fn pull_handler( .originated_tunnel() .map_err(|e| HttpError::for_internal_error(e.to_string()))?; for prefix in &originated_tunnel { - let tv = TunnelOrigin { + tunnel.insert(TunnelOrigin { overlay_prefix: prefix.overlay_prefix, boundary_addr: prefix.boundary_addr, vni: prefix.vni, metric: prefix.metric, - }; - tunnel.insert(tv); + }); } + Ok((underlay, tunnel)) +} + +/// Collect multicast routes for V4 pull responses. +fn collect_multicast( + ctx: &HandlerContext, +) -> Result, HttpError> { + let mut multicast = HashSet::new(); + + if ctx.ctx.config.kind == RouterKind::Transit { + for route in &ctx.ctx.db.imported_mcast() { + if route.nexthop == ctx.peer { + continue; + } + let hop = MulticastPathHop::new( + ctx.ctx.hostname.clone(), + ctx.ctx.config.addr, + ); + let mut path = route.path.clone(); + path.push(hop); + multicast.insert(MulticastPathVector { + origin: route.origin.clone(), + path, + }); + } + } + + let originated_mcast = ctx + .ctx + .db + .originated_mcast() + .map_err(|e| HttpError::for_internal_error(e.to_string()))?; + for origin in &originated_mcast { + let hop = MulticastPathHop::new( + ctx.ctx.hostname.clone(), + ctx.ctx.config.addr, + ); + multicast.insert(MulticastPathVector { + origin: origin.clone(), + path: vec![hop], + }); + } + + Ok(multicast) +} + +fn opt(s: HashSet) -> Option> { + if s.is_empty() { None } else { Some(s) } +} + +#[endpoint { method = GET, path = "/v3/pull" }] +async fn pull_handler_v3( + ctx: RequestContext>>, +) -> Result, HttpError> { + let ctx = ctx.context().lock().await.clone(); + let (underlay, tunnel) = collect_underlay_tunnel(&ctx)?; + Ok(HttpResponseOk(PullResponseV3 { + underlay: opt(underlay), + tunnel: opt(tunnel), + })) +} + +#[endpoint { method = GET, path = "/v4/pull" }] +async fn pull_handler_v4( + ctx: RequestContext>>, +) -> Result, HttpError> { + let ctx = ctx.context().lock().await.clone(); + let (underlay, tunnel) = collect_underlay_tunnel(&ctx)?; + let multicast = collect_multicast(&ctx)?; Ok(HttpResponseOk(PullResponse { - underlay: if underlay.is_empty() { - None - } else { - Some(underlay) - }, - tunnel: if tunnel.is_empty() { - None - } else { - Some(tunnel) - }, + underlay: opt(underlay), + tunnel: opt(tunnel), + multicast: opt(multicast), })) } @@ -831,6 +1018,10 @@ fn handle_update(update: &Update, ctx: &HandlerContext) { handle_tunnel_update(tunnel_update, ctx); } + if let Some(multicast_update) = &update.multicast { + handle_multicast_update(multicast_update, ctx); + } + // distribute updates if ctx.ctx.config.kind == RouterKind::Transit { @@ -846,13 +1037,24 @@ fn handle_update(update: &Update, ctx: &HandlerContext) { .as_ref() .map(|update| update.with_path_element(ctx.ctx.hostname.clone())); - let push = Update { + // Add our hop info to multicast path vectors before redistribution + let multicast = update.multicast.as_ref().map(|update| { + let hop = MulticastPathHop::new( + ctx.ctx.hostname.clone(), + ctx.ctx.config.addr, + ); + update.with_hop(hop) + }); + + let push = Arc::new(Update { underlay, tunnel: update.tunnel.clone(), - }; + multicast, + }); for ec in &ctx.ctx.event_channels { - ec.send(Event::Peer(PeerEvent::Push(push.clone()))).unwrap(); + ec.send(Event::Peer(PeerEvent::Push(Arc::clone(&push)))) + .unwrap(); } } } @@ -999,3 +1201,226 @@ fn handle_underlay_update(update: &UnderlayUpdate, ctx: &HandlerContext) { .imported_underlay_prefixes .store(ctx.ctx.db.imported_count() as u64, Ordering::Relaxed); } + +/// Handle multicast group subscription updates from a peer. +/// +/// Validation uses path-vector-based RPF rather than unicast-RIB RPF. +/// DDM operates on the underlay while multicast sources are overlay +/// addresses, so traditional (S,G) RPF against the unicast RIB does not +/// apply at this layer. The MRIB RPF module in rdb handles that check +/// before routes are originated into DDM. At the DDM exchange level, +/// the path vector provides loop detection and carries topology +/// information for replication optimization per [RFD 488]. +/// +/// [RFD 488]: https://rfd.shared.oxide.computer/rfd/0488 +fn handle_multicast_update(update: &MulticastUpdate, ctx: &HandlerContext) { + let db = &ctx.ctx.db; + let hostname = &ctx.ctx.hostname; + + let mut import = HashSet::new(); + for pv in &update.announce { + // Path-vector RPF: drop if our router_id appears in the path, + // indicating the announcement has already traversed us. + if pv.path.iter().any(|hop| &hop.router_id == hostname) { + dbg!( + ctx.log, + ctx.ctx.config.if_name, + "dropping multicast announce for {:?} - loop detected \ + (path length {})", + pv.origin.overlay_group, + pv.path.len(), + ); + continue; + } + + import.insert(MulticastRoute { + origin: pv.origin.clone(), + nexthop: ctx.peer, + path: pv.path.clone(), + }); + } + + let mut remove = HashSet::new(); + for pv in &update.withdraw { + // Empty path is safe: MulticastRoute's PartialEq/Hash exclude + // the path field, so this matches by (origin, nexthop) only. + remove.insert(MulticastRoute { + origin: pv.origin.clone(), + nexthop: ctx.peer, + path: Vec::new(), + }); + } + + // Atomic import + delete + diff under a single lock. + let (to_add, to_del) = db.update_imported_mcast(&import, &remove); + + if let Err(e) = crate::sys::add_multicast_routes( + &ctx.log, + &ctx.ctx.config.if_name, + &to_add, + ) { + err!( + ctx.log, + ctx.ctx.config.if_name, + "add multicast routes: {e}: {to_add:#?}", + ) + } + + if let Err(e) = crate::sys::remove_multicast_routes( + &ctx.log, + &ctx.ctx.config.if_name, + &to_del, + ) { + err!( + ctx.log, + ctx.ctx.config.if_name, + "remove multicast routes: {e}: {to_del:#?}", + ) + } +} + +#[cfg(test)] +mod test { + use super::*; + use ddm_types::exchange::MulticastPathHop; + use mg_common::net::{MulticastOrigin, UnderlayMulticastIpv6}; + use std::net::Ipv6Addr; + + fn sample_multicast_update() -> MulticastUpdate { + let origin = MulticastOrigin { + overlay_group: "233.252.0.1".parse().unwrap(), + underlay_group: UnderlayMulticastIpv6::new(Ipv6Addr::new( + 0xff04, 0, 0, 0, 0, 0, 0, 1, + )) + .unwrap(), + vni: 77, + metric: 0, + source: None, + }; + let pv = MulticastPathVector { + origin, + path: vec![MulticastPathHop::new( + "router-1".into(), + Ipv6Addr::LOCALHOST, + )], + }; + MulticastUpdate::announce([pv].into_iter().collect()) + } + + #[test] + fn v4_update_round_trips() { + let update = Update { + underlay: None, + tunnel: None, + multicast: Some(sample_multicast_update()), + }; + let json = serde_json::to_string(&update).unwrap(); + let back: Update = serde_json::from_str(&json).unwrap(); + assert!(back.multicast.is_some()); + assert_eq!(back.multicast.unwrap().announce.len(), 1,); + } + + #[test] + fn v4_update_deserializes_as_v3_drops_multicast() { + let update = Update { + underlay: None, + tunnel: None, + multicast: Some(sample_multicast_update()), + }; + let json = serde_json::to_string(&update).unwrap(); + // A V3 peer would deserialize this as UpdateV3, silently + // dropping the unknown multicast field. + let v3: UpdateV3 = serde_json::from_str(&json).unwrap(); + assert!(v3.underlay.is_none()); + assert!(v3.tunnel.is_none()); + } + + #[test] + fn v3_update_deserializes_as_v4_multicast_none() { + let v3 = UpdateV3 { + underlay: None, + tunnel: None, + }; + let json = serde_json::to_string(&v3).unwrap(); + // A V4 peer receiving a V3 update gets multicast: None. + let update: Update = serde_json::from_str(&json).unwrap(); + assert!(update.multicast.is_none()); + } + + #[test] + fn v4_pull_response_round_trips() { + let origin = MulticastOrigin { + overlay_group: "ff0e::1".parse().unwrap(), + underlay_group: UnderlayMulticastIpv6::new(Ipv6Addr::new( + 0xff04, 0, 0, 0, 0, 0, 0, 2, + )) + .unwrap(), + vni: 77, + metric: 0, + source: None, + }; + let pv = MulticastPathVector { + origin, + path: vec![], + }; + let resp = PullResponse { + underlay: None, + tunnel: None, + multicast: Some([pv].into_iter().collect()), + }; + let json = serde_json::to_string(&resp).unwrap(); + let back: PullResponse = serde_json::from_str(&json).unwrap(); + assert!(back.multicast.is_some()); + } + + #[test] + fn v4_pull_response_deserializes_as_v3() { + let origin = MulticastOrigin { + overlay_group: "233.252.0.1".parse().unwrap(), + underlay_group: UnderlayMulticastIpv6::new(Ipv6Addr::new( + 0xff04, 0, 0, 0, 0, 0, 0, 1, + )) + .unwrap(), + vni: 77, + metric: 0, + source: None, + }; + let pv = MulticastPathVector { + origin, + path: vec![], + }; + let resp = PullResponse { + underlay: None, + tunnel: None, + multicast: Some([pv].into_iter().collect()), + }; + let json = serde_json::to_string(&resp).unwrap(); + // V3 peer drops the multicast field. + let v3: PullResponseV3 = serde_json::from_str(&json).unwrap(); + assert!(v3.underlay.is_none()); + assert!(v3.tunnel.is_none()); + } + + #[test] + fn v3_pull_response_deserializes_as_v4() { + let v3 = PullResponseV3 { + underlay: None, + tunnel: None, + }; + let json = serde_json::to_string(&v3).unwrap(); + let resp: PullResponse = serde_json::from_str(&json).unwrap(); + assert!(resp.multicast.is_none()); + } + + #[test] + fn from_conversions_strip_multicast() { + let update = Update { + underlay: None, + tunnel: None, + multicast: Some(sample_multicast_update()), + }; + let v3 = UpdateV3::from(update); + let back = Update::from(v3); + assert!(back.multicast.is_none()); + } +} diff --git a/ddm/src/sm.rs b/ddm/src/sm.rs index 24215795..44e6c916 100644 --- a/ddm/src/sm.rs +++ b/ddm/src/sm.rs @@ -4,12 +4,13 @@ use crate::db::Db; use crate::discovery::Version; -use crate::exchange::{TunnelUpdate, UnderlayUpdate, Update}; +use crate::exchange::{MulticastUpdate, TunnelUpdate, UnderlayUpdate, Update}; use crate::{dbg, discovery, err, exchange, inf, wrn}; use ddm_types::db::RouterKind; +use ddm_types::exchange::MulticastPathHop; use ddm_types::exchange::PathVector; use libnet::get_ipaddr_info; -use mg_common::net::TunnelOrigin; +use mg_common::net::{MulticastOrigin, TunnelOrigin}; use oxnet::Ipv6Net; use slog::Logger; use std::collections::HashSet; @@ -41,11 +42,12 @@ pub enum AdminEvent { pub enum PrefixSet { Underlay(HashSet), Tunnel(HashSet), + Multicast(HashSet), } #[derive(Debug)] pub enum PeerEvent { - Push(Update), + Push(Arc), } #[derive(Debug)] @@ -425,7 +427,7 @@ impl Exchange { ); let interval = 250; // TODO as parameter loop { - match exchange::do_pull( + match exchange::do_pull_v4( &self.ctx, &self.ctx.config.addr, &self.ctx.rt, @@ -455,7 +457,7 @@ impl Exchange { ) { exchange_thread.abort(); self.ctx.db.remove_peer(self.ctx.config.if_index); - let (to_remove, to_remove_tnl) = + let (to_remove, to_remove_tnl, to_remove_mcast) = self.ctx.db.remove_nexthop_routes(self.peer); let mut routes: Vec = Vec::new(); for x in &to_remove { @@ -518,9 +520,33 @@ impl Exchange { )) }; - let push = Update { underlay, tunnel }; + // Build multicast withdrawal with our hop info + let multicast = if to_remove_mcast.is_empty() { + None + } else { + let hop = MulticastPathHop::new( + self.ctx.hostname.clone(), + self.ctx.config.addr, + ); + Some(MulticastUpdate::withdraw( + to_remove_mcast + .iter() + .map(|route| ddm_types::exchange::MulticastPathVector { + origin: route.origin.clone(), + path: vec![hop.clone()], + }) + .collect(), + )) + }; + + let push = Arc::new(Update { + underlay, + tunnel, + multicast, + }); for ec in &self.ctx.event_channels { - ec.send(Event::Peer(PeerEvent::Push(push.clone()))).unwrap(); + ec.send(Event::Peer(PeerEvent::Push(Arc::clone(&push)))) + .unwrap(); } } pull_stop.store(true, Ordering::Relaxed); @@ -601,6 +627,7 @@ impl State for Exchange { "announce: {}", e, ); + wrn!( self.log, self.ctx.config.if_name, @@ -728,6 +755,104 @@ impl State for Exchange { ); } } + Event::Admin(AdminEvent::Announce(PrefixSet::Multicast( + groups, + ))) => { + // Convert `MulticastOrigin` to `MulticastPathVector` with + // our hop info + let hop = MulticastPathHop::new( + self.ctx.hostname.clone(), + self.ctx.config.addr, + ); + let pvs: HashSet<_> = groups + .iter() + .map(|origin| { + ddm_types::exchange::MulticastPathVector { + origin: origin.clone(), + path: vec![hop.clone()], + } + }) + .collect(); + + if let Err(e) = crate::exchange::announce_multicast( + &self.ctx, + self.ctx.config.clone(), + pvs, + self.peer, + self.version, + self.ctx.rt.clone(), + self.log.clone(), + ) { + err!( + self.log, + self.ctx.config.if_name, + "announce multicast: {}", + e, + ); + wrn!( + self.log, + self.ctx.config.if_name, + "expiring peer {} due to failed multicast announce", + self.peer, + ); + self.expire_peer(&exchange_thread, &pull_stop); + return ( + Box::new(Solicit::new( + self.ctx.clone(), + self.log.clone(), + )), + event, + ); + } + } + Event::Admin(AdminEvent::Withdraw(PrefixSet::Multicast( + groups, + ))) => { + // Convert MulticastOrigin to MulticastPathVector for withdrawal + let hop = MulticastPathHop::new( + self.ctx.hostname.clone(), + self.ctx.config.addr, + ); + let pvs: HashSet<_> = groups + .iter() + .map(|origin| { + ddm_types::exchange::MulticastPathVector { + origin: origin.clone(), + path: vec![hop.clone()], + } + }) + .collect(); + + if let Err(e) = crate::exchange::withdraw_multicast( + &self.ctx, + self.ctx.config.clone(), + pvs, + self.peer, + self.version, + self.ctx.rt.clone(), + self.log.clone(), + ) { + err!( + self.log, + self.ctx.config.if_name, + "withdraw multicast: {e}", + ); + wrn!( + self.log, + self.ctx.config.if_name, + "expiring peer {} due to failed multicast withdraw", + self.peer, + ); + self.expire_peer(&exchange_thread, &pull_stop); + return ( + Box::new(Solicit::new( + self.ctx.clone(), + self.log.clone(), + )), + event, + ); + } + } Event::Admin(AdminEvent::Expire(peer)) => { if self.peer == peer { inf!( @@ -770,6 +895,8 @@ impl State for Exchange { self.peer, update, ); + let update = Arc::try_unwrap(update) + .unwrap_or_else(|arc| (*arc).clone()); if let Some(push) = update.underlay { if !push.announce.is_empty() && let Err(e) = crate::exchange::announce_underlay( @@ -817,8 +944,7 @@ impl State for Exchange { err!( self.log, self.ctx.config.if_name, - "withdraw: {}", - e, + "withdraw: {e}", ); wrn!( self.log, @@ -836,6 +962,71 @@ impl State for Exchange { ); } } + // Handle multicast redistribution + if let Some(push) = update.multicast { + if !push.announce.is_empty() + && let Err(e) = crate::exchange::announce_multicast( + &self.ctx, + self.ctx.config.clone(), + push.announce, + self.peer, + self.version, + self.ctx.rt.clone(), + self.log.clone(), + ) + { + err!( + self.log, + self.ctx.config.if_name, + "announce multicast: {e}", + ); + wrn!( + self.log, + self.ctx.config.if_name, + "expiring peer {} due to failed multicast announce", + self.peer, + ); + self.expire_peer(&exchange_thread, &pull_stop); + return ( + Box::new(Solicit::new( + self.ctx.clone(), + self.log.clone(), + )), + event, + ); + } + if !push.withdraw.is_empty() + && let Err(e) = crate::exchange::withdraw_multicast( + &self.ctx, + self.ctx.config.clone(), + push.withdraw, + self.peer, + self.version, + self.ctx.rt.clone(), + self.log.clone(), + ) + { + err!( + self.log, + self.ctx.config.if_name, + "withdraw multicast: {e}", + ); + wrn!( + self.log, + self.ctx.config.if_name, + "expiring peer {} due to failed multicast withdraw", + self.peer, + ); + self.expire_peer(&exchange_thread, &pull_stop); + return ( + Box::new(Solicit::new( + self.ctx.clone(), + self.log.clone(), + )), + event, + ); + } + } } Event::Neighbor(NeighborEvent::Expire) => { wrn!( diff --git a/ddm/src/sys.rs b/ddm/src/sys.rs index 915fc0b5..22b782f0 100644 --- a/ddm/src/sys.rs +++ b/ddm/src/sys.rs @@ -4,7 +4,7 @@ use crate::sm::{Config, DpdConfig}; use crate::{dbg, err, inf, wrn}; -use ddm_types::db::TunnelRoute; +use ddm_types::db::{MulticastRoute, TunnelRoute}; use dpd_client::Client; use dpd_client::ClientState; use dpd_client::types; @@ -359,6 +359,99 @@ pub fn remove_tunnel_routes( Ok(()) } +#[cfg(not(target_os = "illumos"))] +pub fn add_multicast_routes( + _log: &Logger, + _ifname: &str, + _routes: &HashSet, +) -> Result<(), String> { + todo!(); +} + +/// Update OPTE multicast-to-physical (M2P) table entries for learned +/// multicast routes. Each route's overlay group is mapped to the +/// corresponding underlay multicast address so that OPTE can direct +/// multicast traffic to the correct underlay destinations. +#[cfg(target_os = "illumos")] +pub fn add_multicast_routes( + log: &Logger, + ifname: &str, + routes: &HashSet, +) -> Result<(), String> { + use oxide_vpc::api::MulticastUnderlay; + use oxide_vpc::api::SetMcast2PhysReq; + + let hdl = OpteHdl::open().map_err(|e| e.to_string())?; + + for route in routes { + let underlay = + MulticastUnderlay::new(route.origin.underlay_group.ip().into()) + .map_err(|e| { + format!( + "invalid underlay multicast address {}: {e}", + route.origin.underlay_group, + ) + })?; + let req = SetMcast2PhysReq { + group: route.origin.overlay_group.into(), + underlay, + }; + let overlay = route.origin.overlay_group; + let underlay_addr = route.origin.underlay_group; + inf!(log, ifname, "adding M2P: {overlay:?} -> {underlay_addr}"); + if let Err(e) = hdl.set_m2p(&req) { + err!(log, ifname, "failed to set M2P route: {req:?}: {e}"); + } + } + + Ok(()) +} + +#[cfg(not(target_os = "illumos"))] +pub fn remove_multicast_routes( + _log: &Logger, + _ifname: &str, + _routes: &HashSet, +) -> Result<(), String> { + todo!() +} + +/// Remove OPTE M2P table entries for withdrawn multicast routes. +#[cfg(target_os = "illumos")] +pub fn remove_multicast_routes( + log: &Logger, + ifname: &str, + routes: &HashSet, +) -> Result<(), String> { + use oxide_vpc::api::ClearMcast2PhysReq; + use oxide_vpc::api::MulticastUnderlay; + + let hdl = OpteHdl::open().map_err(|e| e.to_string())?; + + for route in routes { + let underlay = + MulticastUnderlay::new(route.origin.underlay_group.ip().into()) + .map_err(|e| { + format!( + "invalid underlay multicast address {}: {e}", + route.origin.underlay_group, + ) + })?; + let req = ClearMcast2PhysReq { + group: route.origin.overlay_group.into(), + underlay, + }; + let overlay = route.origin.overlay_group; + let underlay_addr = route.origin.underlay_group; + inf!(log, ifname, "removing M2P: {overlay:?} -> {underlay_addr}"); + if let Err(e) = hdl.clear_m2p(&req) { + err!(log, ifname, "failed to clear M2P route: {req:?}: {e}"); + } + } + + Ok(()) +} + pub fn remove_underlay_routes( log: &Logger, ifname: &str, diff --git a/ddmadm/src/main.rs b/ddmadm/src/main.rs index 800315d8..309b8a6f 100644 --- a/ddmadm/src/main.rs +++ b/ddmadm/src/main.rs @@ -60,6 +60,18 @@ enum SubCommand { /// Withdraw prefixes from a DDM router. TunnelWithdraw(TunnelEndpoint), + /// Get multicast groups imported from DDM peers. + MulticastImported, + + /// Get locally originated multicast groups. + MulticastOriginated, + + /// Advertise multicast groups from this router. + MulticastAdvertise(MulticastGroup), + + /// Withdraw multicast groups from this router. + MulticastWithdraw(MulticastGroup), + /// Sync prefix information from peers. Sync, } @@ -84,6 +96,29 @@ struct TunnelEndpoint { pub metric: u64, } +#[derive(Debug, Parser)] +struct MulticastGroup { + /// Overlay multicast group address (e.g. 233.252.0.1 or ff0e::1). + #[arg(short = 'g', long)] + pub overlay_group: IpAddr, + + /// Underlay multicast address (ff04::/64 admin-local scope). + #[arg(short = 'u', long)] + pub underlay_group: Ipv6Addr, + + /// Virtual Network Identifier. + #[arg(short, long)] + pub vni: u32, + + /// Path metric. + #[arg(short, long, default_value_t = 0)] + pub metric: u64, + + /// Source address for (S,G) routes (omit for (*,G)). + #[arg(short, long)] + pub source: Option, +} + #[derive(Debug, Parser)] struct Peer { addr: Ipv6Addr, @@ -242,6 +277,94 @@ async fn run() -> Result<()> { }]) .await?; } + SubCommand::MulticastImported => { + let msg = client.get_multicast_groups().await?; + let mut tw = TabWriter::new(stdout()); + writeln!( + &mut tw, + "{}\t{}\t{}\t{}\t{}\t{}", + "Overlay Group".dimmed(), + "Underlay Group".dimmed(), + "VNI".dimmed(), + "Metric".dimmed(), + "Source".dimmed(), + "Path".dimmed(), + )?; + for route in msg.into_inner() { + let source = match &route.origin.source { + Some(s) => s.to_string(), + None => "(*,G)".to_string(), + }; + let path: Vec<_> = route + .path + .iter() + .rev() + .map(|h| h.router_id.clone()) + .collect(); + writeln!( + &mut tw, + "{}\t{}\t{}\t{}\t{}\t{}", + route.origin.overlay_group, + route.origin.underlay_group, + route.origin.vni, + route.origin.metric, + source, + path.join(" "), + )?; + } + tw.flush()?; + } + SubCommand::MulticastOriginated => { + let msg = client.get_originated_multicast_groups().await?; + let mut tw = TabWriter::new(stdout()); + writeln!( + &mut tw, + "{}\t{}\t{}\t{}\t{}", + "Overlay Group".dimmed(), + "Underlay Group".dimmed(), + "VNI".dimmed(), + "Metric".dimmed(), + "Source".dimmed(), + )?; + for origin in msg.into_inner() { + let source = match &origin.source { + Some(s) => s.to_string(), + None => "(*,G)".to_string(), + }; + writeln!( + &mut tw, + "{}\t{}\t{}\t{}\t{}", + origin.overlay_group, + origin.underlay_group, + origin.vni, + origin.metric, + source, + )?; + } + tw.flush()?; + } + SubCommand::MulticastAdvertise(mg) => { + client + .advertise_multicast_groups(&vec![types::MulticastOrigin { + overlay_group: mg.overlay_group, + underlay_group: mg.underlay_group, + vni: mg.vni, + metric: mg.metric, + source: mg.source, + }]) + .await?; + } + SubCommand::MulticastWithdraw(mg) => { + client + .withdraw_multicast_groups(&vec![types::MulticastOrigin { + overlay_group: mg.overlay_group, + underlay_group: mg.underlay_group, + vni: mg.vni, + metric: mg.metric, + source: mg.source, + }]) + .await?; + } SubCommand::Sync => { client.sync().await?; } diff --git a/mg-common/Cargo.toml b/mg-common/Cargo.toml index 87a30308..0c40e317 100644 --- a/mg-common/Cargo.toml +++ b/mg-common/Cargo.toml @@ -19,12 +19,16 @@ backoff.workspace = true smf.workspace = true uuid.workspace = true libc.workspace = true +omicron-common.workspace = true # We need this on illumos, but must omit it on other platforms [target.'cfg(target_os = "illumos")'.dependencies.libnet] workspace = true optional = true +[dev-dependencies] +serde_json.workspace = true + [features] default = ["libnet"] libnet = ["dep:libnet"] diff --git a/mg-common/src/net.rs b/mg-common/src/net.rs index f1784afe..bce080b2 100644 --- a/mg-common/src/net.rs +++ b/mg-common/src/net.rs @@ -2,10 +2,102 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. +use omicron_common::address::UNDERLAY_MULTICAST_SUBNET; +use omicron_common::api::external::Vni; use oxnet::{IpNet, Ipv4Net, Ipv6Net}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use std::net::{Ipv4Addr, Ipv6Addr}; +use std::fmt; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +use std::str::FromStr; + +/// Default VNI for multicast routing. +pub const DEFAULT_MULTICAST_VNI: u32 = Vni::DEFAULT_MULTICAST_VNI.as_u32(); + +fn default_multicast_vni() -> u32 { + DEFAULT_MULTICAST_VNI +} + +/// A validated underlay multicast IPv6 address within ff04::/64. +/// +/// The Oxide rack maps overlay multicast groups 1:1 to admin-local scoped +/// IPv6 multicast addresses in `UNDERLAY_MULTICAST_SUBNET` (ff04::/64). +/// This type enforces that invariant at construction time. +#[derive( + Debug, + Copy, + Clone, + Eq, + PartialEq, + PartialOrd, + Ord, + Hash, + Serialize, + Deserialize, + JsonSchema, +)] +#[serde(try_from = "Ipv6Addr", into = "Ipv6Addr")] +#[schemars(transparent)] +pub struct UnderlayMulticastIpv6(Ipv6Addr); + +impl UnderlayMulticastIpv6 { + /// Create a new validated underlay multicast address. + /// + /// # Errors + /// + /// Returns an error if the address is not within ff04::/64. + pub fn new(value: Ipv6Addr) -> Result { + if !UNDERLAY_MULTICAST_SUBNET.contains(value) { + return Err(format!( + "underlay address {value} is not within \ + {UNDERLAY_MULTICAST_SUBNET}" + )); + } + Ok(Self(value)) + } + + /// Returns the underlying IPv6 address. + #[inline] + pub const fn ip(&self) -> Ipv6Addr { + self.0 + } +} + +impl fmt::Display for UnderlayMulticastIpv6 { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +impl TryFrom for UnderlayMulticastIpv6 { + type Error = String; + + fn try_from(value: Ipv6Addr) -> Result { + Self::new(value) + } +} + +impl From for Ipv6Addr { + fn from(addr: UnderlayMulticastIpv6) -> Self { + addr.0 + } +} + +impl From for IpAddr { + fn from(addr: UnderlayMulticastIpv6) -> Self { + IpAddr::V6(addr.0) + } +} + +impl FromStr for UnderlayMulticastIpv6 { + type Err = String; + + fn from_str(s: &str) -> Result { + let addr: Ipv6Addr = + s.parse().map_err(|e| format!("invalid IPv6: {e}"))?; + Self::new(addr) + } +} #[derive( Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema, @@ -93,3 +185,131 @@ pub enum IpPrefix { V4(Ipv4Prefix), V6(Ipv6Prefix), } + +/// Origin information for a multicast group announcement. +/// +/// This is analogous to TunnelOrigin but for multicast groups. +/// +/// This represents a subscription to a multicast group that should be +/// advertised via DDM. The overlay_group is the application-visible multicast +/// address (e.g., 233.252.0.1 or ff0e::1), while underlay_group is the mapped +/// admin-local scoped IPv6 address (ff04::X) used in the underlay network. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +pub struct MulticastOrigin { + /// The overlay multicast group address (IPv4 or IPv6). + /// This is the group address visible to applications. + pub overlay_group: IpAddr, + + /// The underlay multicast group address (ff04::X). + /// Validated at construction to be within ff04::/64. + pub underlay_group: UnderlayMulticastIpv6, + + /// VNI for this multicast group (identifies the VPC/network context). + #[serde(default = "default_multicast_vni")] + pub vni: u32, + + /// Metric for path selection (lower is better). + /// + /// Used for multi-rack replication optimization. + /// Excluded from identity (Hash/Eq) so that metric changes update + /// an existing entry rather than creating a duplicate. + #[serde(default)] + pub metric: u64, + + /// Optional source address for Source-Specific Multicast (S,G) routes. + /// None for Any-Source Multicast (*,G) routes. + #[serde(default)] + pub source: Option, +} + +impl PartialEq for MulticastOrigin { + fn eq(&self, other: &Self) -> bool { + self.overlay_group == other.overlay_group + && self.underlay_group == other.underlay_group + && self.vni == other.vni + && self.source == other.source + } +} + +impl Eq for MulticastOrigin {} + +impl std::hash::Hash for MulticastOrigin { + fn hash(&self, state: &mut H) { + self.overlay_group.hash(state); + self.underlay_group.hash(state); + self.vni.hash(state); + self.source.hash(state); + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn underlay_valid_ff04() { + let addr = Ipv6Addr::new(0xff04, 0, 0, 0, 0, 0, 0, 1); + assert!(UnderlayMulticastIpv6::new(addr).is_ok()); + } + + #[test] + fn underlay_rejects_non_admin_local() { + // ff0e:: is global scope, not admin-local + let addr = Ipv6Addr::new(0xff0e, 0, 0, 0, 0, 0, 0, 1); + assert!(UnderlayMulticastIpv6::new(addr).is_err()); + } + + #[test] + fn underlay_rejects_unicast() { + let addr = Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1); + assert!(UnderlayMulticastIpv6::new(addr).is_err()); + } + + #[test] + fn underlay_serde_round_trip() { + let addr = UnderlayMulticastIpv6::new(Ipv6Addr::new( + 0xff04, 0, 0, 0, 0, 0, 0, 42, + )) + .unwrap(); + let json = serde_json::to_string(&addr).unwrap(); + let back: UnderlayMulticastIpv6 = serde_json::from_str(&json).unwrap(); + assert_eq!(addr, back); + } + + #[test] + fn underlay_serde_rejects_invalid() { + // ff0e::1 serialized as an Ipv6Addr, then deserialized as + // UnderlayMulticastIpv6 should fail via try_from. + let json = + serde_json::to_string(&Ipv6Addr::new(0xff0e, 0, 0, 0, 0, 0, 0, 1)) + .unwrap(); + let result: Result = + serde_json::from_str(&json); + assert!(result.is_err()); + } + + #[test] + fn multicast_origin_rejects_bad_underlay() { + let json = serde_json::json!({ + "overlay_group": "233.252.0.1", + "underlay_group": "ff0e::1", + "vni": 77 + }); + let result: Result = serde_json::from_value(json); + assert!(result.is_err()); + } + + #[test] + fn multicast_origin_accepts_valid() { + let json = serde_json::json!({ + "overlay_group": "233.252.0.1", + "underlay_group": "ff04::1", + "vni": 77 + }); + let origin: MulticastOrigin = serde_json::from_value(json).unwrap(); + assert_eq!( + origin.underlay_group.ip(), + Ipv6Addr::new(0xff04, 0, 0, 0, 0, 0, 0, 1), + ); + } +} diff --git a/mg-lower/src/ddm.rs b/mg-lower/src/ddm.rs index ac7d9708..45a53023 100644 --- a/mg-lower/src/ddm.rs +++ b/mg-lower/src/ddm.rs @@ -5,7 +5,7 @@ use crate::log::ddm_log; #[cfg(target_os = "illumos")] use ddm_admin_client::Client; -use ddm_admin_client::types::TunnelOrigin; +use ddm_admin_client::types::{MulticastOrigin, TunnelOrigin}; use oxnet::Ipv6Net; use slog::Logger; use std::{net::Ipv6Addr, sync::Arc}; @@ -111,3 +111,57 @@ pub(crate) fn remove_tunnel_routes<'a, I: Iterator>( pub fn new_ddm_client(log: &Logger) -> Client { Client::new("http://localhost:8000", log.clone()) } + +pub(crate) fn add_multicast_routes< + 'a, + I: Iterator, +>( + client: &impl Ddm, + routes: I, + rt: &Arc, + log: &Logger, +) { + let routes: Vec = routes.cloned().collect(); + if routes.is_empty() { + return; + } + let resp = + rt.block_on(async { client.advertise_multicast_groups(&routes).await }); + if let Err(e) = resp { + ddm_log!(log, + error, + "advertise multicast groups error: {e}"; + "error" => format!("{e}"), + "groups" => format!("{routes:#?}") + ); + } +} + +pub(crate) fn remove_multicast_routes< + 'a, + I: Iterator, +>( + client: &impl Ddm, + routes: I, + rt: &Arc, + log: &Logger, +) { + let routes: Vec = routes.cloned().collect(); + if routes.is_empty() { + return; + } + let resp = + rt.block_on(async { client.withdraw_multicast_groups(&routes).await }); + match resp { + Err(e) => ddm_log!(log, + error, + "withdraw multicast groups error: {e}"; + "groups" => format!("{routes:#?}") + ), + Ok(_) => ddm_log!(log, + info, + "withdrew multicast groups"; + "groups" => format!("{routes:#?}") + ), + } +} diff --git a/mg-lower/src/lib.rs b/mg-lower/src/lib.rs index bcbf5b57..c1b84595 100644 --- a/mg-lower/src/lib.rs +++ b/mg-lower/src/lib.rs @@ -39,6 +39,7 @@ mod ddm; mod dendrite; mod error; mod log; +pub mod mrib; mod platform; #[cfg(test)] diff --git a/mg-lower/src/mrib.rs b/mg-lower/src/mrib.rs new file mode 100644 index 00000000..ea8069e9 --- /dev/null +++ b/mg-lower/src/mrib.rs @@ -0,0 +1,234 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! MRIB (Multicast Routing Information Base) synchronization to DDM. +//! +//! This module watches for MRIB changes and propagates multicast group +//! subscriptions to DDM for distribution across the underlay network. +//! +//! ## Data Flow +//! +//! ```text +//! MRIB (loc_mrib changes) +//! | +//! v [MribChangeNotification] +//! mg-lower/mrib.rs +//! | +//! v [MulticastOrigin] +//! DDM admin API +//! | +//! v [DDM exchange protocol] +//! Other sleds/racks +//! ``` + +use crate::ddm::{ + add_multicast_routes, new_ddm_client, remove_multicast_routes, +}; +use crate::platform::{Ddm, ProductionDdm}; +use ddm_admin_client::types::MulticastOrigin; +use mg_common::net::DEFAULT_MULTICAST_VNI; +use rdb::Mrib; +use rdb::types::{ + DEFAULT_MULTICAST_VNI as DEFAULT_MCAST_VNI, MribChangeNotification, + MulticastAddr, MulticastRoute, +}; +use slog::{Logger, debug, error, info}; +use std::collections::HashSet; +use std::net::IpAddr; +use std::sync::Arc; +use std::sync::mpsc::{RecvTimeoutError, channel}; +use std::thread::sleep; +use std::time::Duration; + +const MG_LOWER_MRIB_TAG: &str = "mg-lower-mrib"; + +/// Convert an MRIB MulticastRoute to a DDM MulticastOrigin. +/// +/// The MulticastOrigin captures the essential information needed for DDM +/// to advertise the multicast group subscription to other routers: +/// - overlay_group: The multicast group address (e.g., 233.252.0.1 or ff0e::1) +/// - underlay_group: The mapped underlay address within the ff04::/64 subnet +/// (a /64 within admin-local scope per RFC 7346, reserved for the rack's +/// underlay multicast traffic) +/// - source: Optional source for (S,G) routes, None for (*,G) +/// - vni: Virtual Network Identifier (default multicast VNI) +fn mrib_route_to_ddm_origin(route: &MulticastRoute) -> MulticastOrigin { + // Extract overlay group address from the route key + let overlay_group: IpAddr = match route.key.group() { + MulticastAddr::V4(v4) => IpAddr::V4(v4.ip()), + MulticastAddr::V6(v6) => IpAddr::V6(v6.ip()), + }; + + // Extract source address for (S,G) routes + let source = route.key.source(); + + MulticastOrigin { + overlay_group, + underlay_group: route.underlay_group.into(), + vni: DEFAULT_MULTICAST_VNI, + metric: 0, // Default metric + source, + } +} + +/// Run the MRIB synchronization loop. +/// +/// This function loops forever, watching for MRIB changes and synchronizing +/// them to DDM. It runs on the calling thread. +pub fn run(mrib: Mrib, log: Logger, rt: Arc) { + loop { + let (tx, rx) = channel(); + + // Register as MRIB watcher + mrib.watch(MG_LOWER_MRIB_TAG.into(), tx); + + let ddm = ProductionDdm { + client: new_ddm_client(&log), + }; + + // Initial full sync + if let Err(e) = full_sync(&mrib, &ddm, &log, &rt) { + error!(log, "MRIB full sync failed: {e}"); + info!(log, "restarting MRIB sync loop in one second"); + sleep(Duration::from_secs(1)); + continue; + } + + // Handle incremental changes + loop { + match rx.recv_timeout(Duration::from_secs(10)) { + Ok(notification) => { + if let Err(e) = + handle_change(&mrib, notification, &ddm, &log, &rt) + { + error!(log, "MRIB change handling failed: {e}"); + } + } + Err(RecvTimeoutError::Timeout) => { + // Periodic full sync to catch any missed changes + if let Err(e) = full_sync(&mrib, &ddm, &log, &rt) { + error!(log, "MRIB periodic sync failed: {e}"); + } + } + Err(RecvTimeoutError::Disconnected) => { + error!(log, "MRIB watcher disconnected"); + break; + } + } + } + } +} + +/// Perform a full synchronization of MRIB to DDM. +/// +/// This compares the current MRIB loc_mrib with what DDM has advertised +/// and reconciles any differences. +pub(crate) fn full_sync( + mrib: &Mrib, + ddm: &D, + log: &Logger, + rt: &Arc, +) -> Result<(), String> { + // Get current MRIB state (installed/selected routes) + let mrib_routes = mrib.loc_mrib(); + + // Convert to DDM MulticastOrigin set + let mrib_origins: HashSet = + mrib_routes.values().map(mrib_route_to_ddm_origin).collect(); + + // Get current DDM advertised state + let ddm_current: HashSet = rt + .block_on(async { ddm.get_originated_multicast_groups().await }) + .map_err(|e| format!("failed to get DDM multicast groups: {e}"))? + .into_inner() + .into_iter() + .collect(); + + // Compute diff + let to_add: Vec<_> = mrib_origins.difference(&ddm_current).collect(); + let to_remove: Vec<_> = ddm_current.difference(&mrib_origins).collect(); + + if !to_add.is_empty() { + info!( + log, + "MRIB sync: adding {} multicast groups to DDM", + to_add.len() + ); + add_multicast_routes(ddm, to_add.into_iter(), rt, log); + } + + if !to_remove.is_empty() { + info!( + log, + "MRIB sync: removing {} multicast groups from DDM", + to_remove.len() + ); + remove_multicast_routes(ddm, to_remove.into_iter(), rt, log); + } + + Ok(()) +} + +/// Handle an incremental MRIB change notification. +fn handle_change( + mrib: &Mrib, + notification: MribChangeNotification, + ddm: &D, + log: &Logger, + rt: &Arc, +) -> Result<(), String> { + // Get current DDM state for comparison + let ddm_current: HashSet = rt + .block_on(async { ddm.get_originated_multicast_groups().await }) + .map_err(|e| format!("failed to get DDM multicast groups: {e}"))? + .into_inner() + .into_iter() + .collect(); + + let mut to_add = Vec::new(); + let mut to_remove = Vec::new(); + + for key in notification.changed { + // Check if route exists in loc_mrib (installed) + if let Some(route) = mrib.get_selected_route(&key) { + let origin = mrib_route_to_ddm_origin(&route); + if !ddm_current.contains(&origin) { + to_add.push(origin); + } + } else { + // Route was removed from loc_mrib, so we need to find matching DDM + // origin. We check all DDM origins to find any that match this key + for ddm_origin in &ddm_current { + // Reconstruct the key from the DDM origin to compare + if let Ok(overlay_group) = + MulticastAddr::try_from(ddm_origin.overlay_group) + && let Ok(ddm_key) = rdb::types::MulticastRouteKey::new( + ddm_origin.source, + overlay_group, + DEFAULT_MCAST_VNI, + ) + && ddm_key == key + { + to_remove.push(ddm_origin.clone()); + } + } + } + } + + if !to_add.is_empty() { + debug!(log, "MRIB change: adding {} multicast groups", to_add.len()); + add_multicast_routes(ddm, to_add.iter(), rt, log); + } + + if !to_remove.is_empty() { + debug!( + log, + "MRIB change: removing {} multicast groups", + to_remove.len() + ); + remove_multicast_routes(ddm, to_remove.iter(), rt, log); + } + + Ok(()) +} diff --git a/mg-lower/src/platform.rs b/mg-lower/src/platform.rs index a05143b9..363dceaf 100644 --- a/mg-lower/src/platform.rs +++ b/mg-lower/src/platform.rs @@ -216,6 +216,31 @@ pub trait Ddm { ddm_admin_client::ResponseValue<()>, ddm_admin_client::Error, >; + + async fn get_originated_multicast_groups( + &self, + ) -> Result< + ddm_admin_client::ResponseValue>, + ddm_admin_client::Error, + >; + + #[allow(clippy::ptr_arg)] + async fn advertise_multicast_groups<'a>( + &'a self, + body: &'a Vec, + ) -> Result< + ddm_admin_client::ResponseValue<()>, + ddm_admin_client::Error, + >; + + #[allow(clippy::ptr_arg)] + async fn withdraw_multicast_groups<'a>( + &'a self, + body: &'a Vec, + ) -> Result< + ddm_admin_client::ResponseValue<()>, + ddm_admin_client::Error, + >; } /// This trait wraps the methods that have expectations about switch zone @@ -405,6 +430,35 @@ impl Ddm for ProductionDdm { > { self.client.withdraw_tunnel_endpoints(body).await } + + async fn get_originated_multicast_groups( + &self, + ) -> Result< + ddm_admin_client::ResponseValue>, + ddm_admin_client::Error, + > { + self.client.get_originated_multicast_groups().await + } + + async fn advertise_multicast_groups<'a>( + &'a self, + body: &'a Vec, + ) -> Result< + ddm_admin_client::ResponseValue<()>, + ddm_admin_client::Error, + > { + self.client.advertise_multicast_groups(body).await + } + + async fn withdraw_multicast_groups<'a>( + &'a self, + body: &'a Vec, + ) -> Result< + ddm_admin_client::ResponseValue<()>, + ddm_admin_client::Error, + > { + self.client.withdraw_multicast_groups(body).await + } } /// Production switch zone that uses libnet for route lookups (illumos only). @@ -699,6 +753,7 @@ pub(crate) mod test { pub(crate) struct TestDdm { pub(crate) tunnel_originated: Mutex>, pub(crate) originated: Mutex>, + pub(crate) multicast_originated: Mutex>, } impl Default for TestDdm { @@ -706,6 +761,7 @@ pub(crate) mod test { Self { tunnel_originated: Mutex::new(Vec::default()), originated: Mutex::new(Vec::default()), + multicast_originated: Mutex::new(Vec::default()), } } } @@ -766,6 +822,45 @@ pub(crate) mod test { .retain(|x| !body.contains(x)); Ok(ddm_response_ok!(())) } + + async fn get_originated_multicast_groups( + &self, + ) -> Result< + ddm_admin_client::ResponseValue>, + ddm_admin_client::Error, + > { + Ok(ddm_response_ok!( + self.multicast_originated.lock().unwrap().clone() + )) + } + + async fn advertise_multicast_groups<'a>( + &'a self, + body: &'a Vec, + ) -> Result< + ddm_admin_client::ResponseValue<()>, + ddm_admin_client::Error, + > { + self.multicast_originated + .lock() + .unwrap() + .extend(body.clone()); + Ok(ddm_response_ok!(())) + } + + async fn withdraw_multicast_groups<'a>( + &'a self, + body: &'a Vec, + ) -> Result< + ddm_admin_client::ResponseValue<()>, + ddm_admin_client::Error, + > { + self.multicast_originated + .lock() + .unwrap() + .retain(|x| !body.contains(x)); + Ok(ddm_response_ok!(())) + } } /// A mock switch zone implementation. diff --git a/openapi/ddm-admin/ddm-admin-1.0.0-b6eac7.json.gitstub b/openapi/ddm-admin/ddm-admin-1.0.0-b6eac7.json.gitstub new file mode 100644 index 00000000..0d935c8b --- /dev/null +++ b/openapi/ddm-admin/ddm-admin-1.0.0-b6eac7.json.gitstub @@ -0,0 +1 @@ +76204d2907209bd8b963fb2da976ea688282d990:openapi/ddm-admin/ddm-admin-1.0.0-b6eac7.json diff --git a/openapi/ddm-admin/ddm-admin-1.0.0-b6eac7.json b/openapi/ddm-admin/ddm-admin-2.0.0-3dc476.json similarity index 64% rename from openapi/ddm-admin/ddm-admin-1.0.0-b6eac7.json rename to openapi/ddm-admin/ddm-admin-2.0.0-3dc476.json index fe80efd3..954fd3ad 100644 --- a/openapi/ddm-admin/ddm-admin-1.0.0-b6eac7.json +++ b/openapi/ddm-admin/ddm-admin-2.0.0-3dc476.json @@ -6,7 +6,7 @@ "url": "https://oxide.computer", "email": "api@oxide.computer" }, - "version": "1.0.0" + "version": "2.0.0" }, "paths": { "/disable-stats": { @@ -51,6 +51,94 @@ } } }, + "/multicast_group": { + "put": { + "operationId": "advertise_multicast_groups", + "requestBody": { + "content": { + "application/json": { + "schema": { + "title": "Set_of_MulticastOrigin", + "type": "array", + "items": { + "$ref": "#/components/schemas/MulticastOrigin" + }, + "uniqueItems": true + } + } + }, + "required": true + }, + "responses": { + "204": { + "description": "resource updated" + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + }, + "delete": { + "operationId": "withdraw_multicast_groups", + "requestBody": { + "content": { + "application/json": { + "schema": { + "title": "Set_of_MulticastOrigin", + "type": "array", + "items": { + "$ref": "#/components/schemas/MulticastOrigin" + }, + "uniqueItems": true + } + } + }, + "required": true + }, + "responses": { + "204": { + "description": "resource updated" + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, + "/multicast_groups": { + "get": { + "operationId": "get_multicast_groups", + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "title": "Set_of_MulticastRoute", + "type": "array", + "items": { + "$ref": "#/components/schemas/MulticastRoute" + }, + "uniqueItems": true + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, "/originated": { "get": { "operationId": "get_originated", @@ -79,6 +167,34 @@ } } }, + "/originated_multicast_groups": { + "get": { + "operationId": "get_originated_multicast_groups", + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "title": "Set_of_MulticastOrigin", + "type": "array", + "items": { + "$ref": "#/components/schemas/MulticastOrigin" + }, + "uniqueItems": true + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, "/originated_tunnel_endpoints": { "get": { "operationId": "get_originated_tunnel_endpoints", @@ -444,6 +560,104 @@ "type": "string", "pattern": "^(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))\\/([0-9]|[1-9][0-9]|1[0-1][0-9]|12[0-8])$" }, + "MulticastOrigin": { + "description": "Origin information for a multicast group announcement.\n\nThis is analogous to TunnelOrigin but for multicast groups.\n\nThis represents a subscription to a multicast group that should be advertised via DDM. The overlay_group is the application-visible multicast address (e.g., 233.252.0.1 or ff0e::1), while underlay_group is the mapped admin-local scoped IPv6 address (ff04::X) used in the underlay network.", + "type": "object", + "properties": { + "metric": { + "description": "Metric for path selection (lower is better).\n\nUsed for multi-rack replication optimization. Excluded from identity (Hash/Eq) so that metric changes update an existing entry rather than creating a duplicate.", + "default": 0, + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "overlay_group": { + "description": "The overlay multicast group address (IPv4 or IPv6). This is the group address visible to applications.", + "type": "string", + "format": "ip" + }, + "source": { + "nullable": true, + "description": "Optional source address for Source-Specific Multicast (S,G) routes. None for Any-Source Multicast (*,G) routes.", + "default": null, + "type": "string", + "format": "ip" + }, + "underlay_group": { + "description": "The underlay multicast group address (ff04::X). Validated at construction to be within ff04::/64.", + "type": "string", + "format": "ipv6" + }, + "vni": { + "description": "VNI for this multicast group (identifies the VPC/network context).", + "default": 77, + "type": "integer", + "format": "uint32", + "minimum": 0 + } + }, + "required": [ + "overlay_group", + "underlay_group" + ] + }, + "MulticastPathHop": { + "description": "A single hop in the multicast path, carrying metadata needed for replication optimization.\n\nUnlike unicast paths which only need hostnames, multicast hops carry additional information for computing optimal replication points per [RFD 488].\n\n[RFD 488]: https://rfd.shared.oxide.computer/rfd/0488", + "type": "object", + "properties": { + "downstream_subscriber_count": { + "description": "Number of downstream subscribers reachable via this hop. Used for load-aware replication decisions in multi-rack topologies.", + "default": 0, + "type": "integer", + "format": "uint32", + "minimum": 0 + }, + "router_id": { + "description": "Router identifier (hostname).", + "type": "string" + }, + "underlay_addr": { + "description": "The underlay address of this router (for replication targeting).", + "type": "string", + "format": "ipv6" + } + }, + "required": [ + "router_id", + "underlay_addr" + ] + }, + "MulticastRoute": { + "description": "A multicast route learned via DDM.\n\nCarries both the group origin and the path vector from the originating subscriber through intermediate transit routers. The path enables loop detection and (in multi-rack topologies) replication optimizations per [RFD 488] in the future.\n\nEquality and hashing consider only `origin` and `nexthop` so that a route update with a longer path replaces the existing entry in hash-based collections.\n\n[RFD 488]: https://rfd.shared.oxide.computer/rfd/0488", + "type": "object", + "properties": { + "nexthop": { + "description": "Underlay nexthop address (DDM peer that advertised this route). Used to associate the route with a peer for expiration.", + "type": "string", + "format": "ipv6" + }, + "origin": { + "description": "The multicast group origin information.", + "allOf": [ + { + "$ref": "#/components/schemas/MulticastOrigin" + } + ] + }, + "path": { + "description": "Path vector from the originating subscriber outward. Each hop records the router that redistributed this subscription announcement. Used for loop detection on pull and for future replication optimization in multi-rack topologies.", + "default": [], + "type": "array", + "items": { + "$ref": "#/components/schemas/MulticastPathHop" + } + } + }, + "required": [ + "nexthop", + "origin" + ] + }, "PathVector": { "type": "object", "properties": { diff --git a/openapi/ddm-admin/ddm-admin-latest.json b/openapi/ddm-admin/ddm-admin-latest.json index 45446659..39659731 120000 --- a/openapi/ddm-admin/ddm-admin-latest.json +++ b/openapi/ddm-admin/ddm-admin-latest.json @@ -1 +1 @@ -ddm-admin-1.0.0-b6eac7.json \ No newline at end of file +ddm-admin-2.0.0-3dc476.json \ No newline at end of file From 4133f8c460eaa6ad1338507a5f04de27ff6e2098 Mon Sep 17 00:00:00 2001 From: Zeeshan Lakhani Date: Tue, 7 Apr 2026 02:48:34 +0000 Subject: [PATCH 2/4] [review] address PR review: doc comments, lock! cleanup --- .../versions/src/multicast_support/db.rs | 19 +++-- .../src/multicast_support/exchange.rs | 20 +++--- ddm/src/exchange.rs | 11 +-- mg-lower/src/platform.rs | 69 +++++++++---------- ...dc476.json => ddm-admin-2.0.0-bdf299.json} | 4 +- openapi/ddm-admin/ddm-admin-latest.json | 2 +- ...fc8d9c.json => mg-admin-8.0.0-082a16.json} | 2 +- openapi/mg-admin/mg-admin-latest.json | 2 +- rdb-types/src/lib.rs | 4 +- 9 files changed, 64 insertions(+), 69 deletions(-) rename openapi/ddm-admin/{ddm-admin-2.0.0-3dc476.json => ddm-admin-2.0.0-bdf299.json} (96%) rename openapi/mg-admin/{mg-admin-8.0.0-fc8d9c.json => mg-admin-8.0.0-082a16.json} (99%) diff --git a/ddm-types/versions/src/multicast_support/db.rs b/ddm-types/versions/src/multicast_support/db.rs index f6cdf4d0..8fc2e455 100644 --- a/ddm-types/versions/src/multicast_support/db.rs +++ b/ddm-types/versions/src/multicast_support/db.rs @@ -12,16 +12,15 @@ use crate::v2::exchange::MulticastPathHop; /// A multicast route learned via DDM. /// -/// Carries both the group origin and the path vector from the -/// originating subscriber through intermediate transit routers. -/// The path enables loop detection and (in multi-rack topologies) -/// replication optimizations per [RFD 488] in the future. -/// -/// Equality and hashing consider only `origin` and `nexthop` so that -/// a route update with a longer path replaces the existing entry in -/// hash-based collections. -/// -/// [RFD 488]: https://rfd.shared.oxide.computer/rfd/0488 +/// Carries a MulticastOrigin (overlay group + ff04::/64 underlay +/// mapping) and the path vector from the originating subscriber +/// through intermediate transit routers. +// The path enables loop detection and (in multi-rack topologies) +// replication optimizations (RFD 488) in the future. +// +// Equality and hashing consider only `origin` and `nexthop` so that +// a route update with a longer path replaces the existing entry in +// hash-based collections. #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] pub struct MulticastRoute { /// The multicast group origin information. diff --git a/ddm-types/versions/src/multicast_support/exchange.rs b/ddm-types/versions/src/multicast_support/exchange.rs index 4aa528ed..ca0cb161 100644 --- a/ddm-types/versions/src/multicast_support/exchange.rs +++ b/ddm-types/versions/src/multicast_support/exchange.rs @@ -8,12 +8,9 @@ use std::net::Ipv6Addr; /// A single hop in the multicast path, carrying metadata needed for /// replication optimization. -/// -/// Unlike unicast paths which only need hostnames, multicast hops carry -/// additional information for computing optimal replication points per -/// [RFD 488]. -/// -/// [RFD 488]: https://rfd.shared.oxide.computer/rfd/0488 +// Unlike unicast paths which only need hostnames, multicast hops carry +// additional information for computing optimal replication points +// (RFD 488). #[derive( Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize, JsonSchema, )] @@ -46,11 +43,12 @@ impl MulticastPathHop { /// Multicast group subscription announcement propagating through DDM. /// -/// The path records the sequence of routers from the original subscriber -/// toward the current receiving router. Currently, this is used for loop -/// detection: if our router_id appears in the path, the announcement has -/// already traversed us and is dropped. The path structure also carries -/// topology information for future replication optimizations (RFD 488). +/// Contains a MulticastOrigin (overlay group + ff04::/64 underlay +/// mapping) and the path from the original subscriber outward. +// Currently, this is used for loop detection: if our router_id appears in the +// path, the announcement has already traversed us and is dropped. The path +// structure also carries topology information for future replication +// optimizations (RFD 488). #[derive( Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize, JsonSchema, )] diff --git a/ddm/src/exchange.rs b/ddm/src/exchange.rs index 57204cc8..3bea9016 100644 --- a/ddm/src/exchange.rs +++ b/ddm/src/exchange.rs @@ -406,8 +406,11 @@ impl TunnelUpdate { /// Multicast group subscription updates. /// -/// Carries path-vector information for multicast group subscriptions, -/// enabling loop detection and optimal replication point computation. +/// Each entry carries a [`MulticastPathVector`] containing a +/// [`MulticastOrigin`] (overlay group + ff04::/64 underlay mapping) +/// and the path vector for loop detection. +/// +/// [`MulticastOrigin`]: mg_common::net::MulticastOrigin #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Default)] pub struct MulticastUpdate { pub announce: HashSet, @@ -1210,9 +1213,7 @@ fn handle_underlay_update(update: &UnderlayUpdate, ctx: &HandlerContext) { /// apply at this layer. The MRIB RPF module in rdb handles that check /// before routes are originated into DDM. At the DDM exchange level, /// the path vector provides loop detection and carries topology -/// information for replication optimization per [RFD 488]. -/// -/// [RFD 488]: https://rfd.shared.oxide.computer/rfd/0488 +/// information for replication optimization (RFD 488). fn handle_multicast_update(update: &MulticastUpdate, ctx: &HandlerContext) { let db = &ctx.ctx.db; let hostname = &ctx.ctx.hostname; diff --git a/mg-lower/src/platform.rs b/mg-lower/src/platform.rs index 363dceaf..1d453a1e 100644 --- a/mg-lower/src/platform.rs +++ b/mg-lower/src/platform.rs @@ -217,6 +217,13 @@ pub trait Ddm { ddm_admin_client::Error, >; + /// Get multicast group subscriptions originated by this router. + /// + /// Each `MulticastOrigin` pairs an overlay group address with its + /// underlay mapping (ff04::/64) and optional source for (S,G) routes. + /// + /// Method names follow the DDM admin API convention + /// (`originated_multicast_groups`, not `originated_multicast_origins`). async fn get_originated_multicast_groups( &self, ) -> Result< @@ -224,6 +231,10 @@ pub trait Ddm { ddm_admin_client::Error, >; + /// Advertise multicast group subscriptions to DDM peers. + /// + /// Each entry is a `MulticastOrigin` pairing an overlay group + /// with its ff04::/64 underlay mapping. #[allow(clippy::ptr_arg)] async fn advertise_multicast_groups<'a>( &'a self, @@ -233,6 +244,10 @@ pub trait Ddm { ddm_admin_client::Error, >; + /// Withdraw multicast group subscriptions from DDM peers. + /// + /// Each entry is a `MulticastOrigin` pairing an overlay group + /// with its ff04::/64 underlay mapping. #[allow(clippy::ptr_arg)] async fn withdraw_multicast_groups<'a>( &'a self, @@ -484,6 +499,7 @@ pub(crate) mod test { use crate::MG_LOWER_TAG; use super::*; + use mg_common::lock; use std::sync::Mutex; use std::{collections::HashMap, net::IpAddr}; @@ -538,7 +554,7 @@ pub(crate) mod test { link_id: &LinkId, ) -> Result, DpdClientError> { - let links = self.links.lock().unwrap(); + let links = lock!(self.links); let link = links .iter() .find(|x| &x.port_id == port_id && &x.link_id == link_id); @@ -556,10 +572,7 @@ pub(crate) mod test { dpd_client::ResponseValue>, DpdClientError, > { - let result = self - .v4_routes - .lock() - .unwrap() + let result = lock!(self.v4_routes) .get(cidr) .cloned() .unwrap_or(Vec::default()); @@ -573,10 +586,7 @@ pub(crate) mod test { dpd_client::ResponseValue>, DpdClientError, > { - let result = self - .v6_routes - .lock() - .unwrap() + let result = lock!(self.v6_routes) .get(cidr) .cloned() .unwrap_or(Vec::default()); @@ -588,7 +598,7 @@ pub(crate) mod test { addr: &Ipv6Entry, ) -> Result, DpdClientError> { - self.loopback.lock().unwrap().replace(addr.clone()); + lock!(self.loopback).replace(addr.clone()); Ok(dpd_response_ok!(())) } @@ -599,7 +609,7 @@ pub(crate) mod test { dpd_client::ResponseValue>, DpdClientError, > { - let links = self.links.lock().unwrap(); + let links = lock!(self.links); let result = links .iter() .filter(|x| match filter { @@ -666,7 +676,7 @@ pub(crate) mod test { RouteTarget::V4(v4) => Route::V4(v4.clone()), RouteTarget::V6(v6) => Route::V6(v6.clone()), }; - let mut routes = self.v4_routes.lock().unwrap(); + let mut routes = lock!(self.v4_routes); match routes.get_mut(&body.cidr) { Some(targets) => { targets.push(route); @@ -683,7 +693,7 @@ pub(crate) mod test { body: &'a Ipv6RouteUpdate, ) -> Result, DpdClientError> { - let mut routes = self.v6_routes.lock().unwrap(); + let mut routes = lock!(self.v6_routes); match routes.get_mut(&body.cidr) { Some(targets) => { targets.push(body.target.clone()); @@ -703,7 +713,7 @@ pub(crate) mod test { tgt_ip: &'a IpAddr, ) -> Result, DpdClientError> { - let mut routes = self.v4_routes.lock().unwrap(); + let mut routes = lock!(self.v4_routes); if let Some(targets) = routes.get_mut(cidr) { targets.retain(|x| match (x, tgt_ip) { (Route::V4(x), IpAddr::V4(ip)) => { @@ -731,7 +741,7 @@ pub(crate) mod test { tgt_ip: &'a std::net::Ipv6Addr, ) -> Result, DpdClientError> { - let mut routes = self.v6_routes.lock().unwrap(); + let mut routes = lock!(self.v6_routes); if let Some(targets) = routes.get_mut(cidr) { targets.retain(|x| { !(x.tgt_ip == *tgt_ip @@ -773,9 +783,7 @@ pub(crate) mod test { ddm_admin_client::ResponseValue>, ddm_admin_client::Error, > { - Ok(ddm_response_ok!( - self.tunnel_originated.lock().unwrap().clone() - )) + Ok(ddm_response_ok!(lock!(self.tunnel_originated).clone())) } async fn get_originated( @@ -784,7 +792,7 @@ pub(crate) mod test { ddm_admin_client::ResponseValue>, ddm_admin_client::Error, > { - Ok(ddm_response_ok!(self.originated.lock().unwrap().clone())) + Ok(ddm_response_ok!(lock!(self.originated).clone())) } async fn advertise_prefixes<'a>( @@ -794,7 +802,7 @@ pub(crate) mod test { ddm_admin_client::ResponseValue<()>, ddm_admin_client::Error, > { - self.originated.lock().unwrap().extend(body); + lock!(self.originated).extend(body); Ok(ddm_response_ok!(())) } @@ -805,7 +813,7 @@ pub(crate) mod test { ddm_admin_client::ResponseValue<()>, ddm_admin_client::Error, > { - self.tunnel_originated.lock().unwrap().extend(body.clone()); + lock!(self.tunnel_originated).extend(body.clone()); Ok(ddm_response_ok!(())) } @@ -816,10 +824,7 @@ pub(crate) mod test { ddm_admin_client::ResponseValue<()>, ddm_admin_client::Error, > { - self.tunnel_originated - .lock() - .unwrap() - .retain(|x| !body.contains(x)); + lock!(self.tunnel_originated).retain(|x| !body.contains(x)); Ok(ddm_response_ok!(())) } @@ -829,9 +834,7 @@ pub(crate) mod test { ddm_admin_client::ResponseValue>, ddm_admin_client::Error, > { - Ok(ddm_response_ok!( - self.multicast_originated.lock().unwrap().clone() - )) + Ok(ddm_response_ok!(lock!(self.multicast_originated).clone())) } async fn advertise_multicast_groups<'a>( @@ -841,10 +844,7 @@ pub(crate) mod test { ddm_admin_client::ResponseValue<()>, ddm_admin_client::Error, > { - self.multicast_originated - .lock() - .unwrap() - .extend(body.clone()); + lock!(self.multicast_originated).extend(body.clone()); Ok(ddm_response_ok!(())) } @@ -855,10 +855,7 @@ pub(crate) mod test { ddm_admin_client::ResponseValue<()>, ddm_admin_client::Error, > { - self.multicast_originated - .lock() - .unwrap() - .retain(|x| !body.contains(x)); + lock!(self.multicast_originated).retain(|x| !body.contains(x)); Ok(ddm_response_ok!(())) } } diff --git a/openapi/ddm-admin/ddm-admin-2.0.0-3dc476.json b/openapi/ddm-admin/ddm-admin-2.0.0-bdf299.json similarity index 96% rename from openapi/ddm-admin/ddm-admin-2.0.0-3dc476.json rename to openapi/ddm-admin/ddm-admin-2.0.0-bdf299.json index 954fd3ad..aad55361 100644 --- a/openapi/ddm-admin/ddm-admin-2.0.0-3dc476.json +++ b/openapi/ddm-admin/ddm-admin-2.0.0-bdf299.json @@ -602,7 +602,7 @@ ] }, "MulticastPathHop": { - "description": "A single hop in the multicast path, carrying metadata needed for replication optimization.\n\nUnlike unicast paths which only need hostnames, multicast hops carry additional information for computing optimal replication points per [RFD 488].\n\n[RFD 488]: https://rfd.shared.oxide.computer/rfd/0488", + "description": "A single hop in the multicast path, carrying metadata needed for replication optimization.", "type": "object", "properties": { "downstream_subscriber_count": { @@ -628,7 +628,7 @@ ] }, "MulticastRoute": { - "description": "A multicast route learned via DDM.\n\nCarries both the group origin and the path vector from the originating subscriber through intermediate transit routers. The path enables loop detection and (in multi-rack topologies) replication optimizations per [RFD 488] in the future.\n\nEquality and hashing consider only `origin` and `nexthop` so that a route update with a longer path replaces the existing entry in hash-based collections.\n\n[RFD 488]: https://rfd.shared.oxide.computer/rfd/0488", + "description": "A multicast route learned via DDM.\n\nCarries a MulticastOrigin (overlay group + ff04::/64 underlay mapping) and the path vector from the originating subscriber through intermediate transit routers.", "type": "object", "properties": { "nexthop": { diff --git a/openapi/ddm-admin/ddm-admin-latest.json b/openapi/ddm-admin/ddm-admin-latest.json index 39659731..ba4534e5 120000 --- a/openapi/ddm-admin/ddm-admin-latest.json +++ b/openapi/ddm-admin/ddm-admin-latest.json @@ -1 +1 @@ -ddm-admin-2.0.0-3dc476.json \ No newline at end of file +ddm-admin-2.0.0-bdf299.json \ No newline at end of file diff --git a/openapi/mg-admin/mg-admin-8.0.0-fc8d9c.json b/openapi/mg-admin/mg-admin-8.0.0-082a16.json similarity index 99% rename from openapi/mg-admin/mg-admin-8.0.0-fc8d9c.json rename to openapi/mg-admin/mg-admin-8.0.0-082a16.json index b264b109..77f5d427 100644 --- a/openapi/mg-admin/mg-admin-8.0.0-fc8d9c.json +++ b/openapi/mg-admin/mg-admin-8.0.0-082a16.json @@ -5440,7 +5440,7 @@ ] }, "PeerId": { - "description": "Identifies a BGP peer for session management and route tracking.\n\nBGP peers can be identified in two ways: - **Numbered**: Traditional BGP peering using explicit IP addresses - **Unnumbered**: Modern peering using interface names with link-local addresses\n\n# Unnumbered Peering\n\nUnnumbered BGP uses interface names as stable identifiers instead of IP addresses. This is important because: - Link-local IPv6 addresses are discovered dynamically via NDP - Multiple interfaces may have peers with the same link-local address (e.g., fe80::1 on eth0 and fe80::1 on eth1) - Scope ID (interface index) disambiguates link-local addresses, but is not stable across reboots - Interface names provide stable, unambiguous peer identification\n\n# Route Tracking\n\nThis type is used in [`BgpPathProperties`](crate::BgpPathProperties) to track which peer advertised a route. Using `PeerId` instead of `IpAddr` ensures: - Unnumbered peers are properly distinguished even if they share link-local IPs - Route cleanup correctly removes only the routes from the intended peer - No cross-contamination when multiple unnumbered sessions exist\n\n# Examples\n\n``` use rdb_types::PeerId; use std::net::IpAddr;\n\n// Numbered peer let numbered = PeerId::Ip(\"192.0.2.1\".parse::().unwrap());\n\n// Unnumbered peer let unnumbered = PeerId::Interface(\"eth0\".to_string()); ```", + "description": "Identifies a BGP peer for session management and route tracking.\n\nBGP peers can be identified in two ways: - **Numbered**: Traditional BGP peering using explicit IP addresses - **Unnumbered**: Modern peering using interface names with link-local addresses\n\n# Unnumbered Peering\n\nUnnumbered BGP uses interface names as stable identifiers instead of IP addresses. This is important because: - Link-local IPv6 addresses are discovered dynamically via NDP - Multiple interfaces may have peers with the same link-local address (e.g., fe80::1 on eth0 and fe80::1 on eth1) - Scope ID (interface index) disambiguates link-local addresses, but is not stable across reboots - Interface names provide stable, unambiguous peer identification\n\n# Route Tracking\n\nThis type is used in `BgpPathProperties` to track which peer advertised a route. Using `PeerId` instead of `IpAddr` ensures: - Unnumbered peers are properly distinguished even if they share link-local IPs - Route cleanup correctly removes only the routes from the intended peer - No cross-contamination when multiple unnumbered sessions exist\n\n# Examples\n\n``` use rdb_types::PeerId; use std::net::IpAddr;\n\n// Numbered peer let numbered = PeerId::Ip(\"192.0.2.1\".parse::().unwrap());\n\n// Unnumbered peer let unnumbered = PeerId::Interface(\"eth0\".to_string()); ```", "oneOf": [ { "description": "Numbered peer identified by IP address\n\nUsed for traditional BGP sessions where peers are configured with explicit IP addresses (either IPv4 or IPv6 global unicast).", diff --git a/openapi/mg-admin/mg-admin-latest.json b/openapi/mg-admin/mg-admin-latest.json index 329966e4..38b6e356 120000 --- a/openapi/mg-admin/mg-admin-latest.json +++ b/openapi/mg-admin/mg-admin-latest.json @@ -1 +1 @@ -mg-admin-8.0.0-fc8d9c.json \ No newline at end of file +mg-admin-8.0.0-082a16.json \ No newline at end of file diff --git a/rdb-types/src/lib.rs b/rdb-types/src/lib.rs index af365e51..44a3777c 100644 --- a/rdb-types/src/lib.rs +++ b/rdb-types/src/lib.rs @@ -459,8 +459,8 @@ pub enum ProtocolFilter { /// /// # Route Tracking /// -/// This type is used in [`BgpPathProperties`](crate::BgpPathProperties) to track -/// which peer advertised a route. Using `PeerId` instead of `IpAddr` ensures: +/// This type is used in `BgpPathProperties` to track which peer advertised a +/// route. Using `PeerId` instead of `IpAddr` ensures: /// - Unnumbered peers are properly distinguished even if they share link-local IPs /// - Route cleanup correctly removes only the routes from the intended peer /// - No cross-contamination when multiple unnumbered sessions exist From c2151c6c5680cc369c94fdfbd07f489a3d5d8edb Mon Sep 17 00:00:00 2001 From: Zeeshan Lakhani Date: Wed, 8 Apr 2026 02:33:51 +0000 Subject: [PATCH 3/4] [review+arch-change] address PR review + revisit M2P/OPTE handling This addresses review feedback on DDM multicast exchange PR (#696). Additionally, oriented towards the goal of Omicron owning all OPTE M2P (multicast-to-physical) mappings via the sled-agent, we remove direct OPTE M2P writes from DDM. This was an oversight by me using similar patterns to tunnel routing. The M2P table is global to xde, so having both DDM and Omicron's reconciler write to it creates a conflict risk where Nexus could reap DDM-written entries as stale. Our original intention has always been Omicron driving most things (-> Dpd/Dendrite, -> OPTE) due to the implicit and dynamic lifecycle of multicast groups. The work here is for DDM to distribute multicast membership and expose learned state via its admin API for Omicron to consume (and rely on for port knowledge), which is currently a TODO that will be removed in Omicron once this work is plumbed up. Other changes: - Replace String errors with UnderlayMulticastError in mg-common - Use route key VNI instead of hard-coded DEFAULT_MULTICAST_VNI - Sort ddmadm multicast output by overlay group for deterministic display - Consolidate oxide_vpc imports in sys.rs - Derive Eq on MulticastOrigin, document PartialEq/Hash exclusion of metric with #649 reference - Downgrade multicast withdrawal success log to debug - Fix MRIB diagram indentation - Remove "old" zl/mrib rdb-types UnderlayMulticastIpv6 and use mg-common one - Remove unnecessary DEFAULT_MULTICAST_VNI constant --- Cargo.lock | 3 + ddm-admin-client/Cargo.toml | 1 + ddm-admin-client/src/lib.rs | 26 +++++ ddm/src/exchange.rs | 71 ++++-------- ddm/src/lib.rs | 7 ++ ddm/src/sm.rs | 40 +++---- ddm/src/sys.rs | 95 +--------------- ddmadm/src/main.rs | 21 +++- mg-common/Cargo.toml | 1 + mg-common/src/net.rs | 53 +++++---- mg-lower/src/ddm.rs | 2 +- mg-lower/src/mrib.rs | 46 ++------ mg-types/versions/Cargo.toml | 1 + .../versions/src/multicast_support/mrib.rs | 5 +- mgadm/src/mrib.rs | 4 +- ...df299.json => ddm-admin-2.0.0-5318c9.json} | 14 ++- openapi/ddm-admin/ddm-admin-latest.json | 2 +- rdb/src/db.rs | 8 +- rdb/src/types.rs | 106 +++--------------- 19 files changed, 177 insertions(+), 329 deletions(-) rename openapi/ddm-admin/{ddm-admin-2.0.0-bdf299.json => ddm-admin-2.0.0-5318c9.json} (98%) diff --git a/Cargo.lock b/Cargo.lock index f8c9e9e8..81237828 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1349,6 +1349,7 @@ dependencies = [ name = "ddm-admin-client" version = "0.1.0" dependencies = [ + "mg-common", "oxnet", "progenitor 0.13.0", "reqwest 0.13.2", @@ -3803,6 +3804,7 @@ dependencies = [ "slog-async", "slog-bunyan", "smf 0.10.0 (git+https://github.com/illumos/smf-rs?branch=main)", + "thiserror 2.0.18", "uuid", ] @@ -3878,6 +3880,7 @@ version = "0.1.0" dependencies = [ "bfd", "bgp", + "mg-common", "rdb", "schemars 0.8.22", "serde", diff --git a/ddm-admin-client/Cargo.toml b/ddm-admin-client/Cargo.toml index 24d50602..88e58187 100644 --- a/ddm-admin-client/Cargo.toml +++ b/ddm-admin-client/Cargo.toml @@ -7,6 +7,7 @@ edition = "2024" ignored = ["oxnet", "serde", "uuid"] [dependencies] +mg-common.workspace = true oxnet.workspace = true progenitor.workspace = true reqwest.workspace = true diff --git a/ddm-admin-client/src/lib.rs b/ddm-admin-client/src/lib.rs index 0d22065f..bea6d028 100644 --- a/ddm-admin-client/src/lib.rs +++ b/ddm-admin-client/src/lib.rs @@ -40,6 +40,20 @@ impl std::hash::Hash for types::TunnelOrigin { } } +impl std::cmp::PartialEq for types::Vni { + fn eq(&self, other: &Self) -> bool { + self.0.eq(&other.0) + } +} + +impl std::cmp::Eq for types::Vni {} + +impl std::hash::Hash for types::Vni { + fn hash(&self, state: &mut H) { + self.0.hash(state); + } +} + impl std::cmp::PartialEq for types::MulticastOrigin { fn eq(&self, other: &Self) -> bool { self.overlay_group.eq(&other.overlay_group) @@ -61,3 +75,15 @@ impl std::hash::Hash for types::MulticastOrigin { self.source.hash(state); } } + +impl From for types::MulticastOrigin { + fn from(o: mg_common::net::MulticastOrigin) -> Self { + Self { + overlay_group: o.overlay_group, + underlay_group: o.underlay_group.ip(), + vni: types::Vni(o.vni.as_u32()), + metric: o.metric, + source: o.source, + } + } +} diff --git a/ddm/src/exchange.rs b/ddm/src/exchange.rs index 3bea9016..9e696e37 100644 --- a/ddm/src/exchange.rs +++ b/ddm/src/exchange.rs @@ -977,10 +977,6 @@ fn collect_multicast( Ok(multicast) } -fn opt(s: HashSet) -> Option> { - if s.is_empty() { None } else { Some(s) } -} - #[endpoint { method = GET, path = "/v3/pull" }] async fn pull_handler_v3( ctx: RequestContext>>, @@ -988,8 +984,8 @@ async fn pull_handler_v3( let ctx = ctx.context().lock().await.clone(); let (underlay, tunnel) = collect_underlay_tunnel(&ctx)?; Ok(HttpResponseOk(PullResponseV3 { - underlay: opt(underlay), - tunnel: opt(tunnel), + underlay: crate::non_empty(underlay), + tunnel: crate::non_empty(tunnel), })) } @@ -1001,9 +997,9 @@ async fn pull_handler_v4( let (underlay, tunnel) = collect_underlay_tunnel(&ctx)?; let multicast = collect_multicast(&ctx)?; Ok(HttpResponseOk(PullResponse { - underlay: opt(underlay), - tunnel: opt(tunnel), - multicast: opt(multicast), + underlay: crate::non_empty(underlay), + tunnel: crate::non_empty(tunnel), + multicast: crate::non_empty(multicast), })) } @@ -1253,48 +1249,29 @@ fn handle_multicast_update(update: &MulticastUpdate, ctx: &HandlerContext) { } // Atomic import + delete + diff under a single lock. - let (to_add, to_del) = db.update_imported_mcast(&import, &remove); - - if let Err(e) = crate::sys::add_multicast_routes( - &ctx.log, - &ctx.ctx.config.if_name, - &to_add, - ) { - err!( - ctx.log, - ctx.ctx.config.if_name, - "add multicast routes: {e}: {to_add:#?}", - ) - } - - if let Err(e) = crate::sys::remove_multicast_routes( - &ctx.log, - &ctx.ctx.config.if_name, - &to_del, - ) { - err!( - ctx.log, - ctx.ctx.config.if_name, - "remove multicast routes: {e}: {to_del:#?}", - ) - } + // + // DDM stores learned multicast state, which feeds back into Omicron, as + // the latter owns OPTE M2P programming via sled-agent (the M2P table is + // global to xde). + // Learned state is queryable via the DDM admin API (get_multicast_groups). + db.update_imported_mcast(&import, &remove); } #[cfg(test)] mod test { use super::*; use ddm_types::exchange::MulticastPathHop; - use mg_common::net::{MulticastOrigin, UnderlayMulticastIpv6}; + use mg_common::net::{MulticastOrigin, UnderlayMulticastIpv6, Vni}; use std::net::Ipv6Addr; fn sample_multicast_update() -> MulticastUpdate { let origin = MulticastOrigin { overlay_group: "233.252.0.1".parse().unwrap(), - underlay_group: UnderlayMulticastIpv6::new(Ipv6Addr::new( - 0xff04, 0, 0, 0, 0, 0, 0, 1, - )) + underlay_group: UnderlayMulticastIpv6::new( + "ff04::1".parse().unwrap(), + ) .unwrap(), - vni: 77, + vni: Vni::try_from(77u32).unwrap(), metric: 0, source: None, }; @@ -1352,11 +1329,11 @@ mod test { fn v4_pull_response_round_trips() { let origin = MulticastOrigin { overlay_group: "ff0e::1".parse().unwrap(), - underlay_group: UnderlayMulticastIpv6::new(Ipv6Addr::new( - 0xff04, 0, 0, 0, 0, 0, 0, 2, - )) + underlay_group: UnderlayMulticastIpv6::new( + "ff04::2".parse().unwrap(), + ) .unwrap(), - vni: 77, + vni: Vni::try_from(77u32).unwrap(), metric: 0, source: None, }; @@ -1378,11 +1355,11 @@ mod test { fn v4_pull_response_deserializes_as_v3() { let origin = MulticastOrigin { overlay_group: "233.252.0.1".parse().unwrap(), - underlay_group: UnderlayMulticastIpv6::new(Ipv6Addr::new( - 0xff04, 0, 0, 0, 0, 0, 0, 1, - )) + underlay_group: UnderlayMulticastIpv6::new( + "ff04::1".parse().unwrap(), + ) .unwrap(), - vni: 77, + vni: Vni::try_from(77u32).unwrap(), metric: 0, source: None, }; diff --git a/ddm/src/lib.rs b/ddm/src/lib.rs index 447109ba..7699ce43 100644 --- a/ddm/src/lib.rs +++ b/ddm/src/lib.rs @@ -11,6 +11,13 @@ pub mod sm; pub mod sys; mod util; +/// Returns `None` if the set is empty, otherwise `Some(s)`. +pub(crate) fn non_empty( + set: std::collections::HashSet, +) -> Option> { + (!set.is_empty()).then_some(set) +} + #[macro_export] macro_rules! err { ($log:expr, $index:expr, $($args:tt)+) => { diff --git a/ddm/src/sm.rs b/ddm/src/sm.rs index 44e6c916..604b9080 100644 --- a/ddm/src/sm.rs +++ b/ddm/src/sm.rs @@ -494,12 +494,9 @@ impl Exchange { self.ctx.event_channels.len() ); - let underlay = if to_remove.is_empty() { - None - } else { - Some(UnderlayUpdate::withdraw( - to_remove - .iter() + let underlay = crate::non_empty(to_remove).map(|set| { + UnderlayUpdate::withdraw( + set.iter() .map(|x| PathVector { destination: x.destination, path: { @@ -509,35 +506,30 @@ impl Exchange { }, }) .collect(), - )) - }; + ) + }); - let tunnel = if to_remove_tnl.is_empty() { - None - } else { - Some(TunnelUpdate::withdraw( - to_remove_tnl.iter().cloned().map(Into::into).collect(), - )) - }; + let tunnel = crate::non_empty(to_remove_tnl).map(|set| { + TunnelUpdate::withdraw( + set.iter().cloned().map(Into::into).collect(), + ) + }); - // Build multicast withdrawal with our hop info - let multicast = if to_remove_mcast.is_empty() { - None - } else { + // Build multicast withdrawal with our hop info. + let multicast = crate::non_empty(to_remove_mcast).map(|set| { let hop = MulticastPathHop::new( self.ctx.hostname.clone(), self.ctx.config.addr, ); - Some(MulticastUpdate::withdraw( - to_remove_mcast - .iter() + MulticastUpdate::withdraw( + set.iter() .map(|route| ddm_types::exchange::MulticastPathVector { origin: route.origin.clone(), path: vec![hop.clone()], }) .collect(), - )) - }; + ) + }); let push = Arc::new(Update { underlay, diff --git a/ddm/src/sys.rs b/ddm/src/sys.rs index 22b782f0..915fc0b5 100644 --- a/ddm/src/sys.rs +++ b/ddm/src/sys.rs @@ -4,7 +4,7 @@ use crate::sm::{Config, DpdConfig}; use crate::{dbg, err, inf, wrn}; -use ddm_types::db::{MulticastRoute, TunnelRoute}; +use ddm_types::db::TunnelRoute; use dpd_client::Client; use dpd_client::ClientState; use dpd_client::types; @@ -359,99 +359,6 @@ pub fn remove_tunnel_routes( Ok(()) } -#[cfg(not(target_os = "illumos"))] -pub fn add_multicast_routes( - _log: &Logger, - _ifname: &str, - _routes: &HashSet, -) -> Result<(), String> { - todo!(); -} - -/// Update OPTE multicast-to-physical (M2P) table entries for learned -/// multicast routes. Each route's overlay group is mapped to the -/// corresponding underlay multicast address so that OPTE can direct -/// multicast traffic to the correct underlay destinations. -#[cfg(target_os = "illumos")] -pub fn add_multicast_routes( - log: &Logger, - ifname: &str, - routes: &HashSet, -) -> Result<(), String> { - use oxide_vpc::api::MulticastUnderlay; - use oxide_vpc::api::SetMcast2PhysReq; - - let hdl = OpteHdl::open().map_err(|e| e.to_string())?; - - for route in routes { - let underlay = - MulticastUnderlay::new(route.origin.underlay_group.ip().into()) - .map_err(|e| { - format!( - "invalid underlay multicast address {}: {e}", - route.origin.underlay_group, - ) - })?; - let req = SetMcast2PhysReq { - group: route.origin.overlay_group.into(), - underlay, - }; - let overlay = route.origin.overlay_group; - let underlay_addr = route.origin.underlay_group; - inf!(log, ifname, "adding M2P: {overlay:?} -> {underlay_addr}"); - if let Err(e) = hdl.set_m2p(&req) { - err!(log, ifname, "failed to set M2P route: {req:?}: {e}"); - } - } - - Ok(()) -} - -#[cfg(not(target_os = "illumos"))] -pub fn remove_multicast_routes( - _log: &Logger, - _ifname: &str, - _routes: &HashSet, -) -> Result<(), String> { - todo!() -} - -/// Remove OPTE M2P table entries for withdrawn multicast routes. -#[cfg(target_os = "illumos")] -pub fn remove_multicast_routes( - log: &Logger, - ifname: &str, - routes: &HashSet, -) -> Result<(), String> { - use oxide_vpc::api::ClearMcast2PhysReq; - use oxide_vpc::api::MulticastUnderlay; - - let hdl = OpteHdl::open().map_err(|e| e.to_string())?; - - for route in routes { - let underlay = - MulticastUnderlay::new(route.origin.underlay_group.ip().into()) - .map_err(|e| { - format!( - "invalid underlay multicast address {}: {e}", - route.origin.underlay_group, - ) - })?; - let req = ClearMcast2PhysReq { - group: route.origin.overlay_group.into(), - underlay, - }; - let overlay = route.origin.overlay_group; - let underlay_addr = route.origin.underlay_group; - inf!(log, ifname, "removing M2P: {overlay:?} -> {underlay_addr}"); - if let Err(e) = hdl.clear_m2p(&req) { - err!(log, ifname, "failed to clear M2P route: {req:?}: {e}"); - } - } - - Ok(()) -} - pub fn remove_underlay_routes( log: &Logger, ifname: &str, diff --git a/ddmadm/src/main.rs b/ddmadm/src/main.rs index 309b8a6f..08ffe515 100644 --- a/ddmadm/src/main.rs +++ b/ddmadm/src/main.rs @@ -279,6 +279,13 @@ async fn run() -> Result<()> { } SubCommand::MulticastImported => { let msg = client.get_multicast_groups().await?; + let mut routes: Vec<_> = msg.into_inner().into_iter().collect(); + routes.sort_by(|a, b| { + a.origin + .overlay_group + .cmp(&b.origin.overlay_group) + .then_with(|| a.origin.source.cmp(&b.origin.source)) + }); let mut tw = TabWriter::new(stdout()); writeln!( &mut tw, @@ -290,7 +297,7 @@ async fn run() -> Result<()> { "Source".dimmed(), "Path".dimmed(), )?; - for route in msg.into_inner() { + for route in &routes { let source = match &route.origin.source { Some(s) => s.to_string(), None => "(*,G)".to_string(), @@ -316,6 +323,12 @@ async fn run() -> Result<()> { } SubCommand::MulticastOriginated => { let msg = client.get_originated_multicast_groups().await?; + let mut origins: Vec<_> = msg.into_inner().into_iter().collect(); + origins.sort_by(|a, b| { + a.overlay_group + .cmp(&b.overlay_group) + .then_with(|| a.source.cmp(&b.source)) + }); let mut tw = TabWriter::new(stdout()); writeln!( &mut tw, @@ -326,7 +339,7 @@ async fn run() -> Result<()> { "Metric".dimmed(), "Source".dimmed(), )?; - for origin in msg.into_inner() { + for origin in &origins { let source = match &origin.source { Some(s) => s.to_string(), None => "(*,G)".to_string(), @@ -348,7 +361,7 @@ async fn run() -> Result<()> { .advertise_multicast_groups(&vec![types::MulticastOrigin { overlay_group: mg.overlay_group, underlay_group: mg.underlay_group, - vni: mg.vni, + vni: types::Vni(mg.vni), metric: mg.metric, source: mg.source, }]) @@ -359,7 +372,7 @@ async fn run() -> Result<()> { .withdraw_multicast_groups(&vec![types::MulticastOrigin { overlay_group: mg.overlay_group, underlay_group: mg.underlay_group, - vni: mg.vni, + vni: types::Vni(mg.vni), metric: mg.metric, source: mg.source, }]) diff --git a/mg-common/Cargo.toml b/mg-common/Cargo.toml index 0c40e317..657d5556 100644 --- a/mg-common/Cargo.toml +++ b/mg-common/Cargo.toml @@ -12,6 +12,7 @@ schemars.workspace = true slog.workspace = true slog-bunyan.workspace = true slog-async.workspace = true +thiserror.workspace = true oximeter-producer.workspace = true oximeter.workspace = true oxnet.workspace = true diff --git a/mg-common/src/net.rs b/mg-common/src/net.rs index bce080b2..8aaa69b7 100644 --- a/mg-common/src/net.rs +++ b/mg-common/src/net.rs @@ -2,20 +2,35 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. +// Re-export so consumers of MulticastOrigin.vni don't need a direct +// omicron_common dependency. +pub use omicron_common::api::external::Vni; + use omicron_common::address::UNDERLAY_MULTICAST_SUBNET; -use omicron_common::api::external::Vni; use oxnet::{IpNet, Ipv4Net, Ipv6Net}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::fmt; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::str::FromStr; +use thiserror::Error; -/// Default VNI for multicast routing. -pub const DEFAULT_MULTICAST_VNI: u32 = Vni::DEFAULT_MULTICAST_VNI.as_u32(); +fn default_multicast_vni() -> Vni { + Vni::DEFAULT_MULTICAST_VNI +} -fn default_multicast_vni() -> u32 { - DEFAULT_MULTICAST_VNI +/// Error constructing an [`UnderlayMulticastIpv6`] address. +#[derive(Debug, Clone, Error)] +pub enum UnderlayMulticastError { + /// The address is not within the underlay multicast subnet (ff04::/64). + #[error( + "underlay address {addr} is not within {UNDERLAY_MULTICAST_SUBNET}" + )] + NotInSubnet { addr: Ipv6Addr }, + + /// The string could not be parsed as an IPv6 address. + #[error("invalid IPv6 address: {0}")] + InvalidIpv6(#[from] std::net::AddrParseError), } /// A validated underlay multicast IPv6 address within ff04::/64. @@ -45,13 +60,11 @@ impl UnderlayMulticastIpv6 { /// /// # Errors /// - /// Returns an error if the address is not within ff04::/64. - pub fn new(value: Ipv6Addr) -> Result { + /// Returns [`UnderlayMulticastError::NotInSubnet`] if the address is + /// not within ff04::/64. + pub fn new(value: Ipv6Addr) -> Result { if !UNDERLAY_MULTICAST_SUBNET.contains(value) { - return Err(format!( - "underlay address {value} is not within \ - {UNDERLAY_MULTICAST_SUBNET}" - )); + return Err(UnderlayMulticastError::NotInSubnet { addr: value }); } Ok(Self(value)) } @@ -70,7 +83,7 @@ impl fmt::Display for UnderlayMulticastIpv6 { } impl TryFrom for UnderlayMulticastIpv6 { - type Error = String; + type Error = UnderlayMulticastError; fn try_from(value: Ipv6Addr) -> Result { Self::new(value) @@ -90,11 +103,10 @@ impl From for IpAddr { } impl FromStr for UnderlayMulticastIpv6 { - type Err = String; + type Err = UnderlayMulticastError; fn from_str(s: &str) -> Result { - let addr: Ipv6Addr = - s.parse().map_err(|e| format!("invalid IPv6: {e}"))?; + let addr: Ipv6Addr = s.parse()?; Self::new(addr) } } @@ -194,7 +206,7 @@ pub enum IpPrefix { /// advertised via DDM. The overlay_group is the application-visible multicast /// address (e.g., 233.252.0.1 or ff0e::1), while underlay_group is the mapped /// admin-local scoped IPv6 address (ff04::X) used in the underlay network. -#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[derive(Debug, Clone, Eq, Serialize, Deserialize, JsonSchema)] pub struct MulticastOrigin { /// The overlay multicast group address (IPv4 or IPv6). /// This is the group address visible to applications. @@ -206,7 +218,7 @@ pub struct MulticastOrigin { /// VNI for this multicast group (identifies the VPC/network context). #[serde(default = "default_multicast_vni")] - pub vni: u32, + pub vni: Vni, /// Metric for path selection (lower is better). /// @@ -222,6 +234,11 @@ pub struct MulticastOrigin { pub source: Option, } +// Equality and hashing consider only the identity fields (overlay_group, +// underlay_group, vni, source), not metric. This allows metric updates to +// replace existing entries in HashSet-based collections without creating +// duplicates. This type is not used in ordered collections (BTreeSet). +// See #649 for why adding Ord here would require more care. impl PartialEq for MulticastOrigin { fn eq(&self, other: &Self) -> bool { self.overlay_group == other.overlay_group @@ -231,8 +248,6 @@ impl PartialEq for MulticastOrigin { } } -impl Eq for MulticastOrigin {} - impl std::hash::Hash for MulticastOrigin { fn hash(&self, state: &mut H) { self.overlay_group.hash(state); diff --git a/mg-lower/src/ddm.rs b/mg-lower/src/ddm.rs index 45a53023..2f987477 100644 --- a/mg-lower/src/ddm.rs +++ b/mg-lower/src/ddm.rs @@ -159,7 +159,7 @@ pub(crate) fn remove_multicast_routes< "groups" => format!("{routes:#?}") ), Ok(_) => ddm_log!(log, - info, + debug, "withdrew multicast groups"; "groups" => format!("{routes:#?}") ), diff --git a/mg-lower/src/mrib.rs b/mg-lower/src/mrib.rs index ea8069e9..8645bc8e 100644 --- a/mg-lower/src/mrib.rs +++ b/mg-lower/src/mrib.rs @@ -10,7 +10,7 @@ //! ## Data Flow //! //! ```text -//! MRIB (loc_mrib changes) +//! MRIB (loc_mrib changes) //! | //! v [MribChangeNotification] //! mg-lower/mrib.rs @@ -27,15 +27,11 @@ use crate::ddm::{ }; use crate::platform::{Ddm, ProductionDdm}; use ddm_admin_client::types::MulticastOrigin; -use mg_common::net::DEFAULT_MULTICAST_VNI; +use mg_common::net::Vni; use rdb::Mrib; -use rdb::types::{ - DEFAULT_MULTICAST_VNI as DEFAULT_MCAST_VNI, MribChangeNotification, - MulticastAddr, MulticastRoute, -}; +use rdb::types::{MribChangeNotification, MulticastAddr, MulticastRoute}; use slog::{Logger, debug, error, info}; use std::collections::HashSet; -use std::net::IpAddr; use std::sync::Arc; use std::sync::mpsc::{RecvTimeoutError, channel}; use std::thread::sleep; @@ -43,33 +39,11 @@ use std::time::Duration; const MG_LOWER_MRIB_TAG: &str = "mg-lower-mrib"; -/// Convert an MRIB MulticastRoute to a DDM MulticastOrigin. +/// Convert an MRIB [`MulticastRoute`] to a DDM [`MulticastOrigin`]. /// -/// The MulticastOrigin captures the essential information needed for DDM -/// to advertise the multicast group subscription to other routers: -/// - overlay_group: The multicast group address (e.g., 233.252.0.1 or ff0e::1) -/// - underlay_group: The mapped underlay address within the ff04::/64 subnet -/// (a /64 within admin-local scope per RFC 7346, reserved for the rack's -/// underlay multicast traffic) -/// - source: Optional source for (S,G) routes, None for (*,G) -/// - vni: Virtual Network Identifier (default multicast VNI) -fn mrib_route_to_ddm_origin(route: &MulticastRoute) -> MulticastOrigin { - // Extract overlay group address from the route key - let overlay_group: IpAddr = match route.key.group() { - MulticastAddr::V4(v4) => IpAddr::V4(v4.ip()), - MulticastAddr::V6(v6) => IpAddr::V6(v6.ip()), - }; - - // Extract source address for (S,G) routes - let source = route.key.source(); - - MulticastOrigin { - overlay_group, - underlay_group: route.underlay_group.into(), - vni: DEFAULT_MULTICAST_VNI, - metric: 0, // Default metric - source, - } +/// [`MulticastOrigin`]: ddm_admin_client::types::MulticastOrigin +fn ddm_origin(route: &MulticastRoute) -> MulticastOrigin { + mg_common::net::MulticastOrigin::from(route).into() } /// Run the MRIB synchronization loop. @@ -135,7 +109,7 @@ pub(crate) fn full_sync( // Convert to DDM MulticastOrigin set let mrib_origins: HashSet = - mrib_routes.values().map(mrib_route_to_ddm_origin).collect(); + mrib_routes.values().map(ddm_origin).collect(); // Get current DDM advertised state let ddm_current: HashSet = rt @@ -192,7 +166,7 @@ fn handle_change( for key in notification.changed { // Check if route exists in loc_mrib (installed) if let Some(route) = mrib.get_selected_route(&key) { - let origin = mrib_route_to_ddm_origin(&route); + let origin = ddm_origin(&route); if !ddm_current.contains(&origin) { to_add.push(origin); } @@ -206,7 +180,7 @@ fn handle_change( && let Ok(ddm_key) = rdb::types::MulticastRouteKey::new( ddm_origin.source, overlay_group, - DEFAULT_MCAST_VNI, + Vni::DEFAULT_MULTICAST_VNI, ) && ddm_key == key { diff --git a/mg-types/versions/Cargo.toml b/mg-types/versions/Cargo.toml index 854e12c1..254b82e9 100644 --- a/mg-types/versions/Cargo.toml +++ b/mg-types/versions/Cargo.toml @@ -6,6 +6,7 @@ edition = "2024" [dependencies] bfd.workspace = true bgp.workspace = true +mg-common.workspace = true rdb.workspace = true schemars.workspace = true serde.workspace = true diff --git a/mg-types/versions/src/multicast_support/mrib.rs b/mg-types/versions/src/multicast_support/mrib.rs index e6779b3a..7e4e4f88 100644 --- a/mg-types/versions/src/multicast_support/mrib.rs +++ b/mg-types/versions/src/multicast_support/mrib.rs @@ -8,9 +8,8 @@ use std::net::IpAddr; -use rdb::types::{ - AddressFamily, MulticastRouteKey, UnderlayMulticastIpv6, Vni, -}; +use mg_common::net::UnderlayMulticastIpv6; +use rdb::types::{AddressFamily, MulticastRouteKey, Vni}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; diff --git a/mgadm/src/mrib.rs b/mgadm/src/mrib.rs index 402421d2..ab4548c5 100644 --- a/mgadm/src/mrib.rs +++ b/mgadm/src/mrib.rs @@ -21,9 +21,9 @@ use mg_admin_client::types::{ MribRpfRebuildIntervalRequest, MulticastRoute, MulticastRouteKey, RouteOriginFilter, Vni, }; -use rdb::types::{AddressFamily, DEFAULT_MULTICAST_VNI}; +use rdb::types::AddressFamily; -const DEFAULT_VNI: u32 = DEFAULT_MULTICAST_VNI.as_u32(); +const DEFAULT_VNI: u32 = rdb::Vni::DEFAULT_MULTICAST_VNI.as_u32(); fn parse_route_origin(s: &str) -> Result { match s.to_lowercase().as_str() { diff --git a/openapi/ddm-admin/ddm-admin-2.0.0-bdf299.json b/openapi/ddm-admin/ddm-admin-2.0.0-5318c9.json similarity index 98% rename from openapi/ddm-admin/ddm-admin-2.0.0-bdf299.json rename to openapi/ddm-admin/ddm-admin-2.0.0-5318c9.json index aad55361..4f9ddae9 100644 --- a/openapi/ddm-admin/ddm-admin-2.0.0-bdf299.json +++ b/openapi/ddm-admin/ddm-admin-2.0.0-5318c9.json @@ -591,9 +591,11 @@ "vni": { "description": "VNI for this multicast group (identifies the VPC/network context).", "default": 77, - "type": "integer", - "format": "uint32", - "minimum": 0 + "allOf": [ + { + "$ref": "#/components/schemas/Vni" + } + ] } }, "required": [ @@ -758,6 +760,12 @@ "nexthop", "origin" ] + }, + "Vni": { + "description": "A Geneve Virtual Network Identifier", + "type": "integer", + "format": "uint32", + "minimum": 0 } }, "responses": { diff --git a/openapi/ddm-admin/ddm-admin-latest.json b/openapi/ddm-admin/ddm-admin-latest.json index ba4534e5..a2528890 120000 --- a/openapi/ddm-admin/ddm-admin-latest.json +++ b/openapi/ddm-admin/ddm-admin-latest.json @@ -1 +1 @@ -ddm-admin-2.0.0-bdf299.json \ No newline at end of file +ddm-admin-2.0.0-5318c9.json \ No newline at end of file diff --git a/rdb/src/db.rs b/rdb/src/db.rs index e4e52f03..15dfa39e 100644 --- a/rdb/src/db.rs +++ b/rdb/src/db.rs @@ -2020,14 +2020,14 @@ impl Reaper { #[cfg(test)] mod test { use crate::{ - AddressFamily, DEFAULT_MULTICAST_VNI, DEFAULT_RIB_PRIORITY_STATIC, - Path, Prefix, Prefix4, Prefix6, StaticRouteKey, + AddressFamily, DEFAULT_RIB_PRIORITY_STATIC, Path, Prefix, Prefix4, + Prefix6, StaticRouteKey, db::Db, test::{TEST_WAIT_ITERATIONS, TestDb}, types::{ MulticastAddr, MulticastAddrV4, MulticastAddrV6, MulticastRoute, MulticastRouteKey, MulticastSourceProtocol, PrefixDbKey, - UnderlayMulticastIpv6, UnicastAddrV4, UnicastAddrV6, + UnderlayMulticastIpv6, UnicastAddrV4, UnicastAddrV6, Vni, test_helpers::path_vecs_equal, }, }; @@ -2385,7 +2385,7 @@ mod test { let key = MulticastRouteKey::new( Some(s_ip), group, - DEFAULT_MULTICAST_VNI, + Vni::DEFAULT_MULTICAST_VNI, ) .expect("AF match"); let route = MulticastRoute::new( diff --git a/rdb/src/types.rs b/rdb/src/types.rs index 3a68b562..c7255935 100644 --- a/rdb/src/types.rs +++ b/rdb/src/types.rs @@ -794,9 +794,6 @@ impl Display for PrefixChangeNotification { // MRIB (Multicast RIB) Types // ============================================================================ -/// Default VNI for fleet-wide multicast routing. -pub const DEFAULT_MULTICAST_VNI: Vni = Vni::DEFAULT_MULTICAST_VNI; - /// A validated IPv4 unicast address suitable for multicast source fields. /// /// This rejects addresses that cannot appear as a forwarded unicast source: @@ -1112,92 +1109,7 @@ impl From for Ipv6Addr { } } -/// A validated underlay multicast IPv6 address within ff04::/64. -/// -/// The Oxide rack maps overlay multicast groups 1:1 to admin-local scoped -/// IPv6 multicast addresses in `UNDERLAY_MULTICAST_SUBNET` (ff04::/64). -/// This type enforces that invariant at construction time. -/// -// TODO: This duplicates `dpd_types::mcast::UnderlayMulticastIpv6` in dendrite. -// Both should be consolidated into `omicron_common` so maghemite, dendrite, -// and omicron share a single definition. -#[derive( - Debug, - Copy, - Clone, - Eq, - PartialEq, - PartialOrd, - Ord, - Hash, - Serialize, - Deserialize, - JsonSchema, -)] -#[serde(try_from = "Ipv6Addr", into = "Ipv6Addr")] -#[schemars(transparent)] -pub struct UnderlayMulticastIpv6(Ipv6Addr); - -impl UnderlayMulticastIpv6 { - /// Create a new validated underlay multicast address. - /// - /// # Errors - /// - /// Returns an error if the address is not within `UNDERLAY_MULTICAST_SUBNET` - /// (ff04::/64). - pub fn new(value: Ipv6Addr) -> Result { - if !UNDERLAY_MULTICAST_SUBNET.contains(value) { - return Err(Error::Validation(format!( - "underlay address {value} is not within \ - {UNDERLAY_MULTICAST_SUBNET}" - ))); - } - Ok(Self(value)) - } - - /// Returns the underlying IPv6 address. - #[inline] - pub const fn ip(&self) -> Ipv6Addr { - self.0 - } -} - -impl fmt::Display for UnderlayMulticastIpv6 { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.0) - } -} - -impl TryFrom for UnderlayMulticastIpv6 { - type Error = Error; - - fn try_from(value: Ipv6Addr) -> Result { - Self::new(value) - } -} - -impl From for Ipv6Addr { - fn from(addr: UnderlayMulticastIpv6) -> Self { - addr.0 - } -} - -impl From for IpAddr { - fn from(addr: UnderlayMulticastIpv6) -> Self { - IpAddr::V6(addr.0) - } -} - -impl FromStr for UnderlayMulticastIpv6 { - type Err = Error; - - fn from_str(s: &str) -> Result { - let addr: Ipv6Addr = s.parse().map_err(|_| { - Error::Validation(format!("invalid IPv6 address: {s}")) - })?; - Self::new(addr) - } -} +pub use mg_common::net::UnderlayMulticastIpv6; /// A validated multicast group address (IPv4 or IPv6). /// @@ -1694,6 +1606,18 @@ impl MulticastRoute { } } +impl From<&MulticastRoute> for mg_common::net::MulticastOrigin { + fn from(route: &MulticastRoute) -> Self { + Self { + overlay_group: route.key.group().ip(), + underlay_group: route.underlay_group, + vni: route.key.vni(), + metric: 0, + source: route.key.source(), + } + } +} + /// Source of a multicast route entry. #[derive( Debug, Copy, Clone, Serialize, Deserialize, JsonSchema, Eq, PartialEq, @@ -2034,7 +1958,7 @@ mod test { let result = MulticastRouteKey::new( Some(IpAddr::V4(src.ip())), group.into(), - DEFAULT_MULTICAST_VNI, + Vni::DEFAULT_MULTICAST_VNI, ); assert!( result.is_err(), @@ -2049,7 +1973,7 @@ mod test { let result = MulticastRouteKey::new( Some(IpAddr::V6(src)), group.into(), - DEFAULT_MULTICAST_VNI, + Vni::DEFAULT_MULTICAST_VNI, ); assert!( result.is_err(), From 3bf7fada6d1ea82c4a954bff07a1ca135f70796b Mon Sep 17 00:00:00 2001 From: Zeeshan Lakhani Date: Wed, 8 Apr 2026 07:28:30 +0000 Subject: [PATCH 4/4] [api] add if_name to PeerInfo for sled-to-port mapping Add an optional `if_name` field to PeerInfo (v2) so Omicron can learn which switch port a DDM peer was discovered on. This enables Omicron to use DDM as the primary source of truth for sled-to-port mapping, replacing or cross-validating the current inventory-based approach. The get_peers endpoint is versioned: v2+ returns PeerInfo with if_name, v1 returns the original PeerInfo without it. --- Cargo.lock | 1 + ddm-api/src/lib.rs | 17 ++++++++++- ddm-types/versions/src/latest.rs | 2 +- .../versions/src/multicast_support/db.rs | 28 +++++++++++++++++++ ddm/Cargo.toml | 1 + ddm/src/admin.rs | 16 +++++++++++ ddm/src/discovery.rs | 1 + ...318c9.json => ddm-admin-2.0.0-8aeda2.json} | 7 +++++ openapi/ddm-admin/ddm-admin-latest.json | 2 +- 9 files changed, 72 insertions(+), 3 deletions(-) rename openapi/ddm-admin/{ddm-admin-2.0.0-5318c9.json => ddm-admin-2.0.0-8aeda2.json} (98%) diff --git a/Cargo.lock b/Cargo.lock index 81237828..6bf26659 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1318,6 +1318,7 @@ dependencies = [ "chrono", "ddm-api", "ddm-types", + "ddm-types-versions", "dpd-client", "dropshot 0.17.0", "hostname 0.4.2", diff --git a/ddm-api/src/lib.rs b/ddm-api/src/lib.rs index 905942b5..6f3f8282 100644 --- a/ddm-api/src/lib.rs +++ b/ddm-api/src/lib.rs @@ -3,6 +3,7 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. use ddm_types_versions::latest; +use ddm_types_versions::v1; use dropshot::HttpError; use dropshot::HttpResponseOk; use dropshot::HttpResponseUpdatedNoContent; @@ -46,11 +47,25 @@ api_versions!([ pub trait DdmAdminApi { type Context; - #[endpoint { method = GET, path = "/peers" }] + #[endpoint { + method = GET, + path = "/peers", + versions = VERSION_MULTICAST_SUPPORT.. + }] async fn get_peers( ctx: RequestContext, ) -> Result>, HttpError>; + /// Returns peers without interface name information. + #[endpoint { + method = GET, + path = "/peers", + versions = ..VERSION_MULTICAST_SUPPORT + }] + async fn get_peers_v1( + ctx: RequestContext, + ) -> Result>, HttpError>; + #[endpoint { method = DELETE, path = "/peers/{addr}" }] async fn expire_peer( ctx: RequestContext, diff --git a/ddm-types/versions/src/latest.rs b/ddm-types/versions/src/latest.rs index c2d29e3e..721f8cbf 100644 --- a/ddm-types/versions/src/latest.rs +++ b/ddm-types/versions/src/latest.rs @@ -11,11 +11,11 @@ pub mod admin { } pub mod db { - pub use crate::v1::db::PeerInfo; pub use crate::v1::db::PeerStatus; pub use crate::v1::db::RouterKind; pub use crate::v1::db::TunnelRoute; pub use crate::v2::db::MulticastRoute; + pub use crate::v2::db::PeerInfo; } pub mod exchange { diff --git a/ddm-types/versions/src/multicast_support/db.rs b/ddm-types/versions/src/multicast_support/db.rs index 8fc2e455..8fedfe64 100644 --- a/ddm-types/versions/src/multicast_support/db.rs +++ b/ddm-types/versions/src/multicast_support/db.rs @@ -8,6 +8,7 @@ use mg_common::net::MulticastOrigin; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use crate::v1::db::{PeerStatus, RouterKind}; use crate::v2::exchange::MulticastPathHop; /// A multicast route learned via DDM. @@ -59,3 +60,30 @@ impl From for MulticastOrigin { x.origin } } + +/// Peer information with an optional interface name. +/// +// Adds the `if_name` field to identify which underlay interface the peer +// was discovered on. +#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema, PartialEq)] +pub struct PeerInfo { + pub status: PeerStatus, + pub addr: Ipv6Addr, + pub host: String, + pub kind: RouterKind, + /// Interface name the peer was discovered on (e.g., "tfportrear0_0"). + #[serde(default)] + pub if_name: Option, +} + +/// Downconvert v2 PeerInfo to v1 PeerInfo by dropping `if_name`. +impl From for crate::v1::db::PeerInfo { + fn from(p: PeerInfo) -> Self { + Self { + status: p.status, + addr: p.addr, + host: p.host, + kind: p.kind, + } + } +} diff --git a/ddm/Cargo.toml b/ddm/Cargo.toml index f505632d..7856f5a7 100644 --- a/ddm/Cargo.toml +++ b/ddm/Cargo.toml @@ -35,3 +35,4 @@ oxnet.workspace = true uuid.workspace = true ddm-api.workspace = true ddm-types.workspace = true +ddm-types-versions.workspace = true diff --git a/ddm/src/admin.rs b/ddm/src/admin.rs index 51a50677..26fe15e3 100644 --- a/ddm/src/admin.rs +++ b/ddm/src/admin.rs @@ -116,6 +116,22 @@ impl DdmAdminApi for DdmAdminApiImpl { Ok(HttpResponseOk(ctx.db.peers())) } + async fn get_peers_v1( + ctx: RequestContext, + ) -> Result< + HttpResponseOk>, + HttpError, + > { + let ctx = lock!(ctx.context()); + let peers = ctx + .db + .peers() + .into_iter() + .map(|(k, v)| (k, v.into())) + .collect(); + Ok(HttpResponseOk(peers)) + } + async fn expire_peer( ctx: RequestContext, params: Path, diff --git a/ddm/src/discovery.rs b/ddm/src/discovery.rs index dd6da934..2c94a08f 100644 --- a/ddm/src/discovery.rs +++ b/ddm/src/discovery.rs @@ -527,6 +527,7 @@ fn handle_advertisement( addr: *sender, host: hostname, kind, + if_name: Some(ctx.config.if_name.clone()), }, ); if updated { diff --git a/openapi/ddm-admin/ddm-admin-2.0.0-5318c9.json b/openapi/ddm-admin/ddm-admin-2.0.0-8aeda2.json similarity index 98% rename from openapi/ddm-admin/ddm-admin-2.0.0-5318c9.json rename to openapi/ddm-admin/ddm-admin-2.0.0-8aeda2.json index 4f9ddae9..312fadd4 100644 --- a/openapi/ddm-admin/ddm-admin-2.0.0-5318c9.json +++ b/openapi/ddm-admin/ddm-admin-2.0.0-8aeda2.json @@ -679,6 +679,7 @@ ] }, "PeerInfo": { + "description": "Peer information with an optional interface name.", "type": "object", "properties": { "addr": { @@ -688,6 +689,12 @@ "host": { "type": "string" }, + "if_name": { + "nullable": true, + "description": "Interface name the peer was discovered on (e.g., \"tfportrear0_0\").", + "default": null, + "type": "string" + }, "kind": { "$ref": "#/components/schemas/RouterKind" }, diff --git a/openapi/ddm-admin/ddm-admin-latest.json b/openapi/ddm-admin/ddm-admin-latest.json index a2528890..aaa8691d 120000 --- a/openapi/ddm-admin/ddm-admin-latest.json +++ b/openapi/ddm-admin/ddm-admin-latest.json @@ -1 +1 @@ -ddm-admin-2.0.0-5318c9.json \ No newline at end of file +ddm-admin-2.0.0-8aeda2.json \ No newline at end of file