diff --git a/Cargo.lock b/Cargo.lock index 1eb5ee60..22105f99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -333,6 +333,7 @@ dependencies = [ "anyhow", "chrono", "clap", + "iddqd", "itertools 0.14.0", "lazy_static", "libc", diff --git a/Cargo.toml b/Cargo.toml index cbc30329..a9b7ce45 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -138,6 +138,7 @@ serial_test = "3.3" internet-checksum = "0.2.1" network-interface = { git = "https://github.com/oxidecomputer/network-interface", branch = "illumos" } natord = "1.0" +iddqd = "0.3" [workspace.dependencies.opte-ioctl] diff --git a/bgp/Cargo.toml b/bgp/Cargo.toml index 2a92c126..19f5b731 100644 --- a/bgp/Cargo.toml +++ b/bgp/Cargo.toml @@ -21,6 +21,7 @@ itertools.workspace = true oxnet.workspace = true uuid.workspace = true rand.workspace = true +iddqd.workspace = true clap = { workspace = true, optional = true } [target.'cfg(target_os = "illumos")'.dependencies] diff --git a/bgp/src/connection.rs b/bgp/src/connection.rs index 3d1cd884..a6a78e97 100644 --- a/bgp/src/connection.rs +++ b/bgp/src/connection.rs @@ -6,14 +6,14 @@ use crate::{ clock::ConnectionClock, error::Error, messages::Message, - session::{FsmEvent, PeerId, SessionEndpoint, SessionInfo}, + router::SessionMap, + session::{FsmEvent, SessionInfo}, unnumbered::UnnumberedManager, }; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use slog::Logger; use std::{ - collections::BTreeMap, net::{SocketAddr, ToSocketAddrs}, sync::{Arc, Mutex, mpsc::Sender}, thread::JoinHandle, @@ -133,11 +133,11 @@ pub trait BgpListener { /// Accept a connection. This Listener is non-blocking, so the timeout /// is used as a sleep between accept attempts. This function may be called /// multiple times, returning a new connection each time. Policy application - /// is handled by the Dispatcher after the peer_to_session lookup. + /// is handled by the Dispatcher after the session lookup. fn accept( &self, log: Logger, - peer_to_session: Arc>>>, + sessions: Arc>>, timeout: Duration, ) -> Result; diff --git a/bgp/src/connection_channel.rs b/bgp/src/connection_channel.rs index 336ef7b2..1278c74a 100644 --- a/bgp/src/connection_channel.rs +++ b/bgp/src/connection_channel.rs @@ -17,15 +17,14 @@ use crate::{ error::Error, log::{connection_log, connection_log_lite}, messages::Message, - session::{ - ConnectionEvent, FsmEvent, PeerId, SessionEndpoint, SessionInfo, - }, + router::SessionMap, + session::{ConnectionEvent, FsmEvent, PeerId, SessionInfo}, unnumbered::UnnumberedManager, }; use mg_common::lock; use slog::Logger; use std::{ - collections::{BTreeMap, HashMap}, + collections::HashMap, net::{SocketAddr, ToSocketAddrs}, sync::{ Arc, Mutex, @@ -229,38 +228,32 @@ impl BgpListener for BgpListenerChannel { fn accept( &self, log: Logger, - peer_to_session: Arc< - Mutex>>, - >, + sessions: Arc>>, timeout: Duration, ) -> Result { let (peer, endpoint) = self.listener.accept(timeout)?; - // For channel-based test connections, we use the bind address as the local - // address. In a real network scenario (like TCP), we would need to get the - // actual connection's local address to handle dual-stack correctly, but for - // testing purposes with channels, the bind address is the connection address. let local = self.bind_addr; // Resolve peer address to appropriate PeerId (IP or Interface) let key = self.resolve_session_key(peer); - match lock!(peer_to_session).get(&key) { - Some(session_endpoint) => { - let config = lock!(session_endpoint.config); - Ok(BgpConnectionChannel::with_conn( - local, - peer, - endpoint, - session_endpoint.event_tx.clone(), - IO_TIMEOUT, - log, - ConnectionDirection::Inbound, - &config, - )) - } - None => Err(Error::UnknownPeer(peer.ip())), - } + let runner = lock!(sessions) + .get(&key) + .cloned() + .ok_or(Error::UnknownPeer(peer.ip()))?; + + let config = lock!(runner.session); + Ok(BgpConnectionChannel::with_conn( + local, + peer, + endpoint, + runner.event_tx.clone(), + IO_TIMEOUT, + log, + ConnectionDirection::Inbound, + &config, + )) } fn apply_policy( diff --git a/bgp/src/connection_tcp.rs b/bgp/src/connection_tcp.rs index 8f65b798..fb4c209d 100644 --- a/bgp/src/connection_tcp.rs +++ b/bgp/src/connection_tcp.rs @@ -19,16 +19,13 @@ use crate::{ OpenParseError, OpenParseErrorReason, RouteRefreshMessage, RouteRefreshParseError, RouteRefreshParseErrorReason, UpdateMessage, }, - session::{ - ConnectionEvent, FsmEvent, PeerId, SessionEndpoint, SessionEvent, - SessionInfo, - }, + router::SessionMap, + session::{ConnectionEvent, FsmEvent, PeerId, SessionEvent, SessionInfo}, unnumbered::UnnumberedManager, }; use mg_common::lock; use slog::Logger; use std::{ - collections::BTreeMap, io::Read, io::Write, net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs}, @@ -130,9 +127,7 @@ impl BgpListener for BgpListenerTcp { fn accept( &self, log: Logger, - peer_to_session: Arc< - Mutex>>, - >, + sessions: Arc>>, timeout: Duration, ) -> Result { let start = Instant::now(); @@ -164,23 +159,24 @@ impl BgpListener for BgpListenerTcp { // Resolve peer address to appropriate PeerId (IP or Interface) let key = self.resolve_session_key(peer); - // Check if we have a session for this peer - match lock!(peer_to_session).get(&key) { - Some(session_endpoint) => { - let config = lock!(session_endpoint.config); - return BgpConnectionTcp::with_conn( - local, - peer, - conn, - IO_TIMEOUT, - session_endpoint.event_tx.clone(), - log, - ConnectionDirection::Inbound, - &config, - ); - } - None => return Err(Error::UnknownPeer(ip)), - } + // Look up the session runner, clone the Arc, then release + // the sessions lock before accessing session config. + let runner = lock!(sessions) + .get(&key) + .cloned() + .ok_or(Error::UnknownPeer(ip))?; + + let config = lock!(runner.session); + return BgpConnectionTcp::with_conn( + local, + peer, + conn, + IO_TIMEOUT, + runner.event_tx.clone(), + log, + ConnectionDirection::Inbound, + &config, + ); } Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { // Check if we've exceeded the timeout diff --git a/bgp/src/dispatcher.rs b/bgp/src/dispatcher.rs index 0e5f8e5a..b117147d 100644 --- a/bgp/src/dispatcher.rs +++ b/bgp/src/dispatcher.rs @@ -6,13 +6,13 @@ use crate::{ IO_TIMEOUT, connection::{BgpConnection, BgpListener}, log::dispatcher_log, - session::{FsmEvent, PeerId, SessionEndpoint, SessionEvent}, + router::SessionMap, + session::{FsmEvent, PeerId, SessionEvent}, unnumbered::UnnumberedManager, }; use mg_common::lock; use slog::Logger; use std::{ - collections::BTreeMap, net::SocketAddr, sync::atomic::{AtomicBool, Ordering}, sync::{Arc, Mutex}, @@ -22,10 +22,9 @@ use std::{ const UNIT_DISPATCHER: &str = "dispatcher"; -pub struct Dispatcher { - /// Session endpoint map indexed by PeerId (IP or interface name) - /// This unified map supports both numbered and unnumbered BGP sessions - pub peer_to_session: Arc>>>, +pub struct Dispatcher { + /// Session map shared with all Routers, indexed by PeerId. + pub sessions: Arc>>, /// Optional unnumbered neighbor manager for link-local connection routing. /// When present, enables routing of IPv6 link-local connections to @@ -39,13 +38,13 @@ pub struct Dispatcher { impl Dispatcher { pub fn new( - peer_to_session: Arc>>>, + sessions: Arc>>, listen: String, log: Logger, unnumbered_manager: Option>, ) -> Self { Self { - peer_to_session, + sessions, unnumbered_manager, listen, log, @@ -139,7 +138,7 @@ impl Dispatcher { let accepted = match listener.accept( self.log.clone(), - self.peer_to_session.clone(), + self.sessions.clone(), IO_TIMEOUT, ) { Ok(c) => { @@ -167,51 +166,50 @@ impl Dispatcher { let peer_addr = accepted.peer(); let key = self.resolve_session_key(peer_addr); - match lock!(self.peer_to_session).get(&key).cloned() { - Some(session_endpoint) => { - // Apply connection policy from the session configuration - let min_ttl = lock!(session_endpoint.config).min_ttl; - let md5_key = - lock!(session_endpoint.config).md5_auth_key.clone(); - - if let Err(e) = - Listener::apply_policy(&accepted, min_ttl, md5_key) - { - dispatcher_log!(self, - warn, - "failed to apply policy for connection from {}: {e}", peer_addr; - "listen_address" => &self.listen, - "peer" => format!("{}", peer_addr), - "session_key" => format!("{:?}", key), - "error" => format!("{e}") - ); - } - - if let Err(e) = - session_endpoint.event_tx.send(FsmEvent::Session( - SessionEvent::TcpConnectionAcked(accepted), - )) - { - dispatcher_log!(self, - error, - "failed to send connected event to session for {}: {e}", peer_addr; - "listen_address" => &self.listen, - "peer" => format!("{}", peer_addr), - "session_key" => format!("{:?}", key) - ); - continue 'listener; - } - } - None => { + let (runner, min_ttl, md5_key) = { + let sessions = lock!(self.sessions); + let Some(runner) = sessions.get(&key).cloned() else { dispatcher_log!(self, debug, "no session found for peer, dropping connection"; "peer" => format!("{}", peer_addr), - "resolved_key" => format!("{:?}", key), + "session_key" => format!("{:?}", key), "listen_address" => &self.listen ); continue 'accept; - } + }; + let config = lock!(runner.session); + ( + runner.clone(), + config.min_ttl, + config.md5_auth_key.clone(), + ) + }; + + if let Err(e) = + Listener::apply_policy(&accepted, min_ttl, md5_key) + { + dispatcher_log!(self, + warn, + "failed to apply policy for connection from {}: {e}", peer_addr; + "listen_address" => &self.listen, + "peer" => format!("{}", peer_addr), + "session_key" => format!("{:?}", key), + "error" => format!("{e}") + ); + } + + if let Err(e) = runner.event_tx.send(FsmEvent::Session( + SessionEvent::TcpConnectionAcked(accepted), + )) { + dispatcher_log!(self, + error, + "failed to send connected event to session for {}: {e}", peer_addr; + "listen_address" => &self.listen, + "peer" => format!("{}", peer_addr), + "session_key" => format!("{:?}", key) + ); + continue 'listener; } } } @@ -235,7 +233,7 @@ impl Dispatcher { } } -impl Drop for Dispatcher { +impl Drop for Dispatcher { fn drop(&mut self) { dispatcher_log!(self, debug, diff --git a/bgp/src/router.rs b/bgp/src/router.rs index 8e9d49b0..681e115a 100644 --- a/bgp/src/router.rs +++ b/bgp/src/router.rs @@ -14,17 +14,17 @@ use crate::{ }, policy::{load_checker, load_shaper}, session::{ - AdminEvent, FsmEvent, NeighborInfo, PeerId, SessionEndpoint, - SessionInfo, SessionRunner, + AdminEvent, FsmEvent, NeighborInfo, PeerId, SessionInfo, SessionRunner, }, unnumbered::UnnumberedManager, }; +use iddqd::{IdOrdItem, IdOrdMap, id_upcast}; use mg_common::{lock, read_lock, write_lock}; use rdb::{Asn, Db, Prefix4, Prefix6}; use rhai::AST; use slog::Logger; use std::{ - collections::{BTreeMap, BTreeSet}, + collections::BTreeSet, net::SocketAddr, sync::{ Arc, Mutex, MutexGuard, RwLock, @@ -34,6 +34,72 @@ use std::{ time::Duration, }; +/// Internal newtype for `IdOrdItem` impl — not exposed outside this module. +struct SessionHandle(Arc>); + +impl IdOrdItem for SessionHandle { + type Key<'a> = &'a PeerId; + + fn key(&self) -> Self::Key<'_> { + &self.0.neighbor.peer + } + + id_upcast!(); +} + +/// Ordered map of active BGP sessions, keyed by PeerId derived from each +/// session's neighbor info. Wraps an `IdOrdMap` so the key can never +/// diverge from the value it indexes. +pub struct SessionMap( + IdOrdMap>, +); + +impl Default for SessionMap { + fn default() -> Self { + Self(IdOrdMap::default()) + } +} + +impl SessionMap { + pub fn new() -> Self { + Self::default() + } + + pub fn get(&self, peer: &PeerId) -> Option<&Arc>> { + self.0.get(peer).map(|h| &h.0) + } + + pub fn insert(&mut self, session: Arc>) { + self.0.insert_overwrite(SessionHandle(session)); + } + + pub fn remove(&mut self, peer: &PeerId) -> Option>> { + self.0.remove(peer).map(|h| h.0) + } + + pub fn contains_key(&self, peer: &PeerId) -> bool { + self.0.contains_key(peer) + } + + pub fn values(&self) -> impl Iterator>> { + self.0.iter().map(|h| &h.0) + } + + pub fn iter( + &self, + ) -> impl Iterator>)> { + self.0.iter().map(|h| (&h.0.neighbor.peer, &h.0)) + } + + pub fn keys(&self) -> impl Iterator { + self.0.iter().map(|h| &h.0.neighbor.peer) + } + + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } +} + const UNIT_SESSION_RUNNER: &str = "session_runner"; pub struct Router { @@ -46,7 +112,8 @@ pub struct Router { pub config: RouterConfig, /// A set of BGP session runners indexed by PeerId (IP or interface). - pub sessions: Mutex>>>, + /// Shared with the Dispatcher for connection routing. + pub sessions: Arc>>, /// Compiled policy programs. pub policy: Policy, @@ -61,10 +128,6 @@ pub struct Router { /// graceful shutdown (RFC 8326) with its peers. graceful_shutdown: AtomicBool, - /// A set of event channels indexed by PeerId. These channels - /// are used for cross-peer session communications. - peer_to_session: Arc>>>, - /// A fanout is used to distribute originated prefixes to all peer /// sessions. In the event that redistribution becomes supported this /// will also act as a redistribution mechanism from one peer session @@ -86,16 +149,15 @@ impl Router { config: RouterConfig, log: Logger, db: Db, - peer_to_session: Arc>>>, + sessions: Arc>>, ) -> Router { Self { config, - peer_to_session, + sessions, log, shutdown: AtomicBool::new(false), graceful_shutdown: AtomicBool::new(false), db, - sessions: Mutex::new(BTreeMap::new()), fanout4: Arc::new(RwLock::new(Fanout4::default())), fanout6: Arc::new(RwLock::new(Fanout6::default())), policy: Policy::default(), @@ -139,8 +201,8 @@ impl Router { /// Also cleans up fanout entries for all stopped sessions. fn stop_all_sessions(&self) { let sessions = lock!(self.sessions); - for (key, s) in sessions.iter() { - self.remove_fanout(key.clone()); + for (peer, s) in sessions.iter() { + self.remove_fanout(peer.clone()); s.shutdown(); } } @@ -150,12 +212,10 @@ impl Router { /// references, allowing BgpConnections to drop and their threads to clean up. fn delete_all_sessions(&self) { let sessions = std::mem::take(&mut *lock!(self.sessions)); - for (key, s) in sessions { - lock!(self.peer_to_session).remove(&key); - self.remove_fanout(key.clone()); + for (peer, s) in sessions.iter() { + self.remove_fanout(peer.clone()); s.shutdown(); } - // When `sessions` drops here, Arc references are released } pub fn shutdown(&self) { @@ -247,16 +307,16 @@ impl Router { event_rx: Receiver>, info: SessionInfo, ) -> Result, Error> { - let p2s = lock!(self.peer_to_session); - // Use PeerId::Ip for numbered sessions + let sessions = lock!(self.sessions); let key = PeerId::Ip(peer.host.ip()); - if p2s.contains_key(&key) { + if sessions.contains_key(&key) { + drop(sessions); Ok(EnsureSessionResult::Updated( self.update_session(peer, info)?, )) } else { Ok(EnsureSessionResult::New(self.new_session_locked( - p2s, key, peer, bind_addr, event_tx, event_rx, info, None, + sessions, key, peer, bind_addr, event_tx, event_rx, info, None, )?)) } } @@ -272,19 +332,16 @@ impl Router { info: SessionInfo, unnumbered_manager: Arc, ) -> Result, Error> { - let p2s = lock!(self.peer_to_session); + let sessions = lock!(self.sessions); let key = PeerId::Interface(interface.clone()); - if p2s.contains_key(&key) { - // Session exists, just update config - // Drop the lock before calling update to avoid potential deadlock - drop(p2s); + if sessions.contains_key(&key) { + drop(sessions); Ok(EnsureSessionResult::Updated( self.update_unnumbered_session(&interface, peer, info)?, )) } else { - // Create new unnumbered session Ok(EnsureSessionResult::New(self.new_session_locked( - p2s, + sessions, key, peer, bind_addr, @@ -304,15 +361,13 @@ impl Router { event_rx: Receiver>, info: SessionInfo, ) -> Result>, Error> { - let p2s = lock!(self.peer_to_session); - // Use PeerId::Ip for numbered sessions + let sessions = lock!(self.sessions); let key = PeerId::Ip(peer.host.ip()); - if p2s.contains_key(&key) { + if sessions.contains_key(&key) { Err(Error::PeerExists) } else { self.new_session_locked( - p2s, key, peer, bind_addr, event_tx, event_rx, info, - None, // No unnumbered_manager for numbered sessions + sessions, key, peer, bind_addr, event_tx, event_rx, info, None, ) } } @@ -328,14 +383,13 @@ impl Router { info: SessionInfo, unnumbered_manager: Arc, ) -> Result>, Error> { - let p2s = lock!(self.peer_to_session); - // Use PeerId::Interface for unnumbered sessions + let sessions = lock!(self.sessions); let key = PeerId::Interface(interface); - if p2s.contains_key(&key) { + if sessions.contains_key(&key) { Err(Error::PeerExists) } else { self.new_session_locked( - p2s, + sessions, key, peer, bind_addr, @@ -348,9 +402,9 @@ impl Router { } #[allow(clippy::too_many_arguments)] - pub fn new_session_locked( + fn new_session_locked( self: &Arc, - mut p2s: MutexGuard>>, + mut sessions: MutexGuard>, peer_id: PeerId, peer: PeerConfig, bind_addr: Option, @@ -359,7 +413,6 @@ impl Router { info: SessionInfo, unnumbered_manager: Option>, ) -> Result>, Error> { - // Update the SessionInfo with timer values from peer config let mut session_info = info.clone(); session_info.connect_retry_time = Duration::from_secs(peer.connect_retry); @@ -372,19 +425,10 @@ impl Router { let session = Arc::new(Mutex::new(session_info)); - p2s.insert( - peer_id.clone(), - SessionEndpoint { - event_tx: event_tx.clone(), - config: session.clone(), - }, - ); - drop(p2s); - let neighbor = NeighborInfo { name: Arc::new(Mutex::new(peer.name.clone())), peer_group: peer.group.clone(), - peer: peer_id.clone(), + peer: peer_id, port: peer.host.port(), }; @@ -392,13 +436,15 @@ impl Router { session, event_rx, event_tx.clone(), - neighbor.clone(), + neighbor, self.clone(), unnumbered_manager, )); + sessions.insert(runner.clone()); + drop(sessions); + self.spawn_session_thread(runner.clone()); - lock!(self.sessions).insert(peer_id, runner.clone()); Ok(runner) } @@ -445,7 +491,6 @@ impl Router { pub fn delete_session(&self, peer: impl Into) { let peer_id = peer.into(); - lock!(self.peer_to_session).remove(&peer_id); self.remove_fanout(peer_id.clone()); if let Some(s) = lock!(self.sessions).remove(&peer_id) { s.shutdown(); diff --git a/bgp/src/session.rs b/bgp/src/session.rs index a50f03a1..984215d0 100644 --- a/bgp/src/session.rs +++ b/bgp/src/session.rs @@ -1006,26 +1006,6 @@ pub struct NeighborInfo { pub port: u16, } -/// Session endpoint that combines the event sender with session configuration. -/// This is used in peer_to_session map to provide both communication channel -/// and policy information for each peer. -pub struct SessionEndpoint { - /// Event sender for FSM events to this session - pub event_tx: Sender>, - - /// Session configuration including policy settings - pub config: Arc>, -} - -impl Clone for SessionEndpoint { - fn clone(&self) -> Self { - Self { - event_tx: self.event_tx.clone(), - config: Arc::clone(&self.config), - } - } -} - pub const MAX_MESSAGE_HISTORY: usize = 1024; /// A message history entry is a BGP message with an associated timestamp and connection ID diff --git a/bgp/src/test.rs b/bgp/src/test.rs index ace0395b..a2279513 100644 --- a/bgp/src/test.rs +++ b/bgp/src/test.rs @@ -9,10 +9,10 @@ use crate::{ connection_tcp::{BgpConnectionTcp, BgpListenerTcp}, dispatcher::Dispatcher, params::{Ipv4UnicastConfig, Ipv6UnicastConfig, JitterRange}, - router::{EnsureSessionResult, Router}, + router::{EnsureSessionResult, Router, SessionMap}, session::{ AdminEvent, ConnectionKind, FsmEvent, FsmStateKind, PeerId, - SessionEndpoint, SessionInfo, SessionRunner, + SessionInfo, SessionRunner, }, unnumbered::UnnumberedManager, unnumbered_mock::UnnumberedManagerMock, @@ -23,7 +23,7 @@ use mg_common::test::{IpAllocation, LoopbackIpManager}; use mg_common::*; use rdb::{Asn, ImportExportPolicy4, ImportExportPolicy6, Prefix, Prefix4}; use std::{ - collections::{BTreeMap, BTreeSet}, + collections::BTreeSet, net::{IpAddr, Ipv6Addr, SocketAddr, SocketAddrV6}, sync::{ Arc, Mutex, @@ -324,16 +324,14 @@ where let _ = std::fs::remove_dir_all(&db_path); let db = rdb::Db::new(&db_path, log.clone()).expect("create db"); - // Create dispatcher - // Phase 4: Use PeerId instead of IpAddr - let peer_to_session: Arc< - Mutex>>, - > = Arc::new(Mutex::new(BTreeMap::new())); + // Create shared session map + let sessions: Arc>> = + Arc::new(Mutex::new(SessionMap::new())); let dispatcher = Arc::new(Dispatcher::new( - peer_to_session.clone(), + sessions.clone(), logical_router.listen_addr.to_string(), log.clone(), - None, // No unnumbered manager for tests + None, )); // Create router @@ -344,7 +342,7 @@ where }, log.clone(), db.clone(), - peer_to_session.clone(), + sessions.clone(), )); // Start router and dispatcher @@ -1801,12 +1799,10 @@ fn unnumbered_peering_helper( } // Create session maps - let p2s1: Arc< - Mutex>>, - > = Arc::new(Mutex::new(BTreeMap::new())); - let p2s2: Arc< - Mutex>>, - > = Arc::new(Mutex::new(BTreeMap::new())); + let sessions1: Arc>> = + Arc::new(Mutex::new(SessionMap::new())); + let sessions2: Arc>> = + Arc::new(Mutex::new(SessionMap::new())); // Create one Dispatcher per interface for each router. // Each Dispatcher binds to a unique link-local address with its interface's scope_id, @@ -1823,7 +1819,7 @@ fn unnumbered_peering_helper( *scope_id, )); let disp1 = Arc::new(Dispatcher::new( - p2s1.clone(), + sessions1.clone(), r1_addr.to_string(), log.clone(), Some(mock_ndp1.clone()), @@ -1848,7 +1844,7 @@ fn unnumbered_peering_helper( *scope_id, )); let disp2 = Arc::new(Dispatcher::new( - p2s2.clone(), + sessions2.clone(), r2_addr.to_string(), log.clone(), Some(mock_ndp2.clone()), @@ -1874,7 +1870,7 @@ fn unnumbered_peering_helper( }, log.clone(), db1.db().clone(), - p2s1.clone(), + sessions1.clone(), )); let router2 = Arc::new(Router::new( RouterConfig { @@ -1883,7 +1879,7 @@ fn unnumbered_peering_helper( }, log.clone(), db2.db().clone(), - p2s2.clone(), + sessions2.clone(), )); router1.run(); @@ -2514,22 +2510,20 @@ fn unnumbered_pair( SocketAddr::V6(SocketAddrV6::new(r2_ip, TEST_BGP_PORT, 0, scope_id)); // Create session maps - let p2s1: Arc< - Mutex>>, - > = Arc::new(Mutex::new(BTreeMap::new())); - let p2s2: Arc< - Mutex>>, - > = Arc::new(Mutex::new(BTreeMap::new())); + let sessions1: Arc>> = + Arc::new(Mutex::new(SessionMap::new())); + let sessions2: Arc>> = + Arc::new(Mutex::new(SessionMap::new())); // Create dispatchers let dispatcher1 = Arc::new(Dispatcher::new( - p2s1.clone(), + sessions1.clone(), r1_addr.to_string(), log.clone(), Some(mock_ndp1.clone()), )); let dispatcher2 = Arc::new(Dispatcher::new( - p2s2.clone(), + sessions2.clone(), r2_addr.to_string(), log.clone(), Some(mock_ndp2.clone()), @@ -2560,7 +2554,7 @@ fn unnumbered_pair( }, log.clone(), db1.db().clone(), - p2s1.clone(), + sessions1.clone(), )); let router2 = Arc::new(Router::new( RouterConfig { @@ -2569,7 +2563,7 @@ fn unnumbered_pair( }, log.clone(), db2.db().clone(), - p2s2.clone(), + sessions2.clone(), )); router1.run(); @@ -2752,19 +2746,16 @@ fn unnumbered_three_router_chain( )); // Create session maps - let p2s1: Arc< - Mutex>>, - > = Arc::new(Mutex::new(BTreeMap::new())); - let p2s2: Arc< - Mutex>>, - > = Arc::new(Mutex::new(BTreeMap::new())); - let p2s3: Arc< - Mutex>>, - > = Arc::new(Mutex::new(BTreeMap::new())); + let sessions1: Arc>> = + Arc::new(Mutex::new(SessionMap::new())); + let sessions2: Arc>> = + Arc::new(Mutex::new(SessionMap::new())); + let sessions3: Arc>> = + Arc::new(Mutex::new(SessionMap::new())); // Create Dispatcher for R1 (single interface) let disp1 = Arc::new(Dispatcher::new( - p2s1.clone(), + sessions1.clone(), r1_addr.to_string(), log.clone(), Some(mock_ndp1.clone()), @@ -2779,7 +2770,7 @@ fn unnumbered_three_router_chain( // Create Dispatchers for R2 (two interfaces) let disp2_eth0 = Arc::new(Dispatcher::new( - p2s2.clone(), + sessions2.clone(), r2_eth0_addr.to_string(), log.clone(), Some(mock_ndp2.clone()), @@ -2793,7 +2784,7 @@ fn unnumbered_three_router_chain( .expect("spawn r2 eth0 dispatcher"); let disp2_eth1 = Arc::new(Dispatcher::new( - p2s2.clone(), + sessions2.clone(), r2_eth1_addr.to_string(), log.clone(), Some(mock_ndp2.clone()), @@ -2808,7 +2799,7 @@ fn unnumbered_three_router_chain( // Create Dispatcher for R3 (single interface) let disp3 = Arc::new(Dispatcher::new( - p2s3.clone(), + sessions3.clone(), r3_addr.to_string(), log.clone(), Some(mock_ndp3.clone()), @@ -2829,7 +2820,7 @@ fn unnumbered_three_router_chain( }, log.clone(), db1.db().clone(), - p2s1.clone(), + sessions1.clone(), )); let router2 = Arc::new(Router::new( RouterConfig { @@ -2838,7 +2829,7 @@ fn unnumbered_three_router_chain( }, log.clone(), db2.db().clone(), - p2s2.clone(), + sessions2.clone(), )); let router3 = Arc::new(Router::new( RouterConfig { @@ -2847,7 +2838,7 @@ fn unnumbered_three_router_chain( }, log.clone(), db3.db().clone(), - p2s3.clone(), + sessions3.clone(), )); router1.run(); @@ -3738,12 +3729,10 @@ fn test_unnumbered_interface_lifecycle() { mock_ndp2.configure_interface("eth0".to_string(), scope_id); // Create session maps - let p2s1: Arc< - Mutex>>, - > = Arc::new(Mutex::new(BTreeMap::new())); - let p2s2: Arc< - Mutex>>, - > = Arc::new(Mutex::new(BTreeMap::new())); + let sessions1: Arc>> = + Arc::new(Mutex::new(SessionMap::new())); + let sessions2: Arc>> = + Arc::new(Mutex::new(SessionMap::new())); // Create dispatchers (needed for inbound connections) let r1_ip: Ipv6Addr = "fe80::1".parse().unwrap(); @@ -3754,13 +3743,13 @@ fn test_unnumbered_interface_lifecycle() { SocketAddr::V6(SocketAddrV6::new(r2_ip, TEST_BGP_PORT, 0, scope_id)); let disp1 = Arc::new(Dispatcher::new( - p2s1.clone(), + sessions1.clone(), r1_addr.to_string(), log.clone(), Some(mock_ndp1.clone()), )); let disp2 = Arc::new(Dispatcher::new( - p2s2.clone(), + sessions2.clone(), r2_addr.to_string(), log.clone(), Some(mock_ndp2.clone()), @@ -3786,7 +3775,7 @@ fn test_unnumbered_interface_lifecycle() { }, log.clone(), db1.db().clone(), - p2s1.clone(), + sessions1.clone(), )); let router2 = Arc::new(Router::new( RouterConfig { @@ -3795,7 +3784,7 @@ fn test_unnumbered_interface_lifecycle() { }, log.clone(), db2.db().clone(), - p2s2.clone(), + sessions2.clone(), )); router1.run(); diff --git a/bgp/src/unnumbered.rs b/bgp/src/unnumbered.rs index 4e8b570a..5d2b7b59 100644 --- a/bgp/src/unnumbered.rs +++ b/bgp/src/unnumbered.rs @@ -2,9 +2,79 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. +use iddqd::{BiHashItem, BiHashMap, bi_upcast}; use std::fmt; use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6}; +/// Bidirectional mapping between scope_id and interface name. +#[derive(Debug, Clone)] +struct ScopeEntry { + scope_id: u32, + interface: String, +} + +impl BiHashItem for ScopeEntry { + type K1<'a> = u32; + type K2<'a> = &'a str; + + fn key1(&self) -> Self::K1<'_> { + self.scope_id + } + + fn key2(&self) -> Self::K2<'_> { + &self.interface + } + + bi_upcast!(); +} + +/// Bidirectional scope_id ↔ interface name map. +/// +/// Wraps a `BiHashMap` to provide domain-specific accessors for the +/// Dispatcher (lookup by scope_id) and session management (lookup/removal +/// by interface name) paths. +#[derive(Debug, Clone)] +pub struct ScopeMap(BiHashMap); + +impl Default for ScopeMap { + fn default() -> Self { + Self(BiHashMap::new()) + } +} + +impl ScopeMap { + pub fn new() -> Self { + Self::default() + } + + pub fn insert(&mut self, scope_id: u32, interface: String) { + self.0.insert_overwrite(ScopeEntry { + scope_id, + interface, + }); + } + + pub fn remove_by_interface(&mut self, interface: &str) -> Option { + self.0.remove2(interface).map(|e| e.scope_id) + } + + pub fn get_interface(&self, scope_id: u32) -> Option<&str> { + self.0.get1(&scope_id).map(|e| e.interface.as_str()) + } + + pub fn get_scope_id(&self, interface: &str) -> Option { + self.0.get2(interface).map(|e| e.scope_id) + } + + pub fn contains_scope_id(&self, scope_id: u32) -> bool { + self.0.contains_key1(&scope_id) + } + + pub fn contains_interface(&self, interface: &str) -> bool { + self.0.contains_key2(interface) + } +} + /// Error type for UnnumberedManager operations. #[derive(Debug, Clone)] pub enum UnnumberedError { diff --git a/bgp/src/unnumbered_mock.rs b/bgp/src/unnumbered_mock.rs index ac76048d..cdab8a2a 100644 --- a/bgp/src/unnumbered_mock.rs +++ b/bgp/src/unnumbered_mock.rs @@ -42,7 +42,9 @@ //! - Interface exists at configuration time: `register_interface()` does all three //! - Interface removed while session established: `remove_system_interface()` -use crate::unnumbered::{NdpNeighbor, UnnumberedError, UnnumberedManager}; +use crate::unnumbered::{ + NdpNeighbor, ScopeMap, UnnumberedError, UnnumberedManager, +}; use mg_common::lock; use std::collections::{HashMap, HashSet}; use std::net::Ipv6Addr; @@ -66,13 +68,9 @@ pub struct UnnumberedManagerMock { /// NOTE: Peer discovery only works if interface is in `ndp_initialized`. discoveries: Arc>>>, - /// Map from scope_id to interface name. + /// Bidirectional scope_id ↔ interface name mapping. /// Used by Dispatcher to route incoming link-local connections. - scope_map: Arc>>, - - /// Reverse map from interface name to scope_id. - /// Maintained alongside scope_map for efficient lookup. - interface_to_scope: Arc>>, + scope_map: Arc>, /// Interfaces that exist on the system (link up). /// Presence here means `interface_is_active()` returns true. @@ -90,8 +88,7 @@ impl UnnumberedManagerMock { pub fn new() -> Arc { Arc::new(Self { discoveries: Arc::new(Mutex::new(HashMap::new())), - scope_map: Arc::new(Mutex::new(HashMap::new())), - interface_to_scope: Arc::new(Mutex::new(HashMap::new())), + scope_map: Arc::new(Mutex::new(ScopeMap::new())), system_interfaces: Arc::new(Mutex::new(HashSet::new())), ndp_initialized: Arc::new(Mutex::new(HashSet::new())), }) @@ -109,17 +106,14 @@ impl UnnumberedManagerMock { /// Use `add_system_interface()` separately to simulate the interface /// appearing on the system. pub fn configure_interface(&self, interface: String, scope_id: u32) { - lock!(self.scope_map).insert(scope_id, interface.clone()); - lock!(self.interface_to_scope).insert(interface, scope_id); + lock!(self.scope_map).insert(scope_id, interface); } /// Remove interface configuration. /// /// Removes the scope_id → interface mapping. Does NOT affect system presence. pub fn unconfigure_interface(&self, interface: &str) -> Option { - let scope_id = lock!(self.interface_to_scope).remove(interface)?; - lock!(self.scope_map).remove(&scope_id); - Some(scope_id) + lock!(self.scope_map).remove_by_interface(interface) } // ========================================================================= @@ -178,7 +172,7 @@ impl UnnumberedManagerMock { } // Check interface is configured - if !lock!(self.interface_to_scope).contains_key(interface) { + if !lock!(self.scope_map).contains_interface(interface) { return Err("interface not configured"); } @@ -293,7 +287,7 @@ impl UnnumberedManagerMock { let addr = lock!(self.discoveries) .get(interface) .and_then(|opt| *opt)?; - let scope_id = *lock!(self.interface_to_scope).get(interface)?; + let scope_id = lock!(self.scope_map).get_scope_id(interface)?; Some(NdpNeighbor { addr, scope_id }) } @@ -302,7 +296,9 @@ impl UnnumberedManagerMock { /// This simulates querying `UnnumberedManagerNdp::get_interface_for_scope()`. /// Used by Dispatcher to route incoming link-local connections. pub fn get_interface_for_scope(&self, scope_id: u32) -> Option { - lock!(self.scope_map).get(&scope_id).cloned() + lock!(self.scope_map) + .get_interface(scope_id) + .map(str::to_owned) } /// Get all registered interfaces. @@ -310,13 +306,13 @@ impl UnnumberedManagerMock { /// Returns a list of (interface_name, scope_id, discovered_peer). pub fn get_all_interfaces(&self) -> Vec<(String, u32, Option)> { let discoveries = lock!(self.discoveries); - let interface_to_scope = lock!(self.interface_to_scope); + let scope_map = lock!(self.scope_map); discoveries .iter() .filter_map(|(iface, peer)| { - let scope_id = interface_to_scope.get(iface)?; - Some((iface.clone(), *scope_id, *peer)) + let scope_id = scope_map.get_scope_id(iface)?; + Some((iface.clone(), scope_id, *peer)) }) .collect() } diff --git a/mgd/src/bgp_admin.rs b/mgd/src/bgp_admin.rs index 58cb470c..2177f1be 100644 --- a/mgd/src/bgp_admin.rs +++ b/mgd/src/bgp_admin.rs @@ -17,11 +17,10 @@ use bgp::{ connection_tcp::BgpConnectionTcp, messages::Afi, params::*, - router::{LoadPolicyError, Router}, + router::{LoadPolicyError, Router, SessionMap}, session::{ AdminEvent, ConnectionKind, FsmEvent, FsmEventRecord, FsmStateKind, - MessageHistory, MessageHistoryV1, PeerId, SessionEndpoint, SessionInfo, - SessionRunner, + MessageHistory, MessageHistoryV1, PeerId, SessionInfo, SessionRunner, }, }; use chrono::{DateTime, SecondsFormat, Utc}; @@ -62,23 +61,20 @@ const DEFAULT_BGP_LISTEN: SocketAddr = #[derive(Clone)] pub struct BgpContext { pub(crate) router: Arc>>>>, - peer_to_session: - Arc>>>, + pub(crate) sessions: Arc>>, pub(crate) unnumbered_manager: Arc, } impl BgpContext { pub fn new( - peer_to_session: Arc< - Mutex>>, - >, + sessions: Arc>>, log: Logger, ) -> Self { let router = Arc::new(Mutex::new(BTreeMap::new())); let unnumbered_manager = UnnumberedManagerNdp::new(router.clone(), log); Self { router, - peer_to_session, + sessions, unnumbered_manager, } } @@ -2297,7 +2293,7 @@ pub(crate) mod helpers { cfg, ctx.log.clone(), db.clone(), - ctx.bgp.peer_to_session.clone(), + ctx.bgp.sessions.clone(), )); router.run(); @@ -2535,12 +2531,13 @@ mod tests { admin::HandlerContext, bfd_admin::BfdContext, bgp_admin::BgpContext, }; use bgp::params::{ApplyRequestV1, BgpPeerConfigV1, BgpPeerParametersV1}; + use bgp::router::SessionMap; use mg_common::stats::MgLowerStats; use rdb::test::get_test_db; #[cfg(all(feature = "mg-lower", target_os = "illumos"))] use std::net::Ipv6Addr; use std::{ - collections::{BTreeMap, HashMap}, + collections::HashMap, env::temp_dir, fs::{create_dir_all, remove_dir_all}, net::SocketAddr, @@ -2567,7 +2564,7 @@ mod tests { #[cfg(all(feature = "mg-lower", target_os = "illumos"))] tep: Ipv6Addr::UNSPECIFIED, bgp: BgpContext::new( - Arc::new(Mutex::new(BTreeMap::new())), + Arc::new(Mutex::new(SessionMap::new())), log.clone(), ), bfd: BfdContext::new(log.clone()), diff --git a/mgd/src/main.rs b/mgd/src/main.rs index 02a77ad0..ec5d6eef 100644 --- a/mgd/src/main.rs +++ b/mgd/src/main.rs @@ -264,15 +264,15 @@ fn detect_switch_slot( } fn init_bgp(args: &RunArgs, log: &Logger) -> BgpContext { - let peer_to_session = Arc::new(Mutex::new(BTreeMap::new())); + let sessions = Arc::new(Mutex::new(bgp::router::SessionMap::new())); // Create BgpContext first to get access to unnumbered_manager - let bgp_context = BgpContext::new(peer_to_session.clone(), log.clone()); + let bgp_context = BgpContext::new(sessions.clone(), log.clone()); if !args.no_bgp_dispatcher { let bgp_dispatcher = bgp::dispatcher::Dispatcher::::new( - peer_to_session.clone(), + sessions.clone(), "[::]:179".into(), log.clone(), Some(bgp_context.unnumbered_manager.clone()), // Enable link-local connection routing diff --git a/mgd/src/oxstats.rs b/mgd/src/oxstats.rs index 3f725173..33da6575 100644 --- a/mgd/src/oxstats.rs +++ b/mgd/src/oxstats.rs @@ -284,7 +284,7 @@ impl Stats { for (asn, r) in &*routers { let mut session_counters = BTreeMap::new(); let sessions = lock!(r.sessions); - for (key, session) in &*sessions { + for (key, session) in sessions.iter() { // Only include IP-based sessions in metrics (unnumbered sessions use interface names) if let PeerId::Ip(addr) = key { session_counters.insert(*addr, session.counters.clone()); diff --git a/mgd/src/unnumbered_manager.rs b/mgd/src/unnumbered_manager.rs index 43731c56..2a95e038 100644 --- a/mgd/src/unnumbered_manager.rs +++ b/mgd/src/unnumbered_manager.rs @@ -3,8 +3,10 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. use bgp::{ - connection_tcp::BgpConnectionTcp, router::Router, session::SessionRunner, - unnumbered::NdpNeighbor, + connection_tcp::BgpConnectionTcp, + router::Router, + session::SessionRunner, + unnumbered::{NdpNeighbor, ScopeMap}, }; use mg_common::lock; use mg_common::thread::ManagedThread; @@ -36,8 +38,8 @@ struct PendingInterfaceConfig { pub struct UnnumberedManagerNdp { routers: Arc>>>>, ndp_mgr: Arc, - /// Maps scope_id (interface index) to interface name for Dispatcher routing - interface_scope_map: Mutex>, + /// Bidirectional scope_id ↔ interface name mapping for Dispatcher routing + interface_scope_map: Mutex, log: Logger, // ========================================================================= @@ -99,7 +101,7 @@ impl UnnumberedManagerNdp { let manager = Arc::new(Self { routers, - interface_scope_map: Mutex::new(HashMap::default()), + interface_scope_map: Mutex::new(ScopeMap::new()), ndp_mgr: NdpManager::new(log.clone()), log, pending_interfaces: Mutex::new(HashMap::new()), @@ -325,14 +327,8 @@ impl UnnumberedManagerNdp { self.ndp_mgr.remove_interface(ifx); } - // Clean up scope mapping by searching for interface name - let mut scope_map = lock!(self.interface_scope_map); - if let Some((&scope_id, _)) = scope_map - .iter() - .find(|(_, name)| name.as_str() == interface_name) - { - scope_map.remove(&scope_id); - } + // Clean up scope mapping + lock!(self.interface_scope_map).remove_by_interface(interface_name); } /// Register an interface for NDP peer discovery. @@ -425,15 +421,9 @@ impl UnnumberedManagerNdp { self.ndp_mgr.remove_interface(ifx); } - // Clean up scope mapping by searching for interface name. + // Clean up scope mapping. // This works whether or not the interface still exists in the system. - let mut scope_map = lock!(self.interface_scope_map); - if let Some((&scope_id, _)) = scope_map - .iter() - .find(|(_, name)| name.as_str() == interface_str) - { - scope_map.remove(&scope_id); - } + lock!(self.interface_scope_map).remove_by_interface(interface_str); slog::info!( self.log, @@ -510,7 +500,9 @@ impl UnnumberedManagerNdp { /// * `Some(interface_name)` - Interface found for this scope_id /// * `None` - No interface registered with this scope_id pub fn get_interface_for_scope(&self, scope_id: u32) -> Option { - lock!(self.interface_scope_map).get(&scope_id).cloned() + lock!(self.interface_scope_map) + .get_interface(scope_id) + .map(str::to_owned) } /// Validate that a peer address matches the discovered neighbor for an interface. @@ -679,7 +671,7 @@ impl UnnumberedManagerNdp { .into_iter() .filter_map(|info| { // Only include interfaces in our scope map - if !scope_map.contains_key(&info.interface.index) { + if !scope_map.contains_scope_id(info.interface.index) { return None; } @@ -725,7 +717,7 @@ impl UnnumberedManagerNdp { let ifx = Self::get_interface(interface_name, &self.log)?; // Check if we're managing this interface - if !lock!(self.interface_scope_map).contains_key(&ifx.index) { + if !lock!(self.interface_scope_map).contains_scope_id(ifx.index) { return Ok(None); }