Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ serial_test = "3.3"
internet-checksum = "0.2.1"
network-interface = { git = "https://github.com/oxidecomputer/network-interface", branch = "illumos" }
natord = "1.0"
iddqd = "0.3"


[workspace.dependencies.opte-ioctl]
Expand Down
1 change: 1 addition & 0 deletions bgp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ itertools.workspace = true
oxnet.workspace = true
uuid.workspace = true
rand.workspace = true
iddqd.workspace = true
clap = { workspace = true, optional = true }

[target.'cfg(target_os = "illumos")'.dependencies]
Expand Down
8 changes: 4 additions & 4 deletions bgp/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ use crate::{
clock::ConnectionClock,
error::Error,
messages::Message,
session::{FsmEvent, PeerId, SessionEndpoint, SessionInfo},
router::SessionMap,
session::{FsmEvent, SessionInfo},
unnumbered::UnnumberedManager,
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use slog::Logger;
use std::{
collections::BTreeMap,
net::{SocketAddr, ToSocketAddrs},
sync::{Arc, Mutex, mpsc::Sender},
thread::JoinHandle,
Expand Down Expand Up @@ -133,11 +133,11 @@ pub trait BgpListener<Cnx: BgpConnection> {
/// Accept a connection. This Listener is non-blocking, so the timeout
/// is used as a sleep between accept attempts. This function may be called
/// multiple times, returning a new connection each time. Policy application
/// is handled by the Dispatcher after the peer_to_session lookup.
/// is handled by the Dispatcher after the session lookup.
fn accept(
&self,
log: Logger,
peer_to_session: Arc<Mutex<BTreeMap<PeerId, SessionEndpoint<Cnx>>>>,
sessions: Arc<Mutex<SessionMap<Cnx>>>,
timeout: Duration,
) -> Result<Cnx, Error>;

Expand Down
47 changes: 20 additions & 27 deletions bgp/src/connection_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@ use crate::{
error::Error,
log::{connection_log, connection_log_lite},
messages::Message,
session::{
ConnectionEvent, FsmEvent, PeerId, SessionEndpoint, SessionInfo,
},
router::SessionMap,
session::{ConnectionEvent, FsmEvent, PeerId, SessionInfo},
unnumbered::UnnumberedManager,
};
use mg_common::lock;
use slog::Logger;
use std::{
collections::{BTreeMap, HashMap},
collections::HashMap,
net::{SocketAddr, ToSocketAddrs},
sync::{
Arc, Mutex,
Expand Down Expand Up @@ -229,38 +228,32 @@ impl BgpListener<BgpConnectionChannel> for BgpListenerChannel {
fn accept(
&self,
log: Logger,
peer_to_session: Arc<
Mutex<BTreeMap<PeerId, SessionEndpoint<BgpConnectionChannel>>>,
>,
sessions: Arc<Mutex<SessionMap<BgpConnectionChannel>>>,
timeout: Duration,
) -> Result<BgpConnectionChannel, Error> {
let (peer, endpoint) = self.listener.accept(timeout)?;

// For channel-based test connections, we use the bind address as the local
// address. In a real network scenario (like TCP), we would need to get the
// actual connection's local address to handle dual-stack correctly, but for
// testing purposes with channels, the bind address is the connection address.
let local = self.bind_addr;

// Resolve peer address to appropriate PeerId (IP or Interface)
let key = self.resolve_session_key(peer);

match lock!(peer_to_session).get(&key) {
Some(session_endpoint) => {
let config = lock!(session_endpoint.config);
Ok(BgpConnectionChannel::with_conn(
local,
peer,
endpoint,
session_endpoint.event_tx.clone(),
IO_TIMEOUT,
log,
ConnectionDirection::Inbound,
&config,
))
}
None => Err(Error::UnknownPeer(peer.ip())),
}
let runner = lock!(sessions)
.get(&key)
.cloned()
.ok_or(Error::UnknownPeer(peer.ip()))?;

let config = lock!(runner.session);
Ok(BgpConnectionChannel::with_conn(
local,
peer,
endpoint,
runner.event_tx.clone(),
IO_TIMEOUT,
log,
ConnectionDirection::Inbound,
&config,
))
}

fn apply_policy(
Expand Down
46 changes: 21 additions & 25 deletions bgp/src/connection_tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,13 @@ use crate::{
OpenParseError, OpenParseErrorReason, RouteRefreshMessage,
RouteRefreshParseError, RouteRefreshParseErrorReason, UpdateMessage,
},
session::{
ConnectionEvent, FsmEvent, PeerId, SessionEndpoint, SessionEvent,
SessionInfo,
},
router::SessionMap,
session::{ConnectionEvent, FsmEvent, PeerId, SessionEvent, SessionInfo},
unnumbered::UnnumberedManager,
};
use mg_common::lock;
use slog::Logger;
use std::{
collections::BTreeMap,
io::Read,
io::Write,
net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs},
Expand Down Expand Up @@ -130,9 +127,7 @@ impl BgpListener<BgpConnectionTcp> for BgpListenerTcp {
fn accept(
&self,
log: Logger,
peer_to_session: Arc<
Mutex<BTreeMap<PeerId, SessionEndpoint<BgpConnectionTcp>>>,
>,
sessions: Arc<Mutex<SessionMap<BgpConnectionTcp>>>,
timeout: Duration,
) -> Result<BgpConnectionTcp, Error> {
let start = Instant::now();
Expand Down Expand Up @@ -164,23 +159,24 @@ impl BgpListener<BgpConnectionTcp> for BgpListenerTcp {
// Resolve peer address to appropriate PeerId (IP or Interface)
let key = self.resolve_session_key(peer);

// Check if we have a session for this peer
match lock!(peer_to_session).get(&key) {
Some(session_endpoint) => {
let config = lock!(session_endpoint.config);
return BgpConnectionTcp::with_conn(
local,
peer,
conn,
IO_TIMEOUT,
session_endpoint.event_tx.clone(),
log,
ConnectionDirection::Inbound,
&config,
);
}
None => return Err(Error::UnknownPeer(ip)),
}
// Look up the session runner, clone the Arc, then release
// the sessions lock before accessing session config.
let runner = lock!(sessions)
.get(&key)
.cloned()
.ok_or(Error::UnknownPeer(ip))?;

let config = lock!(runner.session);
return BgpConnectionTcp::with_conn(
local,
peer,
conn,
IO_TIMEOUT,
runner.event_tx.clone(),
log,
ConnectionDirection::Inbound,
&config,
);
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
// Check if we've exceeded the timeout
Expand Down
94 changes: 46 additions & 48 deletions bgp/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ use crate::{
IO_TIMEOUT,
connection::{BgpConnection, BgpListener},
log::dispatcher_log,
session::{FsmEvent, PeerId, SessionEndpoint, SessionEvent},
router::SessionMap,
session::{FsmEvent, PeerId, SessionEvent},
unnumbered::UnnumberedManager,
};
use mg_common::lock;
use slog::Logger;
use std::{
collections::BTreeMap,
net::SocketAddr,
sync::atomic::{AtomicBool, Ordering},
sync::{Arc, Mutex},
Expand All @@ -22,10 +22,9 @@ use std::{

const UNIT_DISPATCHER: &str = "dispatcher";

pub struct Dispatcher<Cnx: BgpConnection> {
/// Session endpoint map indexed by PeerId (IP or interface name)
/// This unified map supports both numbered and unnumbered BGP sessions
pub peer_to_session: Arc<Mutex<BTreeMap<PeerId, SessionEndpoint<Cnx>>>>,
pub struct Dispatcher<Cnx: BgpConnection + 'static> {
/// Session map shared with all Routers, indexed by PeerId.
pub sessions: Arc<Mutex<SessionMap<Cnx>>>,

/// Optional unnumbered neighbor manager for link-local connection routing.
/// When present, enables routing of IPv6 link-local connections to
Expand All @@ -39,13 +38,13 @@ pub struct Dispatcher<Cnx: BgpConnection> {

impl<Cnx: BgpConnection + 'static> Dispatcher<Cnx> {
pub fn new(
peer_to_session: Arc<Mutex<BTreeMap<PeerId, SessionEndpoint<Cnx>>>>,
sessions: Arc<Mutex<SessionMap<Cnx>>>,
listen: String,
log: Logger,
unnumbered_manager: Option<Arc<dyn UnnumberedManager>>,
) -> Self {
Self {
peer_to_session,
sessions,
unnumbered_manager,
listen,
log,
Expand Down Expand Up @@ -139,7 +138,7 @@ impl<Cnx: BgpConnection + 'static> Dispatcher<Cnx> {

let accepted = match listener.accept(
self.log.clone(),
self.peer_to_session.clone(),
self.sessions.clone(),
IO_TIMEOUT,
) {
Ok(c) => {
Expand Down Expand Up @@ -167,51 +166,50 @@ impl<Cnx: BgpConnection + 'static> Dispatcher<Cnx> {
let peer_addr = accepted.peer();
let key = self.resolve_session_key(peer_addr);

match lock!(self.peer_to_session).get(&key).cloned() {
Some(session_endpoint) => {
// Apply connection policy from the session configuration
let min_ttl = lock!(session_endpoint.config).min_ttl;
let md5_key =
lock!(session_endpoint.config).md5_auth_key.clone();

if let Err(e) =
Listener::apply_policy(&accepted, min_ttl, md5_key)
{
dispatcher_log!(self,
warn,
"failed to apply policy for connection from {}: {e}", peer_addr;
"listen_address" => &self.listen,
"peer" => format!("{}", peer_addr),
"session_key" => format!("{:?}", key),
"error" => format!("{e}")
);
}

if let Err(e) =
session_endpoint.event_tx.send(FsmEvent::Session(
SessionEvent::TcpConnectionAcked(accepted),
))
{
dispatcher_log!(self,
error,
"failed to send connected event to session for {}: {e}", peer_addr;
"listen_address" => &self.listen,
"peer" => format!("{}", peer_addr),
"session_key" => format!("{:?}", key)
);
continue 'listener;
}
}
None => {
let (runner, min_ttl, md5_key) = {
let sessions = lock!(self.sessions);
let Some(runner) = sessions.get(&key).cloned() else {
dispatcher_log!(self,
debug,
"no session found for peer, dropping connection";
"peer" => format!("{}", peer_addr),
"resolved_key" => format!("{:?}", key),
"session_key" => format!("{:?}", key),
"listen_address" => &self.listen
);
continue 'accept;
}
};
let config = lock!(runner.session);
(
runner.clone(),
config.min_ttl,
config.md5_auth_key.clone(),
)
};

if let Err(e) =
Listener::apply_policy(&accepted, min_ttl, md5_key)
{
dispatcher_log!(self,
warn,
"failed to apply policy for connection from {}: {e}", peer_addr;
"listen_address" => &self.listen,
"peer" => format!("{}", peer_addr),
"session_key" => format!("{:?}", key),
"error" => format!("{e}")
);
}

if let Err(e) = runner.event_tx.send(FsmEvent::Session(
SessionEvent::TcpConnectionAcked(accepted),
)) {
dispatcher_log!(self,
error,
"failed to send connected event to session for {}: {e}", peer_addr;
"listen_address" => &self.listen,
"peer" => format!("{}", peer_addr),
"session_key" => format!("{:?}", key)
);
continue 'listener;
}
}
}
Expand All @@ -235,7 +233,7 @@ impl<Cnx: BgpConnection + 'static> Dispatcher<Cnx> {
}
}

impl<Cnx: BgpConnection> Drop for Dispatcher<Cnx> {
impl<Cnx: BgpConnection + 'static> Drop for Dispatcher<Cnx> {
fn drop(&mut self) {
dispatcher_log!(self,
debug,
Expand Down
Loading