Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions bgp/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ pub trait BgpListener<Cnx: BgpConnection> {
/// * `unnumbered_manager` - Optional unnumbered manager for resolving scope_id -> interface
fn bind<A: ToSocketAddrs>(
addr: A,
log: Logger,
unnumbered_manager: Option<Arc<dyn UnnumberedManager>>,
) -> Result<Self, Error>
where
Expand All @@ -148,6 +149,9 @@ pub trait BgpListener<Cnx: BgpConnection> {
min_ttl: Option<u8>,
md5_key: Option<String>,
) -> Result<(), Error>;

/// `SocketAddr` the listener is receiving connections on
fn bind_addr(&self) -> SocketAddr;
}

/// Implementors of this trait initiate outbound BGP connections to peers.
Expand Down
9 changes: 8 additions & 1 deletion bgp/src/connection_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
unnumbered::UnnumberedManager,
};
use mg_common::lock;
use slog::Logger;
use slog::{Logger, info};
use std::{
collections::{BTreeMap, HashMap},
net::{SocketAddr, ToSocketAddrs},
Expand Down Expand Up @@ -81,6 +81,7 @@ impl std::fmt::Display for Network {
}

/// A listener that can listen for messages on our simulated network.
#[derive(Debug)]
struct Listener {
rx: Receiver<(SocketAddr, Endpoint<Message>)>,
}
Expand Down Expand Up @@ -206,6 +207,7 @@ impl BgpListenerChannel {
impl BgpListener<BgpConnectionChannel> for BgpListenerChannel {
fn bind<A: ToSocketAddrs>(
addr: A,
log: Logger,
unnumbered_manager: Option<Arc<dyn UnnumberedManager>>,
) -> Result<Self, Error>
where
Expand All @@ -219,6 +221,7 @@ impl BgpListener<BgpConnectionChannel> for BgpListenerChannel {
"at least one address required".into(),
))?;
let listener = NET.bind(addr);
info!(log, "BgpConnectionChannel Listener created"; "listener" => ?listener);
Ok(Self {
listener,
bind_addr: addr,
Expand Down Expand Up @@ -271,6 +274,10 @@ impl BgpListener<BgpConnectionChannel> for BgpListenerChannel {
// Policy application is ignored for test connections
Ok(())
}

fn bind_addr(&self) -> SocketAddr {
self.bind_addr
}
}

/// A struct to implement BgpConnection for our simulated test network.
Expand Down
20 changes: 14 additions & 6 deletions bgp/src/connection_tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
unnumbered::UnnumberedManager,
};
use mg_common::lock;
use slog::Logger;
use slog::{Logger, info};
use std::{
collections::BTreeMap,
io::Read,
Expand Down Expand Up @@ -79,6 +79,7 @@ enum RecvError {
pub struct BgpListenerTcp {
listener: TcpListener,
unnumbered_manager: Option<Arc<dyn UnnumberedManager>>,
bind_addr: SocketAddr,
}

impl BgpListenerTcp {
Expand All @@ -103,6 +104,7 @@ impl BgpListenerTcp {
impl BgpListener<BgpConnectionTcp> for BgpListenerTcp {
fn bind<A: ToSocketAddrs>(
addr: A,
log: Logger,
unnumbered_manager: Option<Arc<dyn UnnumberedManager>>,
) -> Result<Self, Error>
where
Expand All @@ -116,10 +118,15 @@ impl BgpListener<BgpConnectionTcp> for BgpListenerTcp {
"at least one address required".into(),
))?;
let listener = TcpListener::bind(addr)?;
let bind_addr = listener.local_addr()?;

info!(log, "TcpListener created"; "listener" => ?listener);
listener.set_nonblocking(true)?;

Ok(Self {
listener,
unnumbered_manager,
bind_addr,
})
}

Expand Down Expand Up @@ -228,6 +235,10 @@ impl BgpListener<BgpConnectionTcp> for BgpListenerTcp {

Ok(())
}

fn bind_addr(&self) -> SocketAddr {
self.bind_addr
}
}

pub struct BgpConnectorTcp;
Expand Down Expand Up @@ -293,11 +304,8 @@ impl BgpConnector<BgpConnectionTcp> for BgpConnectorTcp {
"timeout" => timeout.as_millis()
);

// Bind to source address if specified
if let Some(source_addr) = config.bind_addr {
let mut src = source_addr;
// clear source port, we only want to set the source ip
src.set_port(0);
// Bind to source address/port if specified
if let Some(src) = config.bind_addr {
let ba: socket2::SockAddr = src.into();
if let Err(e) = s.bind(&ba) {
connection_log_lite!(log,
Expand Down
121 changes: 53 additions & 68 deletions bgp/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@
use crate::{
IO_TIMEOUT,
connection::{BgpConnection, BgpListener},
log::dispatcher_log,
session::{FsmEvent, PeerId, SessionEndpoint, SessionEvent},
unnumbered::UnnumberedManager,
};
use mg_common::lock;
use slog::Logger;
use slog::{Logger, debug, error, info, warn};
use std::{
collections::BTreeMap,
net::SocketAddr,
Expand All @@ -34,7 +33,7 @@ pub struct Dispatcher<Cnx: BgpConnection> {

shutdown: AtomicBool,
listen: String,
log: Logger,
log: Mutex<Logger>,
}

impl<Cnx: BgpConnection + 'static> Dispatcher<Cnx> {
Expand All @@ -44,11 +43,17 @@ impl<Cnx: BgpConnection + 'static> Dispatcher<Cnx> {
log: Logger,
unnumbered_manager: Option<Arc<dyn UnnumberedManager>>,
) -> Self {
let log = log.new(slog::o!(
"component" => crate::COMPONENT_BGP,
"module" => crate::MOD_NEIGHBOR,
"unit" => UNIT_DISPATCHER,
));

Self {
peer_to_session,
unnumbered_manager,
listen,
log,
log: Mutex::new(log),
shutdown: AtomicBool::new(false),
}
}
Expand Down Expand Up @@ -84,88 +89,87 @@ impl<Cnx: BgpConnection + 'static> Dispatcher<Cnx> {
}

pub fn run<Listener: BgpListener<Cnx>>(&self) {
dispatcher_log!(self,
info,
"dispatcher started";
"listen_address" => &self.listen
);
let mut log = lock!(self.log).clone();
info!(log, "dispatcher started");

'listener: loop {
info!(log, "starting listener with bind arg: {}", &self.listen);

// We need to check the shutdown flag in the listener loop so we can
// still return even if bind() keeps failing and we're stuck
if self.shutdown.load(Ordering::Acquire) {
dispatcher_log!(self,
info,
"dispatcher caught shutdown flag from listener loop";
"listen_address" => &self.listen
info!(
log,
"dispatcher caught shutdown flag from listener loop"
);
self.shutdown.store(false, Ordering::Release);
break 'listener;
}
dispatcher_log!(self,
debug,
"listener bind: {}", &self.listen;
"listen_address" => &self.listen
);

let listener = match Listener::bind(
&self.listen,
log.clone(),
self.unnumbered_manager.clone(),
) {
Ok(l) => l,
Err(e) => {
dispatcher_log!(self,
error,
"listener bind error: {e}";
"listen_address" => &self.listen
);
error!(log, "listener bind error: {e}");
sleep(Duration::from_secs(1));
// XXX: possible death loop?
continue 'listener;
}
};

// If the user requested to bind on port 0, a random port will be selected,
// so we capture the port in the logger context after the listener has been
// started
let bound_log =
log.new(slog::o!("bind_addr" => listener.bind_addr()));
*lock!(self.log) = bound_log.clone();
log = bound_log;

info!(log, "transitioning to accept loop");
'accept: loop {
// We also need to check the shutdown flag inside the accept
// loop, because we won't restart the listener loop unless we've
// encountered an error indicating we can't just call accept()
// again and we need a whole new listener.
if self.shutdown.load(Ordering::Acquire) {
dispatcher_log!(self,
info,
"dispatcher caught shutdown flag from accept loop";
"listen_address" => &self.listen
info!(
log,
"dispatcher caught shutdown flag from accept loop"
);
self.shutdown.store(false, Ordering::Release);
break 'listener;
}

let accepted = match listener.accept(
self.log.clone(),
log.clone(),
self.peer_to_session.clone(),
IO_TIMEOUT,
) {
Ok(c) => {
dispatcher_log!(self,
debug,
debug!(log,
"accepted inbound connection from: {}", c.peer();
"peer" => c.peer(),
"listen_address" => &self.listen
);
c
}
Err(crate::error::Error::Timeout) => {
continue 'accept;
}
Err(e) => {
dispatcher_log!(self,
error,
"listener accept error: {e}";
"listen_address" => &self.listen
);
error!(log, "listener accept error: {e}");
continue 'listener;
}
};

let peer_addr = accepted.peer();
let key = self.resolve_session_key(peer_addr);
let session_log = log.new(slog::o!(
"peer" => peer_addr,
"session_key" => format!("{key:?}"),
));

match lock!(self.peer_to_session).get(&key).cloned() {
Some(session_endpoint) => {
Expand All @@ -177,12 +181,8 @@ impl<Cnx: BgpConnection + 'static> Dispatcher<Cnx> {
if let Err(e) =
Listener::apply_policy(&accepted, min_ttl, md5_key)
{
dispatcher_log!(self,
warn,
"failed to apply policy for connection from {}: {e}", peer_addr;
"listen_address" => &self.listen,
"peer" => format!("{}", peer_addr),
"session_key" => format!("{:?}", key),
warn!(session_log,
"failed to apply policy for connection";
"error" => format!("{e}")
);
}
Expand All @@ -192,56 +192,41 @@ impl<Cnx: BgpConnection + 'static> Dispatcher<Cnx> {
SessionEvent::TcpConnectionAcked(accepted),
))
{
dispatcher_log!(self,
error,
"failed to send connected event to session for {}: {e}", peer_addr;
"listen_address" => &self.listen,
"peer" => format!("{}", peer_addr),
"session_key" => format!("{:?}", key)
error!(session_log,
"failed to send connected event to session";
"error" => format!("{e}")
);
continue 'listener;
}
}
None => {
dispatcher_log!(self,
debug,
"no session found for peer, dropping connection";
"peer" => format!("{}", peer_addr),
"resolved_key" => format!("{:?}", key),
"listen_address" => &self.listen
debug!(
session_log,
"no session found for peer, dropping connection"
);
continue 'accept;
}
}
}
}
dispatcher_log!(self,
info,
"dispatcher shutdown complete";
"listen_address" => &self.listen
);
info!(log, "dispatcher shutdown complete");
}

pub fn listen_addr(&self) -> &str {
&self.listen
}

pub fn shutdown(&self) {
dispatcher_log!(self, info,
"dispatcher received shutdown request, setting shutdown flag";
"listen_address" => &self.listen
info!(
lock!(self.log),
"dispatcher received shutdown request, setting shutdown flag"
);
self.shutdown.store(true, Ordering::Release);
}
}

impl<Cnx: BgpConnection> Drop for Dispatcher<Cnx> {
fn drop(&mut self) {
dispatcher_log!(self,
debug,
"dropping dispatcher with listen_addr {}",
&self.listen;
"listen_address" => &self.listen
);
debug!(lock!(self.log), "dropping dispatcher");
}
}
Loading