From 7b2f888547f4e3ccd7298352421ffb8f60686376 Mon Sep 17 00:00:00 2001 From: Mingwei Zhang Date: Sun, 15 Mar 2026 11:50:05 -0700 Subject: [PATCH 1/5] feat: add rib reconstruction command - add rib reconstruction for arbitrary timestamps using base RIBs and replayed updates - support stdout and SQLite output with country and full-feed filters - document rib command behavior in the README and changelog --- CHANGELOG.md | 16 + Cargo.lock | 1 + Cargo.toml | 4 + README.md | 61 ++ src/bin/commands/elem_format.rs | 25 + src/bin/commands/mod.rs | 1 + src/bin/commands/rib.rs | 167 +++++ src/bin/monocle.rs | 7 + src/database/mod.rs | 5 + src/database/session/mod.rs | 7 + src/database/session/rib_store.rs | 505 +++++++++++++ src/lens/mod.rs | 4 + src/lens/rib/mod.rs | 1143 +++++++++++++++++++++++++++++ 13 files changed, 1946 insertions(+) create mode 100644 src/bin/commands/rib.rs create mode 100644 src/database/session/rib_store.rs create mode 100644 src/lens/rib/mod.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 92119c7..861a05c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,22 @@ All notable changes to this project will be documented in this file. 
+## Unreleased changes + +### New Features + +* Added `monocle rib` for reconstructing RIB state at arbitrary timestamps + * Selects the latest RIB before each requested `rib_ts` and replays updates to the exact timestamp + * Supports stdout output by default and merged SQLite output + * Repeated `--ts` values are written to one merged SQLite file keyed by `rib_ts` + * Aborts when no RIB exists at or before a requested `rib_ts` for a selected collector + * Supports `--country`, `--origin-asn`, `--prefix`, `--as-path`, `--peer-asn`, `--collector`, `--project`, and `--full-feed-only` + * Auto-generated output filenames include requested timestamps and normalized filter slugs + +### Code Improvements + +* Added session-backed SQLite stores for reconstructed RIB working state and merged SQLite export + ## v1.2.0 - 2026-02-28 ### New Features diff --git a/Cargo.lock b/Cargo.lock index 06fd17d..e45f46f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1673,6 +1673,7 @@ dependencies = [ "oneio", "radar-rs", "rayon", + "regex", "rusqlite", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 20f841c..195b746 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,6 +88,7 @@ lib = [ # Database "dep:oneio", "dep:ipnet", + "dep:tempfile", # Lenses "dep:chrono-humanize", "dep:dateparser", @@ -98,6 +99,7 @@ lib = [ "dep:itertools", "dep:radar-rs", "dep:rayon", + "dep:regex", # Display (always included with lib) "dep:tabled", "dep:json_to_table", @@ -151,6 +153,7 @@ tracing = "0.1" # Database ipnet = { version = "2.10", features = ["json"], optional = true } oneio = { version = "0.20.1", default-features = false, features = ["https", "gz", "bz", "json"], optional = true } +tempfile = { version = "3", optional = true } # Lenses chrono-humanize = { version = "0.2", optional = true } @@ -162,6 +165,7 @@ bgpkit-commons = { version = "0.10.2", features = ["asinfo", "rpki", "countries" itertools = { version = "0.14", optional = true } radar-rs = { version = "0.1.0", optional = true } 
rayon = { version = "1.8", optional = true } +regex = { version = "1.11", optional = true } # Display tabled = { version = "0.20", optional = true } diff --git a/README.md b/README.md index 18d14c4..946af0d 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ See through all Border Gateway Protocol (BGP) data with a monocle. - [`monocle parse`](#monocle-parse) - [Output Format](#output-format) - [`monocle search`](#monocle-search) + - [`monocle rib`](#monocle-rib) - [`monocle time`](#monocle-time) - [`monocle inspect`](#monocle-inspect) - [`monocle country`](#monocle-country) @@ -229,6 +230,7 @@ Subcommands: - `parse`: parse individual MRT files - `search`: search for matching messages from all available public MRT files +- `rib`: reconstruct final RIB state at one or more arbitrary timestamps - `server`: start a WebSocket server for programmatic access - `inspect`: unified AS and prefix information lookup - `country`: utility to look up country name and code @@ -259,6 +261,7 @@ Usage: monocle [OPTIONS] Commands: parse Parse individual MRT files given a file path, local or remote search Search BGP messages from all available public MRT files + rib Reconstruct final RIB state at one or more arbitrary timestamps server Start the WebSocket server (ws://
<host>:<port>/ws, health: http://<host>
:/health) inspect Unified AS and prefix information lookup country Country name and code lookup utilities @@ -701,6 +704,64 @@ Use `--broker-files` to see the list of MRT files that would be queried without -c rrc00 --broker-files ``` +### `monocle rib` + +Reconstruct final RIB state at one or more arbitrary timestamps by loading the latest RIB at or before each requested `rib_ts` and replaying updates up to the exact timestamp. + +```text +➜ monocle rib --help +Reconstruct final RIB state at one or more arbitrary timestamps + +Usage: monocle rib [OPTIONS] --ts + +Options: + --ts Target RIB timestamp. Repeat to request multiple snapshots + --debug Print debug information + -o, --origin-asn Filter by origin AS Number(s), comma-separated. Prefix with ! to exclude + -C, --country Filter by origin ASN registration country + --format Output format: table, markdown, json, json-pretty, json-line, psv (default varies by command) + --json Output as JSON objects (shortcut for --format json-pretty) + -p, --prefix Filter by network prefix(es), comma-separated. Prefix with ! to exclude + --no-update Disable automatic database updates (use existing cached data only) + -s, --include-super Include super-prefixes when filtering + -S, --include-sub Include sub-prefixes when filtering + -J, --peer-asn Filter by peer ASN(s), comma-separated. Prefix with ! to exclude + -a, --as-path Filter by AS path regex string + -c, --collector Filter by collector, e.g., rrc00 or route-views2 + -P, --project Filter by route collection project, i.e. riperis or routeviews + --full-feed-only Keep only full-feed peers based on broker peer metadata + --output-type File output type. If omitted and `--output-dir` is also omitted, output goes to stdout [possible values: sqlite] + --output-dir Output directory for generated SQLite files + -h, --help Print help + -V, --version Print version +``` + +Behavior: + +- A single `--ts` writes to stdout by default. 
+- Repeated `--ts` values require file output and are written to one merged SQLite file keyed by `rib_ts`. +- If any selected collector has no RIB at or before a requested `rib_ts`, the command aborts instead of producing a partial result. +- `--country` uses local ASInfo registration data, and `--full-feed-only` keeps only peers with at least 800k IPv4 prefixes or 100k IPv6 prefixes in broker peer metadata. + +Examples: + +```bash +# Print the reconstructed RIB for a single timestamp to stdout +monocle rib --ts 2025-09-01T12:00:00Z -c rrc00 -o 13335 + +# Write multiple timestamps to one merged SQLite file in the current directory +monocle rib \ + --ts 2025-09-01T12:00:00Z \ + --ts 2025-09-01T18:00:00Z \ + --output-type sqlite \ + -c rrc00 \ + --country US \ + --full-feed-only + +# Override the output directory +monocle rib --ts 2025-09-01T12:00:00Z --output-dir /tmp/rib-out -c route-views2 +``` + ### `monocle time` Parse and convert time strings between various formats. diff --git a/src/bin/commands/elem_format.rs b/src/bin/commands/elem_format.rs index edc4d20..452029d 100644 --- a/src/bin/commands/elem_format.rs +++ b/src/bin/commands/elem_format.rs @@ -17,7 +17,9 @@ pub const AVAILABLE_FIELDS: &[&str] = &[ "peer_ip", "peer_asn", "prefix", + "path_id", "as_path", + "origin_asns", "origin", "next_hop", "local_pref", @@ -174,11 +176,26 @@ pub fn get_field_value_with_time_format( "peer_ip" => elem.peer_ip.to_string(), "peer_asn" => elem.peer_asn.to_string(), "prefix" => elem.prefix.to_string(), + "path_id" => elem + .prefix + .path_id + .map(|path_id| path_id.to_string()) + .unwrap_or_default(), "as_path" => elem .as_path .as_ref() .map(|p| p.to_string()) .unwrap_or_default(), + "origin_asns" => elem + .origin_asns + .as_ref() + .map(|asns| { + asns.iter() + .map(|asn| asn.to_string()) + .collect::>() + .join(" ") + }) + .unwrap_or_default(), "origin" => elem .origin .as_ref() @@ -288,10 +305,18 @@ pub fn build_json_object( "peer_ip" => 
json!(elem.peer_ip.to_string()), "peer_asn" => json!(elem.peer_asn), "prefix" => json!(elem.prefix.to_string()), + "path_id" => match elem.prefix.path_id { + Some(path_id) => json!(path_id), + None => serde_json::Value::Null, + }, "as_path" => match &elem.as_path { Some(p) => json!(p.to_string()), None => serde_json::Value::Null, }, + "origin_asns" => match &elem.origin_asns { + Some(asns) => json!(asns.iter().map(|asn| asn.to_string()).collect::>()), + None => serde_json::Value::Null, + }, "origin" => match &elem.origin { Some(o) => json!(o.to_string()), None => serde_json::Value::Null, diff --git a/src/bin/commands/mod.rs b/src/bin/commands/mod.rs index 8f7a72e..4f5d0f0 100644 --- a/src/bin/commands/mod.rs +++ b/src/bin/commands/mod.rs @@ -6,6 +6,7 @@ pub mod inspect; pub mod ip; pub mod parse; pub mod pfx2as; +pub mod rib; pub mod rpki; pub mod search; pub mod time; diff --git a/src/bin/commands/rib.rs b/src/bin/commands/rib.rs new file mode 100644 index 0000000..1aef1e8 --- /dev/null +++ b/src/bin/commands/rib.rs @@ -0,0 +1,167 @@ +use std::fs; +use std::io::Write; +use std::path::{Path, PathBuf}; + +use anyhow::{anyhow, Result}; +use bgpkit_parser::BgpElem; + +use monocle::database::{MonocleDatabase, RibSqliteStore}; +use monocle::lens::rib::{RibLens, RibOutputType}; +use monocle::utils::{OutputFormat, TimestampFormat}; +use monocle::MonocleConfig; + +use super::elem_format::{format_elem, format_elems_table, get_header}; + +pub use monocle::lens::rib::RibArgs; + +const DEFAULT_FIELDS_RIB: &[&str] = &[ + "type", + "timestamp", + "peer_ip", + "peer_asn", + "prefix", + "path_id", + "as_path", + "origin_asns", + "origin", + "next_hop", + "local_pref", + "med", + "communities", + "atomic", + "aggr_asn", + "aggr_ip", + "collector", +]; + +pub fn run(config: &MonocleConfig, args: RibArgs, output_format: OutputFormat, no_update: bool) { + if let Err(error) = run_inner(config, args, output_format, no_update) { + eprintln!("ERROR: {}", error); + std::process::exit(1); + 
} +} + +fn run_inner( + config: &MonocleConfig, + args: RibArgs, + output_format: OutputFormat, + no_update: bool, +) -> Result<()> { + let sqlite_path = config.sqlite_path(); + let db = MonocleDatabase::open(&sqlite_path) + .map_err(|e| anyhow!("Failed to open database '{}': {}", sqlite_path, e))?; + let lens = RibLens::new(&db, config); + + match args.file_output_type() { + None => run_stdout(&lens, &args, output_format, no_update), + Some(RibOutputType::Sqlite) => run_sqlite_output(&lens, &args, no_update), + } +} + +fn run_stdout( + lens: &RibLens<'_>, + args: &RibArgs, + output_format: OutputFormat, + no_update: bool, +) -> Result<()> { + let mut stdout = std::io::stdout(); + + if output_format == OutputFormat::Table { + let mut elems = Vec::<(BgpElem, Option)>::new(); + lens.reconstruct_snapshots(args, no_update, |_rib_ts, state_store| { + state_store.visit_entries(|entry| { + elems.push((entry.elem, Some(entry.collector))); + Ok(()) + }) + })?; + + if !elems.is_empty() { + writeln!( + stdout, + "{}", + format_elems_table(&elems, DEFAULT_FIELDS_RIB, TimestampFormat::Unix) + ) + .map_err(|e| anyhow!("Failed to write table output: {}", e))?; + } + return Ok(()); + } + + let mut header_written = false; + lens.reconstruct_snapshots(args, no_update, |_rib_ts, state_store| { + if !header_written { + if let Some(header) = get_header(output_format, DEFAULT_FIELDS_RIB) { + writeln!(stdout, "{}", header) + .map_err(|e| anyhow!("Failed to write output header: {}", e))?; + } + header_written = true; + } + + state_store.visit_entries(|entry| { + if let Some(line) = format_elem( + &entry.elem, + output_format, + DEFAULT_FIELDS_RIB, + Some(entry.collector.as_str()), + TimestampFormat::Unix, + ) { + writeln!(stdout, "{}", line) + .map_err(|e| anyhow!("Failed to write reconstructed RIB row: {}", e))?; + } + Ok(()) + }) + })?; + + Ok(()) +} + +fn run_sqlite_output(lens: &RibLens<'_>, args: &RibArgs, no_update: bool) -> Result<()> { + let normalized_ts = args.validate()?; + let 
output_dir = ensure_output_dir(lens.output_directory(args)?)?; + let output_path = output_dir.join(format!( + "{}.sqlite3", + lens.file_name_prefix(args, &normalized_ts)? + )); + + remove_existing_file(&output_path)?; + + let sqlite_store = RibSqliteStore::new(path_to_str(&output_path)?, true)?; + let summary = lens.reconstruct_snapshots(args, no_update, |rib_ts, state_store| { + state_store.visit_entries(|entry| sqlite_store.insert_entry(rib_ts, &entry)) + })?; + + eprintln!( + "wrote {} reconstructed RIB snapshot(s) to {}", + summary.rib_ts.len(), + output_path.display() + ); + Ok(()) +} + +fn ensure_output_dir(path: Option) -> Result { + let output_dir = path.ok_or_else(|| anyhow!("Failed to resolve output directory"))?; + fs::create_dir_all(&output_dir).map_err(|e| { + anyhow!( + "Failed to create output directory '{}': {}", + output_dir.display(), + e + ) + })?; + Ok(output_dir) +} + +fn remove_existing_file(path: &Path) -> Result<()> { + match fs::remove_file(path) { + Ok(()) => Ok(()), + Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(()), + Err(error) => Err(anyhow!( + "Failed to remove existing output file '{}': {}", + path.display(), + error + )), + } +} + +fn path_to_str(path: &Path) -> Result<&str> { + path.to_str() + .ok_or_else(|| anyhow!("Path '{}' contains invalid UTF-8", path.display())) +} diff --git a/src/bin/monocle.rs b/src/bin/monocle.rs index 20dac06..c5cf360 100644 --- a/src/bin/monocle.rs +++ b/src/bin/monocle.rs @@ -17,6 +17,7 @@ use commands::inspect::InspectArgs; use commands::ip::IpArgs; use commands::parse::ParseArgs; use commands::pfx2as::Pfx2asArgs; +use commands::rib::RibArgs; use commands::rpki::RpkiCommands; use commands::search::SearchArgs; use commands::time::TimeArgs; @@ -57,6 +58,9 @@ enum Commands { /// Search BGP messages from all available public MRT files. Search(SearchArgs), + /// Reconstruct final RIB state at one or more arbitrary timestamps. + Rib(RibArgs), + /// Start the WebSocket server (ws://
<host>:<port>/ws, health: http://<host>
:/health) /// /// Note: This requires building with the `server` feature enabled. @@ -176,6 +180,9 @@ fn main() { match cli.command { Commands::Parse(args) => commands::parse::run(args, streaming_output_format), Commands::Search(args) => commands::search::run(&config, args, streaming_output_format), + Commands::Rib(args) => { + commands::rib::run(&config, args, streaming_output_format, cli.no_update) + } Commands::Server(args) => { // The server requires the `server` feature (axum + tokio). Keep the CLI diff --git a/src/database/mod.rs b/src/database/mod.rs index ed01451..8825d19 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -125,6 +125,11 @@ pub use monocle::{ // Requires lib feature because MsgStore depends on bgpkit_parser::BgpElem #[cfg(feature = "lib")] pub use session::MsgStore; +#[cfg(feature = "lib")] +pub use session::{ + elem_matches_stored_json, path_id_for_key, path_id_from_key, RibRouteKey, RibSqliteStore, + RibStateStore, StoredRibEntry, +}; // ============================================================================= // Helper function diff --git a/src/database/session/mod.rs b/src/database/session/mod.rs index d69b267..f62c246 100644 --- a/src/database/session/mod.rs +++ b/src/database/session/mod.rs @@ -15,6 +15,13 @@ #[cfg(feature = "lib")] mod msg_store; +#[cfg(feature = "lib")] +mod rib_store; #[cfg(feature = "lib")] pub use msg_store::MsgStore; +#[cfg(feature = "lib")] +pub use rib_store::{ + elem_matches_stored_json, path_id_for_key, path_id_from_key, RibRouteKey, RibSqliteStore, + RibStateStore, StoredRibEntry, +}; diff --git a/src/database/session/rib_store.rs b/src/database/session/rib_store.rs new file mode 100644 index 0000000..ec11255 --- /dev/null +++ b/src/database/session/rib_store.rs @@ -0,0 +1,505 @@ +//! Session-based SQLite stores for reconstructed RIB snapshots. +//! +//! These stores are separate from `MsgStore` because RIB reconstruction needs: +//! - route-identity keys with `path_id` +//! 
- exact `BgpElem` round-tripping for reconstructed RIB state +//! - merged SQLite output keyed by `rib_ts` + +use anyhow::{anyhow, Result}; +use bgpkit_parser::BgpElem; +use rusqlite::{params, OptionalExtension}; +use serde_json::Value; +use tempfile::{NamedTempFile, TempPath}; + +use crate::database::core::DatabaseConn; + +fn opt_to_sql_i64(v: Option) -> i64 { + v.map(i64::from).unwrap_or(-1) +} + +fn sql_i64_to_opt(v: i64) -> Option { + if v < 0 { + None + } else { + u32::try_from(v).ok() + } +} + +fn elem_as_path(elem: &BgpElem) -> Option { + elem.as_path.as_ref().map(|path| path.to_string()) +} + +fn elem_origin_asns(elem: &BgpElem) -> Option { + elem.origin_asns.as_ref().map(|asns| { + asns.iter() + .map(|asn| asn.to_string()) + .collect::>() + .join(" ") + }) +} + +fn elem_next_hop(elem: &BgpElem) -> Option { + elem.next_hop.as_ref().map(|hop| hop.to_string()) +} + +fn elem_communities(elem: &BgpElem) -> Option { + elem.communities.as_ref().map(|communities| { + communities + .iter() + .map(|community| community.to_string()) + .collect::>() + .join(" ") + }) +} + +fn elem_origin(elem: &BgpElem) -> Option { + elem.origin.as_ref().map(|origin| origin.to_string()) +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct RibRouteKey { + pub collector: String, + pub peer_ip: String, + pub peer_asn: u32, + pub prefix: String, + pub path_id: Option, +} + +impl RibRouteKey { + pub fn from_elem(collector: &str, elem: &BgpElem) -> Self { + Self { + collector: collector.to_string(), + peer_ip: elem.peer_ip.to_string(), + peer_asn: elem.peer_asn.to_u32(), + prefix: elem.prefix.prefix.to_string(), + path_id: elem.prefix.path_id, + } + } +} + +#[derive(Debug, Clone)] +pub struct StoredRibEntry { + pub collector: String, + pub elem: BgpElem, +} + +impl StoredRibEntry { + pub fn new(collector: impl Into, elem: BgpElem) -> Self { + Self { + collector: collector.into(), + elem, + } + } + + pub fn route_key(&self) -> RibRouteKey { + RibRouteKey::from_elem(&self.collector, 
&self.elem) + } + + fn elem_json(&self) -> Result { + serde_json::to_string(&self.elem) + .map_err(|e| anyhow!("Failed to serialize BgpElem for SQLite storage: {}", e)) + } + + fn from_row(row: &rusqlite::Row<'_>) -> Result { + let collector: String = row + .get("collector") + .map_err(|e| anyhow!("Failed to read collector column: {}", e))?; + let elem_json: String = row + .get("elem_json") + .map_err(|e| anyhow!("Failed to read elem_json column: {}", e))?; + let elem = serde_json::from_str::(&elem_json) + .map_err(|e| anyhow!("Failed to deserialize stored BgpElem JSON: {}", e))?; + Ok(Self { collector, elem }) + } +} + +pub struct RibStateStore { + db: DatabaseConn, + _temp_path: Option, +} + +impl RibStateStore { + pub fn new(db_path: Option<&str>, reset: bool) -> Result { + let db = DatabaseConn::open(db_path)?; + let store = Self { + db, + _temp_path: None, + }; + store.initialize(reset)?; + Ok(store) + } + + pub fn new_temp() -> Result { + let file = NamedTempFile::new().map_err(|e| { + anyhow!( + "Failed to create temporary SQLite path for rib state: {}", + e + ) + })?; + let temp_path = file.into_temp_path(); + let db = DatabaseConn::open_path( + temp_path + .to_str() + .ok_or_else(|| anyhow!("Temporary rib state path contains invalid UTF-8"))?, + )?; + let store = Self { + db, + _temp_path: Some(temp_path), + }; + store.initialize(true)?; + Ok(store) + } + + fn initialize(&self, reset: bool) -> Result<()> { + if reset { + self.db + .conn + .execute("DROP TABLE IF EXISTS rib_state", []) + .map_err(|e| anyhow!("Failed to drop rib_state table: {}", e))?; + } + + self.db + .conn + .execute_batch( + r#" + CREATE TABLE IF NOT EXISTS rib_state ( + collector TEXT NOT NULL, + peer_ip TEXT NOT NULL, + peer_asn INTEGER NOT NULL, + prefix TEXT NOT NULL, + path_id INTEGER NOT NULL, + timestamp REAL NOT NULL, + as_path TEXT, + origin_asns TEXT, + origin TEXT, + next_hop TEXT, + local_pref INTEGER, + med INTEGER, + communities TEXT, + atomic INTEGER NOT NULL, + aggr_asn 
INTEGER, + aggr_ip TEXT, + elem_json TEXT NOT NULL, + PRIMARY KEY (collector, peer_ip, peer_asn, prefix, path_id) + ); + CREATE INDEX IF NOT EXISTS idx_rib_state_collector ON rib_state(collector); + CREATE INDEX IF NOT EXISTS idx_rib_state_peer_asn ON rib_state(peer_asn); + CREATE INDEX IF NOT EXISTS idx_rib_state_prefix ON rib_state(prefix); + "#, + ) + .map_err(|e| anyhow!("Failed to initialize rib_state schema: {}", e))?; + Ok(()) + } + + pub fn count(&self) -> Result { + self.db.table_count("rib_state") + } + + pub fn route_exists(&self, key: &RibRouteKey) -> Result { + let exists = self + .db + .conn + .query_row( + "SELECT 1 FROM rib_state WHERE collector = ?1 AND peer_ip = ?2 AND peer_asn = ?3 AND prefix = ?4 AND path_id = ?5", + params![ + key.collector, + key.peer_ip, + key.peer_asn, + key.prefix, + opt_to_sql_i64(key.path_id), + ], + |_| Ok(()), + ) + .optional() + .map_err(|e| anyhow!("Failed to test route existence in rib_state: {}", e))?; + Ok(exists.is_some()) + } + + pub fn upsert_entry(&self, entry: &StoredRibEntry) -> Result<()> { + self.upsert_entries(std::slice::from_ref(entry)) + } + + pub fn upsert_entries(&self, entries: &[StoredRibEntry]) -> Result<()> { + if entries.is_empty() { + return Ok(()); + } + + let tx = self + .db + .conn + .unchecked_transaction() + .map_err(|e| anyhow!("Failed to begin rib_state transaction: {}", e))?; + let mut stmt = tx + .prepare_cached( + r#" + INSERT OR REPLACE INTO rib_state ( + collector, peer_ip, peer_asn, prefix, path_id, timestamp, + as_path, origin_asns, origin, next_hop, local_pref, med, + communities, atomic, aggr_asn, aggr_ip, elem_json + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17) + "#, + ) + .map_err(|e| anyhow!("Failed to prepare rib_state upsert statement: {}", e))?; + + for entry in entries { + stmt.execute(params![ + entry.collector, + entry.elem.peer_ip.to_string(), + entry.elem.peer_asn.to_u32(), + entry.elem.prefix.prefix.to_string(), + 
opt_to_sql_i64(entry.elem.prefix.path_id), + entry.elem.timestamp, + elem_as_path(&entry.elem), + elem_origin_asns(&entry.elem), + elem_origin(&entry.elem), + elem_next_hop(&entry.elem), + entry.elem.local_pref, + entry.elem.med, + elem_communities(&entry.elem), + if entry.elem.atomic { 1_i64 } else { 0_i64 }, + entry.elem.aggr_asn.map(|asn| asn.to_u32()), + entry.elem.aggr_ip.as_ref().map(|ip| ip.to_string()), + entry.elem_json()?, + ]) + .map_err(|e| anyhow!("Failed to upsert entry into rib_state: {}", e))?; + } + + drop(stmt); + tx.commit() + .map_err(|e| anyhow!("Failed to commit rib_state upserts: {}", e))?; + Ok(()) + } + + pub fn delete_key(&self, key: &RibRouteKey) -> Result<()> { + self.delete_keys(std::slice::from_ref(key)) + } + + pub fn delete_keys(&self, keys: &[RibRouteKey]) -> Result<()> { + if keys.is_empty() { + return Ok(()); + } + + let tx = self + .db + .conn + .unchecked_transaction() + .map_err(|e| anyhow!("Failed to begin rib_state delete transaction: {}", e))?; + let mut stmt = tx + .prepare_cached( + "DELETE FROM rib_state WHERE collector = ?1 AND peer_ip = ?2 AND peer_asn = ?3 AND prefix = ?4 AND path_id = ?5", + ) + .map_err(|e| anyhow!("Failed to prepare rib_state delete statement: {}", e))?; + + for key in keys { + stmt.execute(params![ + key.collector, + key.peer_ip, + key.peer_asn, + key.prefix, + opt_to_sql_i64(key.path_id), + ]) + .map_err(|e| anyhow!("Failed to delete entry from rib_state: {}", e))?; + } + + drop(stmt); + tx.commit() + .map_err(|e| anyhow!("Failed to commit rib_state deletes: {}", e))?; + Ok(()) + } + + pub fn visit_entries(&self, mut visitor: F) -> Result<()> + where + F: FnMut(StoredRibEntry) -> Result<()>, + { + let mut stmt = self + .db + .conn + .prepare( + "SELECT collector, elem_json FROM rib_state ORDER BY collector, peer_asn, peer_ip, prefix, path_id", + ) + .map_err(|e| anyhow!("Failed to prepare rib_state scan statement: {}", e))?; + + let mut rows = stmt + .query([]) + .map_err(|e| anyhow!("Failed to 
query rib_state rows: {}", e))?; + + while let Some(row) = rows + .next() + .map_err(|e| anyhow!("Failed to iterate rib_state rows: {}", e))? + { + visitor(StoredRibEntry::from_row(row)?)?; + } + + Ok(()) + } +} + +pub struct RibSqliteStore { + db: DatabaseConn, +} + +impl RibSqliteStore { + pub fn new(db_path: &str, reset: bool) -> Result { + let db = DatabaseConn::open_path(db_path)?; + let store = Self { db }; + store.initialize(reset)?; + Ok(store) + } + + fn initialize(&self, reset: bool) -> Result<()> { + if reset { + self.db + .conn + .execute("DROP TABLE IF EXISTS elems", []) + .map_err(|e| anyhow!("Failed to drop existing rib output elems table: {}", e))?; + } + + self.db + .conn + .execute_batch( + r#" + CREATE TABLE IF NOT EXISTS elems ( + rib_ts INTEGER NOT NULL, + timestamp REAL NOT NULL, + collector TEXT NOT NULL, + peer_ip TEXT NOT NULL, + peer_asn INTEGER NOT NULL, + prefix TEXT NOT NULL, + path_id INTEGER NOT NULL, + as_path TEXT, + origin_asns TEXT, + origin TEXT, + next_hop TEXT, + local_pref INTEGER, + med INTEGER, + communities TEXT, + atomic INTEGER NOT NULL, + aggr_asn INTEGER, + aggr_ip TEXT + ); + CREATE INDEX IF NOT EXISTS idx_rib_output_rib_ts ON elems(rib_ts); + CREATE INDEX IF NOT EXISTS idx_rib_output_rib_ts_prefix ON elems(rib_ts, prefix); + CREATE INDEX IF NOT EXISTS idx_rib_output_rib_ts_peer_asn ON elems(rib_ts, peer_asn); + CREATE INDEX IF NOT EXISTS idx_rib_output_rib_ts_collector ON elems(rib_ts, collector); + "#, + ) + .map_err(|e| anyhow!("Failed to initialize rib output SQLite schema: {}", e))?; + Ok(()) + } + + pub fn insert_entry(&self, rib_ts: i64, entry: &StoredRibEntry) -> Result<()> { + self.db + .conn + .execute( + r#" + INSERT INTO elems ( + rib_ts, timestamp, collector, peer_ip, peer_asn, prefix, path_id, + as_path, origin_asns, origin, next_hop, local_pref, med, + communities, atomic, aggr_asn, aggr_ip + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17) + "#, + params![ + rib_ts, 
+ entry.elem.timestamp, + entry.collector, + entry.elem.peer_ip.to_string(), + entry.elem.peer_asn.to_u32(), + entry.elem.prefix.prefix.to_string(), + opt_to_sql_i64(entry.elem.prefix.path_id), + elem_as_path(&entry.elem), + elem_origin_asns(&entry.elem), + elem_origin(&entry.elem), + elem_next_hop(&entry.elem), + entry.elem.local_pref, + entry.elem.med, + elem_communities(&entry.elem), + if entry.elem.atomic { 1_i64 } else { 0_i64 }, + entry.elem.aggr_asn.map(|asn| asn.to_u32()), + entry.elem.aggr_ip.as_ref().map(|ip| ip.to_string()), + ], + ) + .map_err(|e| anyhow!("Failed to insert entry into rib output SQLite store: {}", e))?; + Ok(()) + } +} + +pub fn elem_matches_stored_json(elem_json: &str, key: &str) -> Result> { + let value = serde_json::from_str::(elem_json) + .map_err(|e| anyhow!("Failed to deserialize stored elem_json value: {}", e))?; + Ok(value.get(key).cloned()) +} + +pub fn path_id_for_key(path_id: Option) -> i64 { + opt_to_sql_i64(path_id) +} + +pub fn path_id_from_key(path_id: i64) -> Option { + sql_i64_to_opt(path_id) +} + +#[cfg(test)] +mod tests { + use super::*; + use bgpkit_parser::models::{AsPath, AsPathSegment, ElemType, NetworkPrefix}; + use std::net::{IpAddr, Ipv4Addr}; + + fn test_elem() -> Result { + Ok(BgpElem { + timestamp: 1234.0, + elem_type: ElemType::ANNOUNCE, + peer_ip: IpAddr::V4(Ipv4Addr::new(192, 0, 2, 1)), + peer_asn: 64496.into(), + prefix: NetworkPrefix::new("203.0.113.0/24".parse()?, Some(7)), + next_hop: Some(IpAddr::V4(Ipv4Addr::new(192, 0, 2, 2))), + as_path: Some(AsPath { + segments: vec![AsPathSegment::AsSequence(vec![64496.into(), 64497.into()])], + }), + origin_asns: Some(vec![64497.into()]), + origin: None, + local_pref: Some(100), + med: Some(50), + communities: None, + atomic: false, + aggr_asn: None, + aggr_ip: None, + only_to_customer: None, + unknown: None, + deprecated: None, + }) + } + + #[test] + fn test_rib_state_store_round_trip() -> Result<()> { + let store = RibStateStore::new_temp()?; + let entry = 
StoredRibEntry::new("rrc00", test_elem()?); + store.upsert_entry(&entry)?; + assert!(store.route_exists(&entry.route_key())?); + + let mut visited = Vec::new(); + store.visit_entries(|entry| { + visited.push(entry); + Ok(()) + })?; + + assert_eq!(visited.len(), 1); + assert_eq!(visited[0].collector, "rrc00"); + assert_eq!(visited[0].elem.prefix.path_id, Some(7)); + Ok(()) + } + + #[test] + fn test_path_id_helpers() { + assert_eq!(path_id_for_key(None), -1); + assert_eq!(path_id_from_key(-1), None); + assert_eq!(path_id_from_key(42), Some(42)); + } + + #[test] + fn test_elem_json_access() -> Result<()> { + let elem = test_elem()?; + let entry = StoredRibEntry::new("rrc00", elem); + let origin_asns = elem_matches_stored_json(&entry.elem_json()?, "origin_asns")?; + assert!(origin_asns.is_some()); + Ok(()) + } +} diff --git a/src/lens/mod.rs b/src/lens/mod.rs index 3deaa96..937b883 100644 --- a/src/lens/mod.rs +++ b/src/lens/mod.rs @@ -78,6 +78,10 @@ pub mod parse; #[cfg(feature = "lib")] pub mod search; +// RibLens - arbitrary timestamp RIB reconstruction +#[cfg(feature = "lib")] +pub mod rib; + // RpkiLens - RPKI validation and data #[cfg(feature = "lib")] pub mod rpki; diff --git a/src/lens/rib/mod.rs b/src/lens/rib/mod.rs new file mode 100644 index 0000000..f40b923 --- /dev/null +++ b/src/lens/rib/mod.rs @@ -0,0 +1,1143 @@ +//! RIB reconstruction lens. +//! +//! This module reconstructs final RIB state at arbitrary timestamps by: +//! 1. Selecting the latest RIB before each target time +//! 2. Replaying overlapping updates up to the exact target time +//! 3. 
Materializing only the final route state for each requested `rib_ts` + +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; + +use anyhow::{anyhow, Result}; +use bgpkit_broker::{BgpkitBroker, BrokerItem}; +use bgpkit_parser::models::ElemType; +use bgpkit_parser::BgpElem; +use chrono::{DateTime, Duration}; +use regex::Regex; +use serde::{Deserialize, Serialize}; + +use crate::config::MonocleConfig; +use crate::database::{MonocleDatabase, RibRouteKey, RibStateStore, StoredRibEntry}; +use crate::lens::country::CountryLens; +use crate::lens::parse::ParseFilters; +use crate::lens::time::TimeLens; + +#[cfg(feature = "cli")] +use clap::{Args, ValueEnum}; + +const FULL_FEED_V4_THRESHOLD: u32 = 800_000; +const FULL_FEED_V6_THRESHOLD: u32 = 100_000; +const RIB_LOOKBACK_HOURS: i64 = 24 * 30; +const UPDATES_LOOKAHEAD_HOURS: i64 = 2; + +type FullFeedAllowlists = HashMap>; + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[cfg_attr(feature = "cli", derive(ValueEnum))] +pub enum RibOutputType { + Sqlite, +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[cfg_attr(feature = "cli", derive(Args))] +pub struct RibFilters { + /// Target RIB timestamp. Repeat to request multiple snapshots. + #[cfg_attr(feature = "cli", clap(long = "ts", required = true))] + #[serde(default)] + pub rib_ts: Vec, + + /// Filter by origin AS Number(s), comma-separated. Prefix with ! to exclude. + #[cfg_attr(feature = "cli", clap(short = 'o', long, value_delimiter = ','))] + #[serde(default)] + pub origin_asn: Vec, + + /// Filter by origin ASN registration country. + #[cfg_attr(feature = "cli", clap(short = 'C', long))] + pub country: Option, + + /// Filter by network prefix(es), comma-separated. Prefix with ! to exclude. + #[cfg_attr(feature = "cli", clap(short = 'p', long, value_delimiter = ','))] + #[serde(default)] + pub prefix: Vec, + + /// Include super-prefixes when filtering. 
+ #[cfg_attr(feature = "cli", clap(short = 's', long))] + #[serde(default)] + pub include_super: bool, + + /// Include sub-prefixes when filtering. + #[cfg_attr(feature = "cli", clap(short = 'S', long))] + #[serde(default)] + pub include_sub: bool, + + /// Filter by peer ASN(s), comma-separated. Prefix with ! to exclude. + #[cfg_attr(feature = "cli", clap(short = 'J', long, value_delimiter = ','))] + #[serde(default)] + pub peer_asn: Vec, + + /// Filter by AS path regex string. + #[cfg_attr(feature = "cli", clap(short = 'a', long))] + pub as_path: Option, + + /// Filter by collector, e.g., rrc00 or route-views2. + #[cfg_attr(feature = "cli", clap(short = 'c', long))] + pub collector: Option, + + /// Filter by route collection project, i.e. riperis or routeviews. + #[cfg_attr(feature = "cli", clap(short = 'P', long))] + pub project: Option, + + /// Keep only full-feed peers based on broker peer metadata. + #[cfg_attr(feature = "cli", clap(long))] + #[serde(default)] + pub full_feed_only: bool, +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[cfg_attr(feature = "cli", derive(Args))] +pub struct RibArgs { + #[cfg_attr(feature = "cli", clap(flatten))] + #[serde(flatten)] + pub filters: RibFilters, + + /// File output type. If omitted and `--output-dir` is also omitted, output goes to stdout. + #[cfg_attr(feature = "cli", clap(long, value_enum))] + pub output_type: Option, + + /// Output directory for generated SQLite files. + #[cfg_attr(feature = "cli", clap(long))] + pub output_dir: Option, +} + +impl RibArgs { + pub fn normalized_rib_ts(&self) -> Result> { + let time_lens = TimeLens::new(); + let mut timestamps = BTreeSet::new(); + + for value in &self.filters.rib_ts { + let ts = time_lens + .parse_time_string(value) + .map_err(|e| anyhow!("Invalid --ts value '{}': {}", value, e))? 
+ .timestamp(); + timestamps.insert(ts); + } + + if timestamps.is_empty() { + return Err(anyhow!("At least one --ts value is required")); + } + + Ok(timestamps.into_iter().collect()) + } + + pub fn file_output_type(&self) -> Option { + match (self.output_type, self.output_dir.is_some()) { + (Some(output_type), _) => Some(output_type), + (None, true) => Some(RibOutputType::Sqlite), + (None, false) => None, + } + } + + pub fn validate(&self) -> Result> { + let normalized_ts = self.normalized_rib_ts()?; + + let parse_filters = ParseFilters { + origin_asn: self.filters.origin_asn.clone(), + prefix: self.filters.prefix.clone(), + include_super: self.filters.include_super, + include_sub: self.filters.include_sub, + peer_asn: self.filters.peer_asn.clone(), + as_path: self.filters.as_path.clone(), + ..Default::default() + }; + parse_filters.validate()?; + + if let Some(as_path) = &self.filters.as_path { + Regex::new(as_path) + .map_err(|e| anyhow!("Invalid --as-path regex '{}': {}", as_path, e))?; + } + + if normalized_ts.len() > 1 && self.file_output_type().is_none() { + return Err(anyhow!( + "Multiple --ts values require file output. Use --output-type and optionally --output-dir." 
+ )); + } + + Ok(normalized_ts) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RibRow { + pub rib_ts: i64, + pub timestamp: f64, + pub collector: String, + pub peer_ip: String, + pub peer_asn: u32, + pub prefix: String, + pub path_id: Option, + pub as_path: Option, + pub origin_asns: Option, + pub origin: Option, + pub next_hop: Option, + pub local_pref: Option, + pub med: Option, + pub communities: Option, + pub atomic: bool, + pub aggr_asn: Option, + pub aggr_ip: Option, +} + +impl RibRow { + pub fn from_entry(rib_ts: i64, entry: &StoredRibEntry) -> Self { + Self { + rib_ts, + timestamp: entry.elem.timestamp, + collector: entry.collector.clone(), + peer_ip: entry.elem.peer_ip.to_string(), + peer_asn: entry.elem.peer_asn.to_u32(), + prefix: entry.elem.prefix.prefix.to_string(), + path_id: entry.elem.prefix.path_id, + as_path: entry.elem.as_path.as_ref().map(|path| path.to_string()), + origin_asns: entry.elem.origin_asns.as_ref().map(|asns| { + asns.iter() + .map(|asn| asn.to_string()) + .collect::>() + .join(" ") + }), + origin: entry.elem.origin.as_ref().map(|origin| origin.to_string()), + next_hop: entry.elem.next_hop.as_ref().map(|hop| hop.to_string()), + local_pref: entry.elem.local_pref, + med: entry.elem.med, + communities: entry.elem.communities.as_ref().map(|communities| { + communities + .iter() + .map(|community| community.to_string()) + .collect::>() + .join(" ") + }), + atomic: entry.elem.atomic, + aggr_asn: entry.elem.aggr_asn.map(|asn| asn.to_u32()), + aggr_ip: entry.elem.aggr_ip.as_ref().map(|ip| ip.to_string()), + } + } +} + +#[derive(Debug, Clone)] +pub struct RibRunSummary { + pub rib_ts: Vec, + pub collectors_processed: usize, + pub groups_processed: usize, +} + +#[derive(Debug, Clone)] +struct RibReplayGroup { + collector: String, + rib_item: BrokerItem, + rib_ts: Vec, + updates: Vec, +} + +#[derive(Debug, Clone)] +enum DeltaOp { + Upsert(StoredRibEntry), + Delete(RibRouteKey), +} + +#[derive(Debug, Clone)] +struct 
OriginFilter { + values: HashSet, + negated: bool, +} + +pub struct RibLens<'a> { + db: &'a MonocleDatabase, + config: &'a MonocleConfig, +} + +impl<'a> RibLens<'a> { + pub fn new(db: &'a MonocleDatabase, config: &'a MonocleConfig) -> Self { + Self { db, config } + } + + pub fn reconstruct_snapshots( + &self, + args: &RibArgs, + no_update: bool, + mut snapshot_visitor: F, + ) -> Result + where + F: FnMut(i64, &RibStateStore) -> Result<()>, + { + let normalized_ts = args.validate()?; + let country_asns = self.resolve_country_asns(args.filters.country.as_deref(), no_update)?; + let origin_filter = Self::parse_origin_filter(&args.filters.origin_asn)?; + let as_path_regex = Self::compile_as_path_regex(args.filters.as_path.as_deref())?; + let groups = self.resolve_replay_groups(args, &normalized_ts)?; + + let allowlists = if args.filters.full_feed_only { + self.build_full_feed_allowlists(&groups)? + } else { + HashMap::new() + }; + + for group in &groups { + let state_store = RibStateStore::new_temp()?; + let safe_base_filters = self.safe_parse_filters( + args, + group.rib_item.ts_start.and_utc().timestamp(), + group.rib_item.ts_end.and_utc().timestamp(), + ); + + self.load_base_rib( + &state_store, + &group.collector, + &group.rib_item, + &safe_base_filters, + country_asns.as_ref(), + origin_filter.as_ref(), + as_path_regex.as_ref(), + allowlists.get(group.collector.as_str()), + )?; + + self.replay_updates( + &state_store, + group, + args, + country_asns.as_ref(), + origin_filter.as_ref(), + as_path_regex.as_ref(), + allowlists.get(group.collector.as_str()), + &mut snapshot_visitor, + )?; + } + + let collector_count = groups + .iter() + .map(|group| group.collector.as_str()) + .collect::>() + .len(); + + Ok(RibRunSummary { + rib_ts: normalized_ts, + collectors_processed: collector_count, + groups_processed: groups.len(), + }) + } + + pub fn output_directory(&self, args: &RibArgs) -> Result> { + match args.file_output_type() { + None => Ok(None), + Some(_) => { + let 
dir = match &args.output_dir { + Some(path) => std::path::PathBuf::from(path), + None => std::env::current_dir() + .map_err(|e| anyhow!("Failed to determine current directory: {}", e))?, + }; + Ok(Some(dir)) + } + } + } + + pub fn file_name_prefix(&self, args: &RibArgs, rib_ts: &[i64]) -> Result { + let base = if rib_ts.len() == 1 { + format!( + "monocle-rib-{}", + Self::format_rib_ts_for_filename(rib_ts[0])? + ) + } else { + format!( + "monocle-rib-{}-{}", + Self::format_rib_ts_for_filename( + *rib_ts + .first() + .ok_or_else(|| anyhow!("missing first rib_ts"))? + )?, + Self::format_rib_ts_for_filename( + *rib_ts + .last() + .ok_or_else(|| anyhow!("missing last rib_ts"))? + )?, + ) + }; + + let slug = self.filter_slug(&args.filters)?; + if slug.is_empty() { + Ok(base) + } else { + Ok(format!("{}-{}", base, slug)) + } + } + + pub fn single_snapshot_file_name( + &self, + args: &RibArgs, + rib_ts: i64, + output_type: RibOutputType, + ) -> Result { + let prefix = self.file_name_prefix(args, &[rib_ts])?; + let ext = match output_type { + RibOutputType::Sqlite => "sqlite3", + }; + Ok(format!("{}.{}", prefix, ext)) + } + + fn resolve_country_asns( + &self, + country: Option<&str>, + no_update: bool, + ) -> Result>> { + let Some(country) = country else { + return Ok(None); + }; + + let country_code = self.resolve_country_code(country)?; + let asinfo = self.db.asinfo(); + + if asinfo.is_empty() { + if no_update { + return Err(anyhow!( + "ASInfo data is empty but --country was requested. Re-run without --no-update or refresh ASInfo first." 
+ )); + } + self.db + .refresh_asinfo() + .map_err(|e| anyhow!("Failed to refresh ASInfo data for country filter: {}", e))?; + } else if !no_update && asinfo.needs_refresh(self.config.asinfo_cache_ttl()) { + self.db.refresh_asinfo().map_err(|e| { + anyhow!( + "Failed to refresh stale ASInfo data for country filter: {}", + e + ) + })?; + } + + let mut asns = HashSet::new(); + let mut stmt = self + .db + .connection() + .prepare("SELECT asn FROM asinfo_core WHERE UPPER(country) = UPPER(?1) ORDER BY asn") + .map_err(|e| anyhow!("Failed to prepare ASInfo country lookup: {}", e))?; + let rows = stmt + .query_map([country_code.clone()], |row| row.get::<_, u32>(0)) + .map_err(|e| { + anyhow!( + "Failed to query ASInfo by country '{}': {}", + country_code, + e + ) + })?; + + for row in rows { + asns.insert(row.map_err(|e| anyhow!("Failed to decode ASInfo country row: {}", e))?); + } + + Ok(Some(asns)) + } + + fn resolve_country_code(&self, input: &str) -> Result { + let lens = CountryLens::new(); + let matches = lens.lookup(input); + + if matches.is_empty() { + if input.len() == 2 { + return Ok(input.to_uppercase()); + } + return Err(anyhow!("Unknown country filter '{}'", input)); + } + + let exact_name_matches: Vec<_> = matches + .iter() + .filter(|entry| entry.name.eq_ignore_ascii_case(input)) + .collect(); + if exact_name_matches.len() == 1 { + return Ok(exact_name_matches[0].code.clone()); + } + + let exact_code_matches: Vec<_> = matches + .iter() + .filter(|entry| entry.code.eq_ignore_ascii_case(input)) + .collect(); + if exact_code_matches.len() == 1 { + return Ok(exact_code_matches[0].code.clone()); + } + + if matches.len() == 1 { + return Ok(matches[0].code.clone()); + } + + Err(anyhow!( + "Country filter '{}' is ambiguous; matches: {}", + input, + matches + .iter() + .map(|entry| format!("{} ({})", entry.name, entry.code)) + .collect::>() + .join(", ") + )) + } + + fn parse_origin_filter(values: &[String]) -> Result> { + if values.is_empty() { + return Ok(None); + 
} + + let negated = values + .first() + .map(|value| value.starts_with('!')) + .unwrap_or(false); + let mut parsed = HashSet::new(); + + for value in values { + let asn = value + .trim_start_matches('!') + .parse::() + .map_err(|e| anyhow!("Invalid origin ASN filter '{}': {}", value, e))?; + parsed.insert(asn); + } + + Ok(Some(OriginFilter { + values: parsed, + negated, + })) + } + + fn compile_as_path_regex(pattern: Option<&str>) -> Result> { + pattern + .map(|pattern| { + Regex::new(pattern) + .map_err(|e| anyhow!("Invalid --as-path regex '{}': {}", pattern, e)) + }) + .transpose() + } + + fn resolve_replay_groups( + &self, + args: &RibArgs, + normalized_ts: &[i64], + ) -> Result> { + let first_ts = *normalized_ts + .first() + .ok_or_else(|| anyhow!("Missing earliest rib_ts after validation"))?; + let last_ts = *normalized_ts + .last() + .ok_or_else(|| anyhow!("Missing latest rib_ts after validation"))?; + + let ribs = self + .base_broker(args) + .data_type("rib") + .ts_start(Self::timestamp_to_broker_string( + first_ts - Duration::hours(RIB_LOOKBACK_HOURS).num_seconds(), + )?) + .ts_end(Self::timestamp_to_broker_string(last_ts)?) 
+ .query() + .map_err(|e| anyhow!("Failed to query broker for candidate RIB files: {}", e))?; + + let mut ribs_by_collector: BTreeMap> = BTreeMap::new(); + for item in ribs { + ribs_by_collector + .entry(item.collector_id.clone()) + .or_default() + .push(item); + } + + let mut groups = Vec::new(); + for (collector, mut collector_ribs) in ribs_by_collector { + collector_ribs.sort_by_key(|item| item.ts_start); + + let mut timestamps_by_rib: BTreeMap)> = BTreeMap::new(); + for rib_ts in normalized_ts { + let selected_rib = collector_ribs + .iter() + .filter(|item| item.ts_start.and_utc().timestamp() <= *rib_ts) + .max_by_key(|item| item.ts_start); + + let Some(selected_rib) = selected_rib else { + return Err(anyhow!( + "No RIB file found at or before {} for collector {}", + Self::format_rib_ts_for_error(*rib_ts)?, + collector + )); + }; + + timestamps_by_rib + .entry(selected_rib.url.clone()) + .and_modify(|(_, timestamps)| timestamps.push(*rib_ts)) + .or_insert_with(|| (selected_rib.clone(), vec![*rib_ts])); + } + + for (_, (rib_item, mut group_ts)) in timestamps_by_rib { + group_ts.sort_unstable(); + let group_max_ts = *group_ts + .last() + .ok_or_else(|| anyhow!("Replay group was created without any rib_ts"))?; + let updates = + self.resolve_group_updates(args, &collector, &rib_item, group_max_ts)?; + + groups.push(RibReplayGroup { + collector: collector.clone(), + rib_item, + rib_ts: group_ts, + updates, + }); + } + } + + groups.sort_by(|a, b| { + a.collector + .cmp(&b.collector) + .then(a.rib_item.ts_start.cmp(&b.rib_item.ts_start)) + }); + + if groups.is_empty() { + return Err(anyhow!( + "No suitable RIB files were found for the requested timestamps and collector filters." 
+ )); + } + + Ok(groups) + } + + fn resolve_group_updates( + &self, + args: &RibArgs, + collector: &str, + rib_item: &BrokerItem, + group_max_ts: i64, + ) -> Result> { + let rib_ts = rib_item.ts_start.and_utc().timestamp(); + let query_end = group_max_ts + Duration::hours(UPDATES_LOOKAHEAD_HOURS).num_seconds(); + + let mut broker = self + .base_broker(args) + .collector_id(collector) + .data_type("updates") + .ts_start(Self::timestamp_to_broker_string(rib_ts)?) + .ts_end(Self::timestamp_to_broker_string(query_end)?); + + if let Some(project) = &args.filters.project { + broker = broker.project(project); + } + + let mut updates = broker.query().map_err(|e| { + anyhow!( + "Failed to query broker for updates for {}: {}", + collector, + e + ) + })?; + + updates.retain(|item| { + let item_start = item.ts_start.and_utc().timestamp(); + let item_end = item.ts_end.and_utc().timestamp(); + item_start <= group_max_ts && item_end > rib_ts + }); + updates.sort_by_key(|item| item.ts_start); + Ok(updates) + } + + fn build_full_feed_allowlists(&self, groups: &[RibReplayGroup]) -> Result { + let mut allowlists = HashMap::new(); + + for collector in groups + .iter() + .map(|group| group.collector.as_str()) + .collect::>() + { + let peers = BgpkitBroker::new() + .collector_id(collector) + .get_peers() + .map_err(|e| { + anyhow!( + "Failed to fetch broker peer metadata for {}: {}", + collector, + e + ) + })?; + + let allowed = peers + .into_iter() + .filter(|peer| { + peer.num_v4_pfxs >= FULL_FEED_V4_THRESHOLD + || peer.num_v6_pfxs >= FULL_FEED_V6_THRESHOLD + }) + .map(|peer| (peer.ip.to_string(), peer.asn)) + .collect::>(); + + allowlists.insert(collector.to_string(), allowed); + } + + Ok(allowlists) + } + + #[allow(clippy::too_many_arguments)] + fn load_base_rib( + &self, + state_store: &RibStateStore, + collector: &str, + rib_item: &BrokerItem, + safe_filters: &ParseFilters, + country_asns: Option<&HashSet>, + origin_filter: Option<&OriginFilter>, + as_path_regex: Option<&Regex>, + 
full_feed_allowlist: Option<&HashSet<(String, u32)>>, + ) -> Result<()> { + let parser = safe_filters.to_parser(&rib_item.url).map_err(|e| { + anyhow!( + "Failed to build parser for base RIB {}: {}", + rib_item.url, + e + ) + })?; + + let mut batch = Vec::new(); + for elem in parser { + if elem.elem_type != ElemType::ANNOUNCE { + continue; + } + if self.announce_matches( + collector, + &elem, + country_asns, + origin_filter, + as_path_regex, + full_feed_allowlist, + ) { + batch.push(StoredRibEntry::new(collector.to_string(), elem)); + } + } + + state_store.upsert_entries(&batch)?; + Ok(()) + } + + #[allow(clippy::too_many_arguments)] + fn replay_updates( + &self, + state_store: &RibStateStore, + group: &RibReplayGroup, + args: &RibArgs, + country_asns: Option<&HashSet>, + origin_filter: Option<&OriginFilter>, + as_path_regex: Option<&Regex>, + full_feed_allowlist: Option<&HashSet<(String, u32)>>, + snapshot_visitor: &mut F, + ) -> Result<()> + where + F: FnMut(i64, &RibStateStore) -> Result<()>, + { + let mut pending = HashMap::::new(); + let mut next_snapshot_index = 0usize; + + for update in &group.updates { + let safe_filters = self.safe_parse_filters( + args, + group.rib_item.ts_start.and_utc().timestamp(), + *group + .rib_ts + .last() + .ok_or_else(|| anyhow!("Replay group missing max rib_ts"))?, + ); + let parser = safe_filters.to_parser(&update.url).map_err(|e| { + anyhow!( + "Failed to build parser for updates file {}: {}", + update.url, + e + ) + })?; + + for elem in parser { + while next_snapshot_index < group.rib_ts.len() + && elem.timestamp > group.rib_ts[next_snapshot_index] as f64 + { + self.flush_pending(state_store, &mut pending)?; + snapshot_visitor(group.rib_ts[next_snapshot_index], state_store)?; + next_snapshot_index += 1; + } + + self.apply_update_to_delta( + &mut pending, + state_store, + &group.collector, + elem, + country_asns, + origin_filter, + as_path_regex, + full_feed_allowlist, + )?; + } + } + + while next_snapshot_index < 
group.rib_ts.len() { + self.flush_pending(state_store, &mut pending)?; + snapshot_visitor(group.rib_ts[next_snapshot_index], state_store)?; + next_snapshot_index += 1; + } + + Ok(()) + } + + #[allow(clippy::too_many_arguments)] + fn apply_update_to_delta( + &self, + pending: &mut HashMap, + state_store: &RibStateStore, + collector: &str, + elem: BgpElem, + country_asns: Option<&HashSet>, + origin_filter: Option<&OriginFilter>, + as_path_regex: Option<&Regex>, + full_feed_allowlist: Option<&HashSet<(String, u32)>>, + ) -> Result<()> { + let route_key = RibRouteKey::from_elem(collector, &elem); + + match elem.elem_type { + ElemType::WITHDRAW => { + if self.route_exists_in_state_or_delta(&route_key, state_store, pending)? { + pending.insert(route_key.clone(), DeltaOp::Delete(route_key)); + } + } + ElemType::ANNOUNCE => { + let matches = self.announce_matches( + collector, + &elem, + country_asns, + origin_filter, + as_path_regex, + full_feed_allowlist, + ); + + if matches { + pending.insert( + route_key, + DeltaOp::Upsert(StoredRibEntry::new(collector.to_string(), elem)), + ); + } else if self.route_exists_in_state_or_delta(&route_key, state_store, pending)? 
{ + pending.insert(route_key.clone(), DeltaOp::Delete(route_key)); + } + } + } + + Ok(()) + } + + fn route_exists_in_state_or_delta( + &self, + route_key: &RibRouteKey, + state_store: &RibStateStore, + pending: &HashMap, + ) -> Result { + if let Some(delta) = pending.get(route_key) { + return Ok(matches!(delta, DeltaOp::Upsert(_))); + } + state_store.route_exists(route_key) + } + + fn flush_pending( + &self, + state_store: &RibStateStore, + pending: &mut HashMap, + ) -> Result<()> { + if pending.is_empty() { + return Ok(()); + } + + let mut upserts = Vec::new(); + let mut deletes = Vec::new(); + + for delta in pending.values() { + match delta { + DeltaOp::Upsert(entry) => upserts.push(entry.clone()), + DeltaOp::Delete(key) => deletes.push(key.clone()), + } + } + + if !upserts.is_empty() { + state_store.upsert_entries(&upserts)?; + } + if !deletes.is_empty() { + state_store.delete_keys(&deletes)?; + } + + pending.clear(); + Ok(()) + } + + fn announce_matches( + &self, + collector: &str, + elem: &BgpElem, + country_asns: Option<&HashSet>, + origin_filter: Option<&OriginFilter>, + as_path_regex: Option<&Regex>, + full_feed_allowlist: Option<&HashSet<(String, u32)>>, + ) -> bool { + if collector.is_empty() { + return false; + } + + if let Some(origin_filter) = origin_filter { + let matches_origin = elem + .origin_asns + .as_ref() + .map(|origins| { + origins + .iter() + .any(|asn| origin_filter.values.contains(&asn.to_u32())) + }) + .unwrap_or(false); + + if origin_filter.negated { + if matches_origin { + return false; + } + } else if !matches_origin { + return false; + } + } + + if let Some(country_asns) = country_asns { + let matches_country = elem + .origin_asns + .as_ref() + .map(|origins| { + origins + .iter() + .any(|asn| country_asns.contains(&asn.to_u32())) + }) + .unwrap_or(false); + if !matches_country { + return false; + } + } + + if let Some(as_path_regex) = as_path_regex { + let as_path = elem + .as_path + .as_ref() + .map(|path| path.to_string()) + 
.unwrap_or_default(); + if !as_path_regex.is_match(&as_path) { + return false; + } + } + + if let Some(full_feed_allowlist) = full_feed_allowlist { + let peer_key = (elem.peer_ip.to_string(), elem.peer_asn.to_u32()); + if !full_feed_allowlist.contains(&peer_key) { + return false; + } + } + + true + } + + fn safe_parse_filters(&self, args: &RibArgs, start_ts: i64, end_ts: i64) -> ParseFilters { + ParseFilters { + prefix: args.filters.prefix.clone(), + include_super: args.filters.include_super, + include_sub: args.filters.include_sub, + peer_asn: args.filters.peer_asn.clone(), + start_ts: Some(start_ts.to_string()), + end_ts: Some(end_ts.to_string()), + ..Default::default() + } + } + + fn base_broker(&self, args: &RibArgs) -> BgpkitBroker { + let mut broker = BgpkitBroker::new().page_size(1000); + if let Some(collector) = &args.filters.collector { + broker = broker.collector_id(collector); + } + if let Some(project) = &args.filters.project { + broker = broker.project(project); + } + broker + } + + fn timestamp_to_broker_string(ts: i64) -> Result { + let timestamp = DateTime::from_timestamp(ts, 0) + .ok_or_else(|| anyhow!("Invalid Unix timestamp {} for broker query", ts))?; + Ok(timestamp.format("%Y-%m-%dT%H:%M:%SZ").to_string()) + } + + fn format_rib_ts_for_filename(rib_ts: i64) -> Result { + let timestamp = DateTime::from_timestamp(rib_ts, 0) + .ok_or_else(|| anyhow!("Invalid Unix timestamp {} for file naming", rib_ts))?; + Ok(timestamp.format("%Y%m%dT%H%M%SZ").to_string()) + } + + fn format_rib_ts_for_error(rib_ts: i64) -> Result { + let timestamp = DateTime::from_timestamp(rib_ts, 0) + .ok_or_else(|| anyhow!("Invalid Unix timestamp {} for error reporting", rib_ts))?; + Ok(timestamp.format("%Y-%m-%dT%H:%M:%SZ").to_string()) + } + + fn filter_slug(&self, filters: &RibFilters) -> Result { + let mut parts = Vec::new(); + + if let Some(country) = &filters.country { + parts.push(format!( + "country-{}", + Self::sanitize_slug_component(country) + )); + } + if 
!filters.origin_asn.is_empty() { + parts.push(format!( + "origin-{}", + Self::sanitize_list_component(&filters.origin_asn) + )); + } + if !filters.peer_asn.is_empty() { + parts.push(format!( + "peer-{}", + Self::sanitize_list_component(&filters.peer_asn) + )); + } + if let Some(collector) = &filters.collector { + let values = collector + .split(',') + .map(|value| value.trim().to_string()) + .collect::>(); + parts.push(format!( + "collector-{}", + Self::sanitize_list_component(&values) + )); + } + if let Some(project) = &filters.project { + parts.push(format!( + "project-{}", + Self::sanitize_slug_component(project) + )); + } + if !filters.prefix.is_empty() { + parts.push(format!("prefix-{}", Self::hash8(&filters.prefix.join(",")))); + } + if let Some(as_path) = &filters.as_path { + parts.push(format!("aspath-{}", Self::hash8(as_path))); + } + if filters.full_feed_only { + parts.push("fullfeed".to_string()); + } + + let slug = parts.join("-"); + if slug.len() <= 96 { + return Ok(slug); + } + + let truncated = slug + .chars() + .take(80) + .collect::() + .trim_end_matches('-') + .to_string(); + Ok(format!("{}-h{}", truncated, Self::hash8(&slug))) + } + + fn sanitize_list_component(values: &[String]) -> String { + let mut normalized = values + .iter() + .map(|value| Self::sanitize_slug_component(value)) + .collect::>(); + normalized.sort(); + normalized.join("+") + } + + fn sanitize_slug_component(input: &str) -> String { + input + .to_ascii_lowercase() + .chars() + .map(|ch| if ch.is_ascii_alphanumeric() { ch } else { '_' }) + .collect::() + .trim_matches('_') + .to_string() + } + + fn hash8(input: &str) -> String { + let mut hash = 0xcbf29ce484222325_u64; + for byte in input.as_bytes() { + hash ^= u64::from(*byte); + hash = hash.wrapping_mul(0x100000001b3); + } + format!("{:08x}", hash & 0xffff_ffff) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn base_args() -> RibArgs { + RibArgs { + filters: RibFilters { + rib_ts: 
vec!["2025-09-01T12:00:00Z".to_string()], + ..Default::default() + }, + output_type: None, + output_dir: None, + } + } + + #[test] + fn test_validate_multi_ts_stdout_error() { + let mut args = base_args(); + args.filters.rib_ts.push("2025-09-01T13:00:00Z".to_string()); + assert!(args.validate().is_err()); + } + + #[test] + fn test_validate_multi_ts_file_output_ok() -> Result<()> { + let mut args = base_args(); + args.filters.rib_ts.push("2025-09-01T13:00:00Z".to_string()); + args.output_type = Some(RibOutputType::Sqlite); + let values = args.validate()?; + assert_eq!(values.len(), 2); + Ok(()) + } + + #[test] + fn test_filter_slug_order() -> Result<()> { + let mut args = base_args(); + args.filters.country = Some("IR".to_string()); + args.filters.origin_asn = vec!["15169".to_string(), "13335".to_string()]; + args.filters.peer_asn = vec!["2914".to_string()]; + args.filters.collector = Some("rrc00,route-views2".to_string()); + args.filters.project = Some("riperis".to_string()); + args.filters.prefix = vec!["1.1.1.0/24".to_string()]; + args.filters.as_path = Some("^15169 ".to_string()); + args.filters.full_feed_only = true; + + let db = MonocleDatabase::open_in_memory()?; + let config = MonocleConfig::default(); + let lens = RibLens::new(&db, &config); + let slug = lens.filter_slug(&args.filters)?; + + assert!(slug + .starts_with("country-ir-origin-13335+15169-peer-2914-collector-route_views2+rrc00")); + assert!(slug.contains("-h")); + Ok(()) + } + + #[test] + fn test_hash8_is_stable() { + assert_eq!(RibLens::hash8("a"), RibLens::hash8("a")); + } + + #[test] + fn test_single_snapshot_file_name_includes_filters() -> Result<()> { + let mut args = base_args(); + args.filters.country = Some("US".to_string()); + args.filters.origin_asn = vec!["13335".to_string()]; + args.filters.full_feed_only = true; + + let db = MonocleDatabase::open_in_memory()?; + let config = MonocleConfig::default(); + let lens = RibLens::new(&db, &config); + let file_name = + 
lens.single_snapshot_file_name(&args, 1_756_728_000, RibOutputType::Sqlite)?; + + assert_eq!( + file_name, + "monocle-rib-20250901T120000Z-country-us-origin-13335-fullfeed.sqlite3" + ); + Ok(()) + } +} From 018906c994d3d4c107c12f866e66597c1c1bee3f Mon Sep 17 00:00:00 2001 From: Mingwei Zhang Date: Sun, 15 Mar 2026 12:15:18 -0700 Subject: [PATCH 2/5] fix: infer rib sqlite output from sqlite path - remove the rib output-type parameter and use --sqlite-path to enable SQLite output - require --sqlite-path for multi-timestamp rib runs - update README and changelog for the revised rib CLI --- CHANGELOG.md | 5 ++- README.md | 12 +++---- src/bin/commands/rib.rs | 38 ++++++++-------------- src/lens/rib/mod.rs | 71 ++++++++--------------------------------- 4 files changed, 34 insertions(+), 92 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 861a05c..c2bc973 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,11 +8,10 @@ All notable changes to this project will be documented in this file. 
* Added `monocle rib` for reconstructing RIB state at arbitrary timestamps * Selects the latest RIB before each requested `rib_ts` and replays updates to the exact timestamp - * Supports stdout output by default and merged SQLite output - * Repeated `--ts` values are written to one merged SQLite file keyed by `rib_ts` + * Supports stdout output by default and SQLite output via `--sqlite-path` + * Repeated `--ts` values require `--sqlite-path` and are written to one merged SQLite file keyed by `rib_ts` * Aborts when no RIB exists at or before a requested `rib_ts` for a selected collector * Supports `--country`, `--origin-asn`, `--prefix`, `--as-path`, `--peer-asn`, `--collector`, `--project`, and `--full-feed-only` - * Auto-generated output filenames include requested timestamps and normalized filter slugs ### Code Improvements diff --git a/README.md b/README.md index 946af0d..62eecfd 100644 --- a/README.md +++ b/README.md @@ -730,8 +730,7 @@ Options: -c, --collector Filter by collector, e.g., rrc00 or route-views2 -P, --project Filter by route collection project, i.e. riperis or routeviews --full-feed-only Keep only full-feed peers based on broker peer metadata - --output-type File output type. If omitted and `--output-dir` is also omitted, output goes to stdout [possible values: sqlite] - --output-dir Output directory for generated SQLite files + --sqlite-path SQLite output file path -h, --help Print help -V, --version Print version ``` @@ -739,7 +738,8 @@ Options: Behavior: - A single `--ts` writes to stdout by default. -- Repeated `--ts` values require file output and are written to one merged SQLite file keyed by `rib_ts`. +- Repeated `--ts` values require `--sqlite-path` and are written to one merged SQLite file keyed by `rib_ts`. +- Providing `--sqlite-path` writes the reconstructed results to that SQLite file instead of stdout. - If any selected collector has no RIB at or before a requested `rib_ts`, the command aborts instead of producing a partial result. 
- `--country` uses local ASInfo registration data, and `--full-feed-only` keeps only peers with at least 800k IPv4 prefixes or 100k IPv6 prefixes in broker peer metadata. @@ -753,13 +753,13 @@ monocle rib --ts 2025-09-01T12:00:00Z -c rrc00 -o 13335 monocle rib \ --ts 2025-09-01T12:00:00Z \ --ts 2025-09-01T18:00:00Z \ - --output-type sqlite \ + --sqlite-path /tmp/rrc00-us.sqlite3 \ -c rrc00 \ --country US \ --full-feed-only -# Override the output directory -monocle rib --ts 2025-09-01T12:00:00Z --output-dir /tmp/rib-out -c route-views2 +# Write a single reconstructed snapshot to SQLite +monocle rib --ts 2025-09-01T12:00:00Z --sqlite-path /tmp/route-views2.sqlite3 -c route-views2 ``` ### `monocle time` diff --git a/src/bin/commands/rib.rs b/src/bin/commands/rib.rs index 1aef1e8..b1c2b5b 100644 --- a/src/bin/commands/rib.rs +++ b/src/bin/commands/rib.rs @@ -1,12 +1,12 @@ use std::fs; use std::io::Write; -use std::path::{Path, PathBuf}; +use std::path::Path; use anyhow::{anyhow, Result}; use bgpkit_parser::BgpElem; use monocle::database::{MonocleDatabase, RibSqliteStore}; -use monocle::lens::rib::{RibLens, RibOutputType}; +use monocle::lens::rib::RibLens; use monocle::utils::{OutputFormat, TimestampFormat}; use monocle::MonocleConfig; @@ -52,9 +52,10 @@ fn run_inner( .map_err(|e| anyhow!("Failed to open database '{}': {}", sqlite_path, e))?; let lens = RibLens::new(&db, config); - match args.file_output_type() { - None => run_stdout(&lens, &args, output_format, no_update), - Some(RibOutputType::Sqlite) => run_sqlite_output(&lens, &args, no_update), + if args.sqlite_path.is_some() { + run_sqlite_output(&lens, &args, no_update) + } else { + run_stdout(&lens, &args, output_format, no_update) } } @@ -115,16 +116,15 @@ fn run_stdout( } fn run_sqlite_output(lens: &RibLens<'_>, args: &RibArgs, no_update: bool) -> Result<()> { - let normalized_ts = args.validate()?; - let output_dir = ensure_output_dir(lens.output_directory(args)?)?; - let output_path = 
output_dir.join(format!( - "{}.sqlite3", - lens.file_name_prefix(args, &normalized_ts)? - )); + args.validate()?; + let output_path = args + .sqlite_path + .as_deref() + .ok_or_else(|| anyhow!("Missing --sqlite-path for SQLite output"))?; - remove_existing_file(&output_path)?; + remove_existing_file(output_path)?; - let sqlite_store = RibSqliteStore::new(path_to_str(&output_path)?, true)?; + let sqlite_store = RibSqliteStore::new(path_to_str(output_path)?, true)?; let summary = lens.reconstruct_snapshots(args, no_update, |rib_ts, state_store| { state_store.visit_entries(|entry| sqlite_store.insert_entry(rib_ts, &entry)) })?; @@ -137,18 +137,6 @@ fn run_sqlite_output(lens: &RibLens<'_>, args: &RibArgs, no_update: bool) -> Res Ok(()) } -fn ensure_output_dir(path: Option) -> Result { - let output_dir = path.ok_or_else(|| anyhow!("Failed to resolve output directory"))?; - fs::create_dir_all(&output_dir).map_err(|e| { - anyhow!( - "Failed to create output directory '{}': {}", - output_dir.display(), - e - ) - })?; - Ok(output_dir) -} - fn remove_existing_file(path: &Path) -> Result<()> { match fs::remove_file(path) { Ok(()) => Ok(()), diff --git a/src/lens/rib/mod.rs b/src/lens/rib/mod.rs index f40b923..11f7b83 100644 --- a/src/lens/rib/mod.rs +++ b/src/lens/rib/mod.rs @@ -6,6 +6,7 @@ //! 3. 
Materializing only the final route state for each requested `rib_ts` use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; +use std::path::PathBuf; use anyhow::{anyhow, Result}; use bgpkit_broker::{BgpkitBroker, BrokerItem}; @@ -22,7 +23,7 @@ use crate::lens::parse::ParseFilters; use crate::lens::time::TimeLens; #[cfg(feature = "cli")] -use clap::{Args, ValueEnum}; +use clap::Args; const FULL_FEED_V4_THRESHOLD: u32 = 800_000; const FULL_FEED_V6_THRESHOLD: u32 = 100_000; @@ -31,12 +32,6 @@ const UPDATES_LOOKAHEAD_HOURS: i64 = 2; type FullFeedAllowlists = HashMap>; -#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] -#[cfg_attr(feature = "cli", derive(ValueEnum))] -pub enum RibOutputType { - Sqlite, -} - #[derive(Debug, Clone, Default, Serialize, Deserialize)] #[cfg_attr(feature = "cli", derive(Args))] pub struct RibFilters { @@ -99,13 +94,9 @@ pub struct RibArgs { #[serde(flatten)] pub filters: RibFilters, - /// File output type. If omitted and `--output-dir` is also omitted, output goes to stdout. - #[cfg_attr(feature = "cli", clap(long, value_enum))] - pub output_type: Option, - - /// Output directory for generated SQLite files. + /// SQLite output file path. #[cfg_attr(feature = "cli", clap(long))] - pub output_dir: Option, + pub sqlite_path: Option, } impl RibArgs { @@ -128,14 +119,6 @@ impl RibArgs { Ok(timestamps.into_iter().collect()) } - pub fn file_output_type(&self) -> Option { - match (self.output_type, self.output_dir.is_some()) { - (Some(output_type), _) => Some(output_type), - (None, true) => Some(RibOutputType::Sqlite), - (None, false) => None, - } - } - pub fn validate(&self) -> Result> { let normalized_ts = self.normalized_rib_ts()?; @@ -155,10 +138,8 @@ impl RibArgs { .map_err(|e| anyhow!("Invalid --as-path regex '{}': {}", as_path, e))?; } - if normalized_ts.len() > 1 && self.file_output_type().is_none() { - return Err(anyhow!( - "Multiple --ts values require file output. Use --output-type and optionally --output-dir." 
- )); + if normalized_ts.len() > 1 && self.sqlite_path.is_none() { + return Err(anyhow!("Multiple --ts values require --sqlite-path.")); } Ok(normalized_ts) @@ -323,20 +304,6 @@ impl<'a> RibLens<'a> { }) } - pub fn output_directory(&self, args: &RibArgs) -> Result> { - match args.file_output_type() { - None => Ok(None), - Some(_) => { - let dir = match &args.output_dir { - Some(path) => std::path::PathBuf::from(path), - None => std::env::current_dir() - .map_err(|e| anyhow!("Failed to determine current directory: {}", e))?, - }; - Ok(Some(dir)) - } - } - } - pub fn file_name_prefix(&self, args: &RibArgs, rib_ts: &[i64]) -> Result { let base = if rib_ts.len() == 1 { format!( @@ -367,19 +334,6 @@ impl<'a> RibLens<'a> { } } - pub fn single_snapshot_file_name( - &self, - args: &RibArgs, - rib_ts: i64, - output_type: RibOutputType, - ) -> Result { - let prefix = self.file_name_prefix(args, &[rib_ts])?; - let ext = match output_type { - RibOutputType::Sqlite => "sqlite3", - }; - Ok(format!("{}.{}", prefix, ext)) - } - fn resolve_country_asns( &self, country: Option<&str>, @@ -1071,8 +1025,7 @@ mod tests { rib_ts: vec!["2025-09-01T12:00:00Z".to_string()], ..Default::default() }, - output_type: None, - output_dir: None, + sqlite_path: None, } } @@ -1087,7 +1040,7 @@ mod tests { fn test_validate_multi_ts_file_output_ok() -> Result<()> { let mut args = base_args(); args.filters.rib_ts.push("2025-09-01T13:00:00Z".to_string()); - args.output_type = Some(RibOutputType::Sqlite); + args.sqlite_path = Some(PathBuf::from("/tmp/monocle-rib.sqlite3")); let values = args.validate()?; assert_eq!(values.len(), 2); Ok(()) @@ -1122,7 +1075,7 @@ mod tests { } #[test] - fn test_single_snapshot_file_name_includes_filters() -> Result<()> { + fn test_file_name_prefix_includes_filters() -> Result<()> { let mut args = base_args(); args.filters.country = Some("US".to_string()); args.filters.origin_asn = vec!["13335".to_string()]; @@ -1131,8 +1084,10 @@ mod tests { let db = 
MonocleDatabase::open_in_memory()?; let config = MonocleConfig::default(); let lens = RibLens::new(&db, &config); - let file_name = - lens.single_snapshot_file_name(&args, 1_756_728_000, RibOutputType::Sqlite)?; + let file_name = format!( + "{}.sqlite3", + lens.file_name_prefix(&args, &[1_756_728_000])? + ); assert_eq!( file_name, From b4455ecacbf12de730dbde008110cbe0f252d88b Mon Sep 17 00:00:00 2001 From: Mingwei Zhang Date: Sun, 15 Mar 2026 14:48:54 -0700 Subject: [PATCH 3/5] fix: use positional rib timestamps - move rib timestamps from repeated --ts flags to positional operands - keep --sqlite-path for SQLite output and multi-timestamp runs - update README, changelog, and examples for the revised rib CLI --- CHANGELOG.md | 2 +- README.md | 24 +++++++++++++----------- src/lens/rib/mod.rs | 24 ++++++++++++------------ 3 files changed, 26 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c2bc973..d6a45d1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ All notable changes to this project will be documented in this file. 
* Added `monocle rib` for reconstructing RIB state at arbitrary timestamps * Selects the latest RIB before each requested `rib_ts` and replays updates to the exact timestamp * Supports stdout output by default and SQLite output via `--sqlite-path` - * Repeated `--ts` values require `--sqlite-path` and are written to one merged SQLite file keyed by `rib_ts` + * Repeated timestamp operands require `--sqlite-path` and are written to one merged SQLite file keyed by `rib_ts` * Aborts when no RIB exists at or before a requested `rib_ts` for a selected collector * Supports `--country`, `--origin-asn`, `--prefix`, `--as-path`, `--peer-asn`, `--collector`, `--project`, and `--full-feed-only` diff --git a/README.md b/README.md index 62eecfd..1bf9ea5 100644 --- a/README.md +++ b/README.md @@ -712,18 +712,20 @@ Reconstruct final RIB state at one or more arbitrary timestamps by loading the l ➜ monocle rib --help Reconstruct final RIB state at one or more arbitrary timestamps -Usage: monocle rib [OPTIONS] --ts +Usage: monocle rib [OPTIONS] ... + +Arguments: + ... Target RIB timestamp operand. Repeat to request multiple snapshots Options: - --ts Target RIB timestamp. Repeat to request multiple snapshots - --debug Print debug information -o, --origin-asn Filter by origin AS Number(s), comma-separated. Prefix with ! to exclude -C, --country Filter by origin ASN registration country + --debug Print debug information --format Output format: table, markdown, json, json-pretty, json-line, psv (default varies by command) - --json Output as JSON objects (shortcut for --format json-pretty) -p, --prefix Filter by network prefix(es), comma-separated. Prefix with ! 
to exclude - --no-update Disable automatic database updates (use existing cached data only) + --json Output as JSON objects (shortcut for --format json-pretty) -s, --include-super Include super-prefixes when filtering + --no-update Disable automatic database updates (use existing cached data only) -S, --include-sub Include sub-prefixes when filtering -J, --peer-asn Filter by peer ASN(s), comma-separated. Prefix with ! to exclude -a, --as-path Filter by AS path regex string @@ -737,8 +739,8 @@ Options: Behavior: -- A single `--ts` writes to stdout by default. -- Repeated `--ts` values require `--sqlite-path` and are written to one merged SQLite file keyed by `rib_ts`. +- A single timestamp operand writes to stdout by default. +- Repeated timestamp operands require `--sqlite-path` and are written to one merged SQLite file keyed by `rib_ts`. - Providing `--sqlite-path` writes the reconstructed results to that SQLite file instead of stdout. - If any selected collector has no RIB at or before a requested `rib_ts`, the command aborts instead of producing a partial result. - `--country` uses local ASInfo registration data, and `--full-feed-only` keeps only peers with at least 800k IPv4 prefixes or 100k IPv6 prefixes in broker peer metadata. 
@@ -747,19 +749,19 @@ Examples: ```bash # Print the reconstructed RIB for a single timestamp to stdout -monocle rib --ts 2025-09-01T12:00:00Z -c rrc00 -o 13335 +monocle rib 2025-09-01T12:00:00Z -c rrc00 -o 13335 # Write multiple timestamps to one merged SQLite file in the current directory monocle rib \ - --ts 2025-09-01T12:00:00Z \ - --ts 2025-09-01T18:00:00Z \ + 2025-09-01T12:00:00Z \ + 2025-09-01T18:00:00Z \ --sqlite-path /tmp/rrc00-us.sqlite3 \ -c rrc00 \ --country US \ --full-feed-only # Write a single reconstructed snapshot to SQLite -monocle rib --ts 2025-09-01T12:00:00Z --sqlite-path /tmp/route-views2.sqlite3 -c route-views2 +monocle rib 2025-09-01T12:00:00Z --sqlite-path /tmp/route-views2.sqlite3 -c route-views2 ``` ### `monocle time` diff --git a/src/lens/rib/mod.rs b/src/lens/rib/mod.rs index 11f7b83..c58f314 100644 --- a/src/lens/rib/mod.rs +++ b/src/lens/rib/mod.rs @@ -35,11 +35,6 @@ type FullFeedAllowlists = HashMap>; #[derive(Debug, Clone, Default, Serialize, Deserialize)] #[cfg_attr(feature = "cli", derive(Args))] pub struct RibFilters { - /// Target RIB timestamp. Repeat to request multiple snapshots. - #[cfg_attr(feature = "cli", clap(long = "ts", required = true))] - #[serde(default)] - pub rib_ts: Vec, - /// Filter by origin AS Number(s), comma-separated. Prefix with ! to exclude. #[cfg_attr(feature = "cli", clap(short = 'o', long, value_delimiter = ','))] #[serde(default)] @@ -90,6 +85,11 @@ pub struct RibFilters { #[derive(Debug, Clone, Default, Serialize, Deserialize)] #[cfg_attr(feature = "cli", derive(Args))] pub struct RibArgs { + /// Target RIB timestamp operand. Repeat to request multiple snapshots. 
+ #[cfg_attr(feature = "cli", clap(value_name = "RIB_TS", required = true))] + #[serde(default)] + pub rib_ts: Vec, + #[cfg_attr(feature = "cli", clap(flatten))] #[serde(flatten)] pub filters: RibFilters, @@ -104,16 +104,16 @@ impl RibArgs { let time_lens = TimeLens::new(); let mut timestamps = BTreeSet::new(); - for value in &self.filters.rib_ts { + for value in &self.rib_ts { let ts = time_lens .parse_time_string(value) - .map_err(|e| anyhow!("Invalid --ts value '{}': {}", value, e))? + .map_err(|e| anyhow!("Invalid RIB timestamp '{}': {}", value, e))? .timestamp(); timestamps.insert(ts); } if timestamps.is_empty() { - return Err(anyhow!("At least one --ts value is required")); + return Err(anyhow!("At least one RIB timestamp is required")); } Ok(timestamps.into_iter().collect()) @@ -139,7 +139,7 @@ impl RibArgs { } if normalized_ts.len() > 1 && self.sqlite_path.is_none() { - return Err(anyhow!("Multiple --ts values require --sqlite-path.")); + return Err(anyhow!("Multiple RIB timestamps require --sqlite-path.")); } Ok(normalized_ts) @@ -1021,8 +1021,8 @@ mod tests { fn base_args() -> RibArgs { RibArgs { + rib_ts: vec!["2025-09-01T12:00:00Z".to_string()], filters: RibFilters { - rib_ts: vec!["2025-09-01T12:00:00Z".to_string()], ..Default::default() }, sqlite_path: None, @@ -1032,14 +1032,14 @@ mod tests { #[test] fn test_validate_multi_ts_stdout_error() { let mut args = base_args(); - args.filters.rib_ts.push("2025-09-01T13:00:00Z".to_string()); + args.rib_ts.push("2025-09-01T13:00:00Z".to_string()); assert!(args.validate().is_err()); } #[test] fn test_validate_multi_ts_file_output_ok() -> Result<()> { let mut args = base_args(); - args.filters.rib_ts.push("2025-09-01T13:00:00Z".to_string()); + args.rib_ts.push("2025-09-01T13:00:00Z".to_string()); args.sqlite_path = Some(PathBuf::from("/tmp/monocle-rib.sqlite3")); let values = args.validate()?; assert_eq!(values.len(), 2); From 1903f884a63efa55c1c396cc7bcaed49d8f7a2f1 Mon Sep 17 00:00:00 2001 From: Mingwei Zhang 
Date: Mon, 16 Mar 2026 07:24:54 -0700 Subject: [PATCH 4/5] perf: keep rib replay state in memory --- CHANGELOG.md | 6 +- README.md | 2 + src/bin/commands/rib.rs | 137 +++++++-- src/database/mod.rs | 5 +- src/database/session/mod.rs | 5 +- src/database/session/rib_store.rs | 456 ++++++++---------------------- src/lens/rib/mod.rs | 78 +---- 7 files changed, 251 insertions(+), 438 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d6a45d1..9e685a5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,7 +15,11 @@ All notable changes to this project will be documented in this file. ### Code Improvements -* Added session-backed SQLite stores for reconstructed RIB working state and merged SQLite export +* Added a session-backed SQLite store for merged reconstructed RIB export +* Updated `monocle rib` reconstruction to keep the working RIB state in memory + * Removes SQLite lookups and writes from the replay hot path + * Keeps `path_id` only for internal route identity during add-path reconstruction + * Narrows reconstructed RIB entries and SQLite export rows to collector, timestamp, peer_ip, peer_asn, prefix, as_path, and origin_asns ## v1.2.0 - 2026-02-28 diff --git a/README.md b/README.md index 1bf9ea5..f2ee5be 100644 --- a/README.md +++ b/README.md @@ -740,6 +740,8 @@ Options: Behavior: - A single timestamp operand writes to stdout by default. +- Stdout output is the reconstructed final route set, not an MRT/table-dump export. +- Stdout follows the normal streaming formatter, so the default is `psv` unless `--format` or `--json` is provided. - Repeated timestamp operands require `--sqlite-path` and are written to one merged SQLite file keyed by `rib_ts`. - Providing `--sqlite-path` writes the reconstructed results to that SQLite file instead of stdout. - If any selected collector has no RIB at or before a requested `rib_ts`, the command aborts instead of producing a partial result. 
diff --git a/src/bin/commands/rib.rs b/src/bin/commands/rib.rs index b1c2b5b..a7000aa 100644 --- a/src/bin/commands/rib.rs +++ b/src/bin/commands/rib.rs @@ -1,37 +1,28 @@ use std::fs; -use std::io::Write; +use std::io::{BufWriter, Write}; use std::path::Path; use anyhow::{anyhow, Result}; -use bgpkit_parser::BgpElem; - -use monocle::database::{MonocleDatabase, RibSqliteStore}; +use monocle::database::{MonocleDatabase, RibSqliteStore, StoredRibEntry}; use monocle::lens::rib::RibLens; use monocle::utils::{OutputFormat, TimestampFormat}; use monocle::MonocleConfig; +use serde_json::json; +use tabled::builder::Builder; +use tabled::settings::Style; -use super::elem_format::{format_elem, format_elems_table, get_header}; +use super::elem_format::get_header; pub use monocle::lens::rib::RibArgs; const DEFAULT_FIELDS_RIB: &[&str] = &[ - "type", + "collector", "timestamp", "peer_ip", "peer_asn", "prefix", - "path_id", "as_path", "origin_asns", - "origin", - "next_hop", - "local_pref", - "med", - "communities", - "atomic", - "aggr_asn", - "aggr_ip", - "collector", ]; pub fn run(config: &MonocleConfig, args: RibArgs, output_format: OutputFormat, no_update: bool) { @@ -65,22 +56,23 @@ fn run_stdout( output_format: OutputFormat, no_update: bool, ) -> Result<()> { - let mut stdout = std::io::stdout(); + let stdout = std::io::stdout(); + let mut stdout = BufWriter::new(stdout.lock()); if output_format == OutputFormat::Table { - let mut elems = Vec::<(BgpElem, Option)>::new(); + let mut entries = Vec::::new(); lens.reconstruct_snapshots(args, no_update, |_rib_ts, state_store| { state_store.visit_entries(|entry| { - elems.push((entry.elem, Some(entry.collector))); + entries.push(entry.clone()); Ok(()) }) })?; - if !elems.is_empty() { + if !entries.is_empty() { writeln!( stdout, "{}", - format_elems_table(&elems, DEFAULT_FIELDS_RIB, TimestampFormat::Unix) + format_entries_table(&entries, DEFAULT_FIELDS_RIB) ) .map_err(|e| anyhow!("Failed to write table output: {}", e))?; } @@ -98,13 
+90,7 @@ fn run_stdout( } state_store.visit_entries(|entry| { - if let Some(line) = format_elem( - &entry.elem, - output_format, - DEFAULT_FIELDS_RIB, - Some(entry.collector.as_str()), - TimestampFormat::Unix, - ) { + if let Some(line) = format_entry(entry, output_format, DEFAULT_FIELDS_RIB) { writeln!(stdout, "{}", line) .map_err(|e| anyhow!("Failed to write reconstructed RIB row: {}", e))?; } @@ -124,10 +110,11 @@ fn run_sqlite_output(lens: &RibLens<'_>, args: &RibArgs, no_update: bool) -> Res remove_existing_file(output_path)?; - let sqlite_store = RibSqliteStore::new(path_to_str(output_path)?, true)?; + let mut sqlite_store = RibSqliteStore::new(path_to_str(output_path)?, true)?; let summary = lens.reconstruct_snapshots(args, no_update, |rib_ts, state_store| { - state_store.visit_entries(|entry| sqlite_store.insert_entry(rib_ts, &entry)) + sqlite_store.insert_snapshot(rib_ts, state_store) })?; + sqlite_store.finalize_indexes()?; eprintln!( "wrote {} reconstructed RIB snapshot(s) to {}", @@ -137,6 +124,96 @@ fn run_sqlite_output(lens: &RibLens<'_>, args: &RibArgs, no_update: bool) -> Res Ok(()) } +fn format_entries_table(entries: &[StoredRibEntry], fields: &[&str]) -> String { + let mut builder = Builder::default(); + builder.push_record(fields.iter().copied()); + + for entry in entries { + let row = fields + .iter() + .map(|field| entry_field_value(entry, field)) + .collect::>(); + builder.push_record(row); + } + + let mut table = builder.build(); + table.with(Style::rounded()); + table.to_string() +} + +fn format_entry( + entry: &StoredRibEntry, + output_format: OutputFormat, + fields: &[&str], +) -> Option { + match output_format { + OutputFormat::Json | OutputFormat::JsonLine => { + Some(serde_json::to_string(&build_json_object(entry, fields)).unwrap_or_default()) + } + OutputFormat::JsonPretty => Some( + serde_json::to_string_pretty(&build_json_object(entry, fields)).unwrap_or_default(), + ), + OutputFormat::Psv => Some( + fields + .iter() + .map(|field| 
entry_field_value(entry, field)) + .collect::>() + .join("|"), + ), + OutputFormat::Table => None, + OutputFormat::Markdown => Some(format!( + "| {} |", + fields + .iter() + .map(|field| entry_field_value(entry, field)) + .collect::>() + .join(" | ") + )), + } +} + +fn build_json_object(entry: &StoredRibEntry, fields: &[&str]) -> serde_json::Value { + let mut obj = serde_json::Map::new(); + + for field in fields { + let value = match *field { + "collector" => json!(entry.collector), + "timestamp" => json!(entry.timestamp), + "peer_ip" => json!(entry.peer_ip.to_string()), + "peer_asn" => json!(entry.peer_asn), + "prefix" => json!(entry.prefix), + "as_path" => entry + .as_path + .as_ref() + .map_or(serde_json::Value::Null, |value| json!(value)), + "origin_asns" => entry + .origin_asns + .as_ref() + .map_or(serde_json::Value::Null, |values| { + json!(values.iter().map(u32::to_string).collect::>()) + }), + _ => serde_json::Value::Null, + }; + + obj.insert((*field).to_string(), value); + } + + serde_json::Value::Object(obj) +} + +fn entry_field_value(entry: &StoredRibEntry, field: &str) -> String { + match field { + "collector" => entry.collector.clone(), + "timestamp" => TimestampFormat::Unix.format_timestamp(entry.timestamp), + "peer_ip" => entry.peer_ip.to_string(), + "peer_asn" => entry.peer_asn.to_string(), + "prefix" => entry.prefix.clone(), + "as_path" => entry.as_path.clone().unwrap_or_default(), + "origin_asns" => entry.origin_asns_string().unwrap_or_default(), + _ => String::new(), + } +} + fn remove_existing_file(path: &Path) -> Result<()> { match fs::remove_file(path) { Ok(()) => Ok(()), diff --git a/src/database/mod.rs b/src/database/mod.rs index 8825d19..b8ccf6c 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -126,10 +126,7 @@ pub use monocle::{ #[cfg(feature = "lib")] pub use session::MsgStore; #[cfg(feature = "lib")] -pub use session::{ - elem_matches_stored_json, path_id_for_key, path_id_from_key, RibRouteKey, RibSqliteStore, - 
RibStateStore, StoredRibEntry, -}; +pub use session::{RibRouteKey, RibSqliteStore, RibStateStore, StoredRibEntry}; // ============================================================================= // Helper function diff --git a/src/database/session/mod.rs b/src/database/session/mod.rs index f62c246..0427fae 100644 --- a/src/database/session/mod.rs +++ b/src/database/session/mod.rs @@ -21,7 +21,4 @@ mod rib_store; #[cfg(feature = "lib")] pub use msg_store::MsgStore; #[cfg(feature = "lib")] -pub use rib_store::{ - elem_matches_stored_json, path_id_for_key, path_id_from_key, RibRouteKey, RibSqliteStore, - RibStateStore, StoredRibEntry, -}; +pub use rib_store::{RibRouteKey, RibSqliteStore, RibStateStore, StoredRibEntry}; diff --git a/src/database/session/rib_store.rs b/src/database/session/rib_store.rs index ec11255..effb7f3 100644 --- a/src/database/session/rib_store.rs +++ b/src/database/session/rib_store.rs @@ -1,65 +1,18 @@ -//! Session-based SQLite stores for reconstructed RIB snapshots. -//! -//! These stores are separate from `MsgStore` because RIB reconstruction needs: -//! - route-identity keys with `path_id` -//! - exact `BgpElem` round-tripping for reconstructed RIB state -//! - merged SQLite output keyed by `rib_ts` +//! Working-state storage and SQLite export for reconstructed RIB snapshots. 
+ +use std::collections::HashMap; +use std::net::IpAddr; use anyhow::{anyhow, Result}; use bgpkit_parser::BgpElem; -use rusqlite::{params, OptionalExtension}; -use serde_json::Value; -use tempfile::{NamedTempFile, TempPath}; +use rusqlite::params; use crate::database::core::DatabaseConn; -fn opt_to_sql_i64(v: Option) -> i64 { - v.map(i64::from).unwrap_or(-1) -} - -fn sql_i64_to_opt(v: i64) -> Option { - if v < 0 { - None - } else { - u32::try_from(v).ok() - } -} - -fn elem_as_path(elem: &BgpElem) -> Option { - elem.as_path.as_ref().map(|path| path.to_string()) -} - -fn elem_origin_asns(elem: &BgpElem) -> Option { - elem.origin_asns.as_ref().map(|asns| { - asns.iter() - .map(|asn| asn.to_string()) - .collect::>() - .join(" ") - }) -} - -fn elem_next_hop(elem: &BgpElem) -> Option { - elem.next_hop.as_ref().map(|hop| hop.to_string()) -} - -fn elem_communities(elem: &BgpElem) -> Option { - elem.communities.as_ref().map(|communities| { - communities - .iter() - .map(|community| community.to_string()) - .collect::>() - .join(" ") - }) -} - -fn elem_origin(elem: &BgpElem) -> Option { - elem.origin.as_ref().map(|origin| origin.to_string()) -} - #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct RibRouteKey { pub collector: String, - pub peer_ip: String, + pub peer_ip: IpAddr, pub peer_asn: u32, pub prefix: String, pub path_id: Option, @@ -69,265 +22,130 @@ impl RibRouteKey { pub fn from_elem(collector: &str, elem: &BgpElem) -> Self { Self { collector: collector.to_string(), - peer_ip: elem.peer_ip.to_string(), + peer_ip: elem.peer_ip, peer_asn: elem.peer_asn.to_u32(), prefix: elem.prefix.prefix.to_string(), path_id: elem.prefix.path_id, } } + + pub fn from_entry(entry: &StoredRibEntry) -> Self { + Self { + collector: entry.collector.clone(), + peer_ip: entry.peer_ip, + peer_asn: entry.peer_asn, + prefix: entry.prefix.clone(), + path_id: entry.path_id, + } + } } #[derive(Debug, Clone)] pub struct StoredRibEntry { pub collector: String, - pub elem: BgpElem, + pub 
timestamp: f64, + pub peer_ip: IpAddr, + pub peer_asn: u32, + pub prefix: String, + pub path_id: Option, + pub as_path: Option, + pub origin_asns: Option>, } impl StoredRibEntry { - pub fn new(collector: impl Into, elem: BgpElem) -> Self { + pub fn from_elem(collector: &str, elem: BgpElem) -> Self { Self { - collector: collector.into(), - elem, + collector: collector.to_string(), + timestamp: elem.timestamp, + peer_ip: elem.peer_ip, + peer_asn: elem.peer_asn.to_u32(), + prefix: elem.prefix.prefix.to_string(), + path_id: elem.prefix.path_id, + as_path: elem.as_path.map(|path| path.to_string()), + origin_asns: elem + .origin_asns + .map(|asns| asns.into_iter().map(|asn| asn.to_u32()).collect::>()), } } pub fn route_key(&self) -> RibRouteKey { - RibRouteKey::from_elem(&self.collector, &self.elem) - } - - fn elem_json(&self) -> Result { - serde_json::to_string(&self.elem) - .map_err(|e| anyhow!("Failed to serialize BgpElem for SQLite storage: {}", e)) + RibRouteKey::from_entry(self) } - fn from_row(row: &rusqlite::Row<'_>) -> Result { - let collector: String = row - .get("collector") - .map_err(|e| anyhow!("Failed to read collector column: {}", e))?; - let elem_json: String = row - .get("elem_json") - .map_err(|e| anyhow!("Failed to read elem_json column: {}", e))?; - let elem = serde_json::from_str::(&elem_json) - .map_err(|e| anyhow!("Failed to deserialize stored BgpElem JSON: {}", e))?; - Ok(Self { collector, elem }) + pub fn origin_asns_string(&self) -> Option { + self.origin_asns.as_ref().map(|asns| { + asns.iter() + .map(u32::to_string) + .collect::>() + .join(" ") + }) } } pub struct RibStateStore { - db: DatabaseConn, - _temp_path: Option, + entries: HashMap, } impl RibStateStore { - pub fn new(db_path: Option<&str>, reset: bool) -> Result { - let db = DatabaseConn::open(db_path)?; - let store = Self { - db, - _temp_path: None, - }; - store.initialize(reset)?; - Ok(store) - } - pub fn new_temp() -> Result { - let file = NamedTempFile::new().map_err(|e| { - 
anyhow!( - "Failed to create temporary SQLite path for rib state: {}", - e - ) - })?; - let temp_path = file.into_temp_path(); - let db = DatabaseConn::open_path( - temp_path - .to_str() - .ok_or_else(|| anyhow!("Temporary rib state path contains invalid UTF-8"))?, - )?; - let store = Self { - db, - _temp_path: Some(temp_path), - }; - store.initialize(true)?; - Ok(store) - } - - fn initialize(&self, reset: bool) -> Result<()> { - if reset { - self.db - .conn - .execute("DROP TABLE IF EXISTS rib_state", []) - .map_err(|e| anyhow!("Failed to drop rib_state table: {}", e))?; - } - - self.db - .conn - .execute_batch( - r#" - CREATE TABLE IF NOT EXISTS rib_state ( - collector TEXT NOT NULL, - peer_ip TEXT NOT NULL, - peer_asn INTEGER NOT NULL, - prefix TEXT NOT NULL, - path_id INTEGER NOT NULL, - timestamp REAL NOT NULL, - as_path TEXT, - origin_asns TEXT, - origin TEXT, - next_hop TEXT, - local_pref INTEGER, - med INTEGER, - communities TEXT, - atomic INTEGER NOT NULL, - aggr_asn INTEGER, - aggr_ip TEXT, - elem_json TEXT NOT NULL, - PRIMARY KEY (collector, peer_ip, peer_asn, prefix, path_id) - ); - CREATE INDEX IF NOT EXISTS idx_rib_state_collector ON rib_state(collector); - CREATE INDEX IF NOT EXISTS idx_rib_state_peer_asn ON rib_state(peer_asn); - CREATE INDEX IF NOT EXISTS idx_rib_state_prefix ON rib_state(prefix); - "#, - ) - .map_err(|e| anyhow!("Failed to initialize rib_state schema: {}", e))?; - Ok(()) + Ok(Self { + entries: HashMap::new(), + }) } pub fn count(&self) -> Result { - self.db.table_count("rib_state") + Ok(self.entries.len() as u64) } pub fn route_exists(&self, key: &RibRouteKey) -> Result { - let exists = self - .db - .conn - .query_row( - "SELECT 1 FROM rib_state WHERE collector = ?1 AND peer_ip = ?2 AND peer_asn = ?3 AND prefix = ?4 AND path_id = ?5", - params![ - key.collector, - key.peer_ip, - key.peer_asn, - key.prefix, - opt_to_sql_i64(key.path_id), - ], - |_| Ok(()), - ) - .optional() - .map_err(|e| anyhow!("Failed to test route existence in 
rib_state: {}", e))?; - Ok(exists.is_some()) + Ok(self.entries.contains_key(key)) } - pub fn upsert_entry(&self, entry: &StoredRibEntry) -> Result<()> { - self.upsert_entries(std::slice::from_ref(entry)) + pub fn upsert_entry(&mut self, entry: StoredRibEntry) -> Result<()> { + self.upsert_entries(vec![entry]) } - pub fn upsert_entries(&self, entries: &[StoredRibEntry]) -> Result<()> { - if entries.is_empty() { - return Ok(()); - } - - let tx = self - .db - .conn - .unchecked_transaction() - .map_err(|e| anyhow!("Failed to begin rib_state transaction: {}", e))?; - let mut stmt = tx - .prepare_cached( - r#" - INSERT OR REPLACE INTO rib_state ( - collector, peer_ip, peer_asn, prefix, path_id, timestamp, - as_path, origin_asns, origin, next_hop, local_pref, med, - communities, atomic, aggr_asn, aggr_ip, elem_json - ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17) - "#, - ) - .map_err(|e| anyhow!("Failed to prepare rib_state upsert statement: {}", e))?; - + pub fn upsert_entries(&mut self, entries: I) -> Result<()> + where + I: IntoIterator, + { for entry in entries { - stmt.execute(params![ - entry.collector, - entry.elem.peer_ip.to_string(), - entry.elem.peer_asn.to_u32(), - entry.elem.prefix.prefix.to_string(), - opt_to_sql_i64(entry.elem.prefix.path_id), - entry.elem.timestamp, - elem_as_path(&entry.elem), - elem_origin_asns(&entry.elem), - elem_origin(&entry.elem), - elem_next_hop(&entry.elem), - entry.elem.local_pref, - entry.elem.med, - elem_communities(&entry.elem), - if entry.elem.atomic { 1_i64 } else { 0_i64 }, - entry.elem.aggr_asn.map(|asn| asn.to_u32()), - entry.elem.aggr_ip.as_ref().map(|ip| ip.to_string()), - entry.elem_json()?, - ]) - .map_err(|e| anyhow!("Failed to upsert entry into rib_state: {}", e))?; + self.entries.insert(entry.route_key(), entry); } - - drop(stmt); - tx.commit() - .map_err(|e| anyhow!("Failed to commit rib_state upserts: {}", e))?; Ok(()) } - pub fn delete_key(&self, key: &RibRouteKey) -> 
Result<()> { - self.delete_keys(std::slice::from_ref(key)) + pub fn delete_key(&mut self, key: &RibRouteKey) -> Result<()> { + self.entries.remove(key); + Ok(()) } - pub fn delete_keys(&self, keys: &[RibRouteKey]) -> Result<()> { - if keys.is_empty() { - return Ok(()); - } - - let tx = self - .db - .conn - .unchecked_transaction() - .map_err(|e| anyhow!("Failed to begin rib_state delete transaction: {}", e))?; - let mut stmt = tx - .prepare_cached( - "DELETE FROM rib_state WHERE collector = ?1 AND peer_ip = ?2 AND peer_asn = ?3 AND prefix = ?4 AND path_id = ?5", - ) - .map_err(|e| anyhow!("Failed to prepare rib_state delete statement: {}", e))?; - + pub fn delete_keys(&mut self, keys: I) -> Result<()> + where + I: IntoIterator, + { for key in keys { - stmt.execute(params![ - key.collector, - key.peer_ip, - key.peer_asn, - key.prefix, - opt_to_sql_i64(key.path_id), - ]) - .map_err(|e| anyhow!("Failed to delete entry from rib_state: {}", e))?; + self.entries.remove(&key); } - - drop(stmt); - tx.commit() - .map_err(|e| anyhow!("Failed to commit rib_state deletes: {}", e))?; Ok(()) } pub fn visit_entries(&self, mut visitor: F) -> Result<()> where - F: FnMut(StoredRibEntry) -> Result<()>, + F: FnMut(&StoredRibEntry) -> Result<()>, { - let mut stmt = self - .db - .conn - .prepare( - "SELECT collector, elem_json FROM rib_state ORDER BY collector, peer_asn, peer_ip, prefix, path_id", - ) - .map_err(|e| anyhow!("Failed to prepare rib_state scan statement: {}", e))?; - - let mut rows = stmt - .query([]) - .map_err(|e| anyhow!("Failed to query rib_state rows: {}", e))?; + let mut entries = self.entries.values().collect::>(); + entries.sort_by(|a, b| { + a.collector + .cmp(&b.collector) + .then(a.peer_asn.cmp(&b.peer_asn)) + .then(a.peer_ip.to_string().cmp(&b.peer_ip.to_string())) + .then(a.prefix.cmp(&b.prefix)) + .then(a.path_id.cmp(&b.path_id)) + }); - while let Some(row) = rows - .next() - .map_err(|e| anyhow!("Failed to iterate rib_state rows: {}", e))? 
- { - visitor(StoredRibEntry::from_row(row)?)?; + for entry in entries { + visitor(entry)?; } Ok(()) @@ -365,76 +183,66 @@ impl RibSqliteStore { peer_ip TEXT NOT NULL, peer_asn INTEGER NOT NULL, prefix TEXT NOT NULL, - path_id INTEGER NOT NULL, as_path TEXT, - origin_asns TEXT, - origin TEXT, - next_hop TEXT, - local_pref INTEGER, - med INTEGER, - communities TEXT, - atomic INTEGER NOT NULL, - aggr_asn INTEGER, - aggr_ip TEXT + origin_asns TEXT ); - CREATE INDEX IF NOT EXISTS idx_rib_output_rib_ts ON elems(rib_ts); - CREATE INDEX IF NOT EXISTS idx_rib_output_rib_ts_prefix ON elems(rib_ts, prefix); - CREATE INDEX IF NOT EXISTS idx_rib_output_rib_ts_peer_asn ON elems(rib_ts, peer_asn); - CREATE INDEX IF NOT EXISTS idx_rib_output_rib_ts_collector ON elems(rib_ts, collector); "#, ) .map_err(|e| anyhow!("Failed to initialize rib output SQLite schema: {}", e))?; Ok(()) } - pub fn insert_entry(&self, rib_ts: i64, entry: &StoredRibEntry) -> Result<()> { - self.db + pub fn insert_snapshot(&mut self, rib_ts: i64, state_store: &RibStateStore) -> Result<()> { + let tx = self + .db .conn - .execute( + .unchecked_transaction() + .map_err(|e| anyhow!("Failed to begin rib output transaction: {}", e))?; + let mut stmt = tx + .prepare_cached( r#" INSERT INTO elems ( - rib_ts, timestamp, collector, peer_ip, peer_asn, prefix, path_id, - as_path, origin_asns, origin, next_hop, local_pref, med, - communities, atomic, aggr_asn, aggr_ip - ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17) + rib_ts, timestamp, collector, peer_ip, peer_asn, prefix, as_path, origin_asns + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) "#, - params![ - rib_ts, - entry.elem.timestamp, - entry.collector, - entry.elem.peer_ip.to_string(), - entry.elem.peer_asn.to_u32(), - entry.elem.prefix.prefix.to_string(), - opt_to_sql_i64(entry.elem.prefix.path_id), - elem_as_path(&entry.elem), - elem_origin_asns(&entry.elem), - elem_origin(&entry.elem), - elem_next_hop(&entry.elem), - 
entry.elem.local_pref, - entry.elem.med, - elem_communities(&entry.elem), - if entry.elem.atomic { 1_i64 } else { 0_i64 }, - entry.elem.aggr_asn.map(|asn| asn.to_u32()), - entry.elem.aggr_ip.as_ref().map(|ip| ip.to_string()), - ], ) + .map_err(|e| anyhow!("Failed to prepare rib output insert statement: {}", e))?; + + state_store.visit_entries(|entry| { + stmt.execute(params![ + rib_ts, + entry.timestamp, + entry.collector, + entry.peer_ip.to_string(), + entry.peer_asn, + entry.prefix, + entry.as_path, + entry.origin_asns_string(), + ]) .map_err(|e| anyhow!("Failed to insert entry into rib output SQLite store: {}", e))?; + Ok(()) + })?; + + drop(stmt); + tx.commit() + .map_err(|e| anyhow!("Failed to commit rib output inserts: {}", e))?; Ok(()) } -} -pub fn elem_matches_stored_json(elem_json: &str, key: &str) -> Result> { - let value = serde_json::from_str::(elem_json) - .map_err(|e| anyhow!("Failed to deserialize stored elem_json value: {}", e))?; - Ok(value.get(key).cloned()) -} - -pub fn path_id_for_key(path_id: Option) -> i64 { - opt_to_sql_i64(path_id) -} - -pub fn path_id_from_key(path_id: i64) -> Option { - sql_i64_to_opt(path_id) + pub fn finalize_indexes(&self) -> Result<()> { + self.db + .conn + .execute_batch( + r#" + CREATE INDEX IF NOT EXISTS idx_rib_output_rib_ts ON elems(rib_ts); + CREATE INDEX IF NOT EXISTS idx_rib_output_rib_ts_prefix ON elems(rib_ts, prefix); + CREATE INDEX IF NOT EXISTS idx_rib_output_rib_ts_peer_asn ON elems(rib_ts, peer_asn); + CREATE INDEX IF NOT EXISTS idx_rib_output_rib_ts_collector ON elems(rib_ts, collector); + "#, + ) + .map_err(|e| anyhow!("Failed to create rib output SQLite indexes: {}", e))?; + Ok(()) + } } #[cfg(test)] @@ -470,36 +278,20 @@ mod tests { #[test] fn test_rib_state_store_round_trip() -> Result<()> { - let store = RibStateStore::new_temp()?; - let entry = StoredRibEntry::new("rrc00", test_elem()?); - store.upsert_entry(&entry)?; + let mut store = RibStateStore::new_temp()?; + let entry = 
StoredRibEntry::from_elem("rrc00", test_elem()?); + store.upsert_entry(entry.clone())?; assert!(store.route_exists(&entry.route_key())?); let mut visited = Vec::new(); store.visit_entries(|entry| { - visited.push(entry); + visited.push(entry.clone()); Ok(()) })?; assert_eq!(visited.len(), 1); assert_eq!(visited[0].collector, "rrc00"); - assert_eq!(visited[0].elem.prefix.path_id, Some(7)); - Ok(()) - } - - #[test] - fn test_path_id_helpers() { - assert_eq!(path_id_for_key(None), -1); - assert_eq!(path_id_from_key(-1), None); - assert_eq!(path_id_from_key(42), Some(42)); - } - - #[test] - fn test_elem_json_access() -> Result<()> { - let elem = test_elem()?; - let entry = StoredRibEntry::new("rrc00", elem); - let origin_asns = elem_matches_stored_json(&entry.elem_json()?, "origin_asns")?; - assert!(origin_asns.is_some()); + assert_eq!(visited[0].path_id, Some(7)); Ok(()) } } diff --git a/src/lens/rib/mod.rs b/src/lens/rib/mod.rs index c58f314..e6254cf 100644 --- a/src/lens/rib/mod.rs +++ b/src/lens/rib/mod.rs @@ -146,62 +146,6 @@ impl RibArgs { } } -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct RibRow { - pub rib_ts: i64, - pub timestamp: f64, - pub collector: String, - pub peer_ip: String, - pub peer_asn: u32, - pub prefix: String, - pub path_id: Option, - pub as_path: Option, - pub origin_asns: Option, - pub origin: Option, - pub next_hop: Option, - pub local_pref: Option, - pub med: Option, - pub communities: Option, - pub atomic: bool, - pub aggr_asn: Option, - pub aggr_ip: Option, -} - -impl RibRow { - pub fn from_entry(rib_ts: i64, entry: &StoredRibEntry) -> Self { - Self { - rib_ts, - timestamp: entry.elem.timestamp, - collector: entry.collector.clone(), - peer_ip: entry.elem.peer_ip.to_string(), - peer_asn: entry.elem.peer_asn.to_u32(), - prefix: entry.elem.prefix.prefix.to_string(), - path_id: entry.elem.prefix.path_id, - as_path: entry.elem.as_path.as_ref().map(|path| path.to_string()), - origin_asns: entry.elem.origin_asns.as_ref().map(|asns| 
{ - asns.iter() - .map(|asn| asn.to_string()) - .collect::>() - .join(" ") - }), - origin: entry.elem.origin.as_ref().map(|origin| origin.to_string()), - next_hop: entry.elem.next_hop.as_ref().map(|hop| hop.to_string()), - local_pref: entry.elem.local_pref, - med: entry.elem.med, - communities: entry.elem.communities.as_ref().map(|communities| { - communities - .iter() - .map(|community| community.to_string()) - .collect::>() - .join(" ") - }), - atomic: entry.elem.atomic, - aggr_asn: entry.elem.aggr_asn.map(|asn| asn.to_u32()), - aggr_ip: entry.elem.aggr_ip.as_ref().map(|ip| ip.to_string()), - } - } -} - #[derive(Debug, Clone)] pub struct RibRunSummary { pub rib_ts: Vec, @@ -261,7 +205,7 @@ impl<'a> RibLens<'a> { }; for group in &groups { - let state_store = RibStateStore::new_temp()?; + let mut state_store = RibStateStore::new_temp()?; let safe_base_filters = self.safe_parse_filters( args, group.rib_item.ts_start.and_utc().timestamp(), @@ -269,7 +213,7 @@ impl<'a> RibLens<'a> { ); self.load_base_rib( - &state_store, + &mut state_store, &group.collector, &group.rib_item, &safe_base_filters, @@ -280,7 +224,7 @@ impl<'a> RibLens<'a> { )?; self.replay_updates( - &state_store, + &mut state_store, group, args, country_asns.as_ref(), @@ -625,7 +569,7 @@ impl<'a> RibLens<'a> { #[allow(clippy::too_many_arguments)] fn load_base_rib( &self, - state_store: &RibStateStore, + state_store: &mut RibStateStore, collector: &str, rib_item: &BrokerItem, safe_filters: &ParseFilters, @@ -655,18 +599,18 @@ impl<'a> RibLens<'a> { as_path_regex, full_feed_allowlist, ) { - batch.push(StoredRibEntry::new(collector.to_string(), elem)); + batch.push(StoredRibEntry::from_elem(collector, elem)); } } - state_store.upsert_entries(&batch)?; + state_store.upsert_entries(batch)?; Ok(()) } #[allow(clippy::too_many_arguments)] fn replay_updates( &self, - state_store: &RibStateStore, + state_store: &mut RibStateStore, group: &RibReplayGroup, args: &RibArgs, country_asns: Option<&HashSet>, @@ -762,7 
+706,7 @@ impl<'a> RibLens<'a> { if matches { pending.insert( route_key, - DeltaOp::Upsert(StoredRibEntry::new(collector.to_string(), elem)), + DeltaOp::Upsert(StoredRibEntry::from_elem(collector, elem)), ); } else if self.route_exists_in_state_or_delta(&route_key, state_store, pending)? { pending.insert(route_key.clone(), DeltaOp::Delete(route_key)); @@ -787,7 +731,7 @@ impl<'a> RibLens<'a> { fn flush_pending( &self, - state_store: &RibStateStore, + state_store: &mut RibStateStore, pending: &mut HashMap, ) -> Result<()> { if pending.is_empty() { @@ -805,10 +749,10 @@ impl<'a> RibLens<'a> { } if !upserts.is_empty() { - state_store.upsert_entries(&upserts)?; + state_store.upsert_entries(upserts)?; } if !deletes.is_empty() { - state_store.delete_keys(&deletes)?; + state_store.delete_keys(deletes)?; } pending.clear(); From bc922d100bfab4027d8e35e7db5d796a0a6b8701 Mon Sep 17 00:00:00 2001 From: Mingwei Zhang Date: Tue, 24 Mar 2026 14:01:27 -0700 Subject: [PATCH 5/5] feat: redesign RIB SQLite output with ribs and updates tables BREAKING CHANGE: SQLite output schema changed from single 'elems' table to two tables: - 'ribs': stores final reconstructed RIB states at each target timestamp - 'updates': stores filtered BGP updates for 2nd and later RIBs only Performance improvements: - Use Arc for collector and prefix fields to reduce allocations - Remove unnecessary per-snapshot O(n log n) sorting - Reduce updates query window from +2h lookahead to exact target Changes: - Added StoredRibUpdate struct for tracking filtered updates - Updated RibSqliteStore with two-table schema - Modified reconstruct_snapshots callback to include filtered updates - Added comprehensive documentation for new schema --- CHANGELOG.md | 15 ++ README.md | 44 ++++- src/bin/commands/rib.rs | 65 +++--- src/database/mod.rs | 2 +- src/database/session/mod.rs | 2 +- src/database/session/rib_store.rs | 319 ++++++++++++++++++++++++------ src/lens/rib/mod.rs | 145 ++++++++++---- 7 files changed, 464 
insertions(+), 128 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e685a5..43649c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file. ## Unreleased changes +### Breaking Changes + +* Changed `monocle rib --sqlite-path` output schema from single `elems` table to two tables: + * `ribs` table: stores final reconstructed RIB states at each target timestamp + * `updates` table: stores filtered BGP updates used to build 2nd and later RIB snapshots + * Updates table is only populated for RIBs after the first/base RIB + ### New Features * Added `monocle rib` for reconstructing RIB state at arbitrary timestamps @@ -13,8 +20,16 @@ All notable changes to this project will be documented in this file. * Aborts when no RIB exists at or before a requested `rib_ts` for a selected collector * Supports `--country`, `--origin-asn`, `--prefix`, `--as-path`, `--peer-asn`, `--collector`, `--project`, and `--full-feed-only` +### Performance Improvements + +* Reduced string allocations in RIB reconstruction by using `Arc` for collector and prefix fields +* Removed unnecessary per-snapshot sorting that was `O(n log n)` on all entries +* Reduced updates query window from +2 hours lookahead to exact target timestamp + * Results in 33% fewer update files downloaded for typical requests + ### Code Improvements +* Added `StoredRibUpdate` struct to track filtered BGP updates during reconstruction * Added a session-backed SQLite store for merged reconstructed RIB export * Updated `monocle rib` reconstruction to keep the working RIB state in memory * Removes SQLite lookups and writes from the replay hot path diff --git a/README.md b/README.md index f2ee5be..7360397 100644 --- a/README.md +++ b/README.md @@ -742,11 +742,53 @@ Behavior: - A single timestamp operand writes to stdout by default. - Stdout output is the reconstructed final route set, not an MRT/table-dump export. 
- Stdout follows the normal streaming formatter, so the default is `psv` unless `--format` or `--json` is provided. -- Repeated timestamp operands require `--sqlite-path` and are written to one merged SQLite file keyed by `rib_ts`. +- Repeated timestamp operands require `--sqlite-path` and are written to one merged SQLite file. - Providing `--sqlite-path` writes the reconstructed results to that SQLite file instead of stdout. - If any selected collector has no RIB at or before a requested `rib_ts`, the command aborts instead of producing a partial result. - `--country` uses local ASInfo registration data, and `--full-feed-only` keeps only peers with at least 800k IPv4 prefixes or 100k IPv6 prefixes in broker peer metadata. +SQLite Output Schema: + +When using `--sqlite-path`, the output contains two tables: + +**`ribs` table** - Final reconstructed RIB states: +```sql +CREATE TABLE ribs ( + rib_ts INTEGER NOT NULL, -- Target RIB timestamp (the time you requested) + timestamp REAL NOT NULL, -- Actual BGP message timestamp + collector TEXT NOT NULL, -- Route collector name (e.g., 'rrc00') + peer_ip TEXT NOT NULL, -- Peer IP address + peer_asn INTEGER NOT NULL, -- Peer AS number + prefix TEXT NOT NULL, -- Network prefix + path_id INTEGER, -- BGP path identifier (for add-path) + as_path TEXT, -- AS path string + origin_asns TEXT -- Origin AS numbers (space-separated) +); +``` +- Contains one row per (rib_ts, route) showing the final routing table state at each requested timestamp. 
+- Query example: `SELECT * FROM ribs WHERE rib_ts = 1704067200 AND prefix = '1.1.1.0/24';` + +**`updates` table** - Filtered BGP updates (2nd and later RIBs only): +```sql +CREATE TABLE updates ( + rib_ts INTEGER NOT NULL, -- Target RIB timestamp this update contributed to + timestamp REAL NOT NULL, -- When the update message was received + collector TEXT NOT NULL, -- Route collector name + peer_ip TEXT NOT NULL, -- Peer IP address + peer_asn INTEGER NOT NULL, -- Peer AS number + prefix TEXT NOT NULL, -- Network prefix + path_id INTEGER, -- BGP path identifier + as_path TEXT, -- AS path string + origin_asns TEXT, -- Origin AS numbers + elem_type TEXT NOT NULL -- 'ANNOUNCE' or 'WITHDRAW' +); +``` +- Contains filtered updates that were applied to build 2nd and later RIB snapshots. +- **Not populated for the first/base RIB** (loaded directly from RIB dump file). +- Shows the incremental changes between consecutive RIB states. +- Useful for understanding what changed between snapshots. +- Query example: `SELECT * FROM updates WHERE rib_ts = 1704090000 ORDER BY timestamp;` + Examples: ```bash diff --git a/src/bin/commands/rib.rs b/src/bin/commands/rib.rs index a7000aa..49b0beb 100644 --- a/src/bin/commands/rib.rs +++ b/src/bin/commands/rib.rs @@ -61,12 +61,16 @@ fn run_stdout( if output_format == OutputFormat::Table { let mut entries = Vec::::new(); - lens.reconstruct_snapshots(args, no_update, |_rib_ts, state_store| { - state_store.visit_entries(|entry| { - entries.push(entry.clone()); - Ok(()) - }) - })?; + lens.reconstruct_snapshots( + args, + no_update, + |_rib_ts, state_store, _filtered_updates| { + state_store.visit_entries(|entry| { + entries.push(entry.clone()); + Ok(()) + }) + }, + )?; if !entries.is_empty() { writeln!( @@ -80,23 +84,27 @@ fn run_stdout( } let mut header_written = false; - lens.reconstruct_snapshots(args, no_update, |_rib_ts, state_store| { - if !header_written { - if let Some(header) = get_header(output_format, DEFAULT_FIELDS_RIB) { - 
writeln!(stdout, "{}", header) - .map_err(|e| anyhow!("Failed to write output header: {}", e))?; + lens.reconstruct_snapshots( + args, + no_update, + |_rib_ts, state_store, _filtered_updates| { + if !header_written { + if let Some(header) = get_header(output_format, DEFAULT_FIELDS_RIB) { + writeln!(stdout, "{}", header) + .map_err(|e| anyhow!("Failed to write output header: {}", e))?; + } + header_written = true; } - header_written = true; - } - state_store.visit_entries(|entry| { - if let Some(line) = format_entry(entry, output_format, DEFAULT_FIELDS_RIB) { - writeln!(stdout, "{}", line) - .map_err(|e| anyhow!("Failed to write reconstructed RIB row: {}", e))?; - } - Ok(()) - }) - })?; + state_store.visit_entries(|entry| { + if let Some(line) = format_entry(entry, output_format, DEFAULT_FIELDS_RIB) { + writeln!(stdout, "{}", line) + .map_err(|e| anyhow!("Failed to write reconstructed RIB row: {}", e))?; + } + Ok(()) + }) + }, + )?; Ok(()) } @@ -111,9 +119,10 @@ fn run_sqlite_output(lens: &RibLens<'_>, args: &RibArgs, no_update: bool) -> Res remove_existing_file(output_path)?; let mut sqlite_store = RibSqliteStore::new(path_to_str(output_path)?, true)?; - let summary = lens.reconstruct_snapshots(args, no_update, |rib_ts, state_store| { - sqlite_store.insert_snapshot(rib_ts, state_store) - })?; + let summary = + lens.reconstruct_snapshots(args, no_update, |rib_ts, state_store, filtered_updates| { + sqlite_store.insert_snapshot(rib_ts, state_store, filtered_updates) + })?; sqlite_store.finalize_indexes()?; eprintln!( @@ -177,11 +186,11 @@ fn build_json_object(entry: &StoredRibEntry, fields: &[&str]) -> serde_json::Val for field in fields { let value = match *field { - "collector" => json!(entry.collector), + "collector" => json!(entry.collector.to_string()), "timestamp" => json!(entry.timestamp), "peer_ip" => json!(entry.peer_ip.to_string()), "peer_asn" => json!(entry.peer_asn), - "prefix" => json!(entry.prefix), + "prefix" => json!(entry.prefix.to_string()), 
"as_path" => entry .as_path .as_ref() @@ -203,11 +212,11 @@ fn build_json_object(entry: &StoredRibEntry, fields: &[&str]) -> serde_json::Val fn entry_field_value(entry: &StoredRibEntry, field: &str) -> String { match field { - "collector" => entry.collector.clone(), + "collector" => entry.collector.to_string(), "timestamp" => TimestampFormat::Unix.format_timestamp(entry.timestamp), "peer_ip" => entry.peer_ip.to_string(), "peer_asn" => entry.peer_asn.to_string(), - "prefix" => entry.prefix.clone(), + "prefix" => entry.prefix.to_string(), "as_path" => entry.as_path.clone().unwrap_or_default(), "origin_asns" => entry.origin_asns_string().unwrap_or_default(), _ => String::new(), diff --git a/src/database/mod.rs b/src/database/mod.rs index b8ccf6c..06b70ac 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -126,7 +126,7 @@ pub use monocle::{ #[cfg(feature = "lib")] pub use session::MsgStore; #[cfg(feature = "lib")] -pub use session::{RibRouteKey, RibSqliteStore, RibStateStore, StoredRibEntry}; +pub use session::{RibRouteKey, RibSqliteStore, RibStateStore, StoredRibEntry, StoredRibUpdate}; // ============================================================================= // Helper function diff --git a/src/database/session/mod.rs b/src/database/session/mod.rs index 0427fae..11881fb 100644 --- a/src/database/session/mod.rs +++ b/src/database/session/mod.rs @@ -21,4 +21,4 @@ mod rib_store; #[cfg(feature = "lib")] pub use msg_store::MsgStore; #[cfg(feature = "lib")] -pub use rib_store::{RibRouteKey, RibSqliteStore, RibStateStore, StoredRibEntry}; +pub use rib_store::{RibRouteKey, RibSqliteStore, RibStateStore, StoredRibEntry, StoredRibUpdate}; diff --git a/src/database/session/rib_store.rs b/src/database/session/rib_store.rs index effb7f3..364cb46 100644 --- a/src/database/session/rib_store.rs +++ b/src/database/session/rib_store.rs @@ -2,8 +2,10 @@ use std::collections::HashMap; use std::net::IpAddr; +use std::sync::Arc; use anyhow::{anyhow, Result}; +use 
bgpkit_parser::models::ElemType; use bgpkit_parser::BgpElem; use rusqlite::params; @@ -11,30 +13,30 @@ use crate::database::core::DatabaseConn; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct RibRouteKey { - pub collector: String, + pub collector: Arc, pub peer_ip: IpAddr, pub peer_asn: u32, - pub prefix: String, + pub prefix: Arc, pub path_id: Option, } impl RibRouteKey { - pub fn from_elem(collector: &str, elem: &BgpElem) -> Self { + pub fn from_elem(collector: Arc, elem: &BgpElem) -> Self { Self { - collector: collector.to_string(), + collector, peer_ip: elem.peer_ip, peer_asn: elem.peer_asn.to_u32(), - prefix: elem.prefix.prefix.to_string(), + prefix: Arc::from(elem.prefix.prefix.to_string().into_boxed_str()), path_id: elem.prefix.path_id, } } pub fn from_entry(entry: &StoredRibEntry) -> Self { Self { - collector: entry.collector.clone(), + collector: Arc::clone(&entry.collector), peer_ip: entry.peer_ip, peer_asn: entry.peer_asn, - prefix: entry.prefix.clone(), + prefix: Arc::clone(&entry.prefix), path_id: entry.path_id, } } @@ -42,24 +44,24 @@ impl RibRouteKey { #[derive(Debug, Clone)] pub struct StoredRibEntry { - pub collector: String, + pub collector: Arc, pub timestamp: f64, pub peer_ip: IpAddr, pub peer_asn: u32, - pub prefix: String, + pub prefix: Arc, pub path_id: Option, pub as_path: Option, pub origin_asns: Option>, } impl StoredRibEntry { - pub fn from_elem(collector: &str, elem: BgpElem) -> Self { + pub fn from_elem(collector: Arc, elem: BgpElem) -> Self { Self { - collector: collector.to_string(), + collector, timestamp: elem.timestamp, peer_ip: elem.peer_ip, peer_asn: elem.peer_asn.to_u32(), - prefix: elem.prefix.prefix.to_string(), + prefix: Arc::from(elem.prefix.prefix.to_string().into_boxed_str()), path_id: elem.prefix.path_id, as_path: elem.as_path.map(|path| path.to_string()), origin_asns: elem @@ -82,6 +84,53 @@ impl StoredRibEntry { } } +/// Represents a filtered BGP update message that contributed to RIB reconstruction. 
+/// Stored in the `updates` table for 2nd and later RIB snapshots. +#[derive(Debug, Clone)] +pub struct StoredRibUpdate { + /// The target RIB timestamp this update contributed to + pub rib_ts: i64, + /// When the update message was received + pub timestamp: f64, + pub collector: Arc, + pub peer_ip: IpAddr, + pub peer_asn: u32, + pub prefix: Arc, + pub path_id: Option, + pub as_path: Option, + pub origin_asns: Option>, + /// The type of BGP message (ANNOUNCE or WITHDRAW) + pub elem_type: ElemType, +} + +impl StoredRibUpdate { + pub fn from_elem(rib_ts: i64, collector: Arc, elem: BgpElem, elem_type: ElemType) -> Self { + Self { + rib_ts, + collector, + timestamp: elem.timestamp, + peer_ip: elem.peer_ip, + peer_asn: elem.peer_asn.to_u32(), + prefix: Arc::from(elem.prefix.prefix.to_string().into_boxed_str()), + path_id: elem.prefix.path_id, + as_path: elem.as_path.map(|path| path.to_string()), + origin_asns: elem + .origin_asns + .map(|asns| asns.into_iter().map(|asn| asn.to_u32()).collect::>()), + elem_type, + } + } + + pub fn origin_asns_string(&self) -> Option { + self.origin_asns.as_ref().map(|asns| { + asns.iter() + .map(u32::to_string) + .collect::>() + .join(" ") + }) + } +} + pub struct RibStateStore { entries: HashMap, } @@ -134,32 +183,34 @@ impl RibStateStore { where F: FnMut(&StoredRibEntry) -> Result<()>, { - let mut entries = self.entries.values().collect::>(); - entries.sort_by(|a, b| { - a.collector - .cmp(&b.collector) - .then(a.peer_asn.cmp(&b.peer_asn)) - .then(a.peer_ip.to_string().cmp(&b.peer_ip.to_string())) - .then(a.prefix.cmp(&b.prefix)) - .then(a.path_id.cmp(&b.path_id)) - }); - - for entry in entries { + for entry in self.entries.values() { visitor(entry)?; } - Ok(()) } } +/// SQLite storage for RIB reconstruction output with two tables: +/// +/// - `ribs`: Stores final reconstructed RIB states at each target timestamp. +/// Contains one row per (rib_ts, route) showing the routing table state. 
+/// +/// - `updates`: Stores filtered BGP update messages that were applied to build +/// subsequent RIB snapshots. Only populated for 2nd and later RIBs. +/// Shows the incremental changes between snapshots. pub struct RibSqliteStore { db: DatabaseConn, + /// Tracks which snapshot index we're processing (0 = first RIB) + snapshot_index: usize, } impl RibSqliteStore { pub fn new(db_path: &str, reset: bool) -> Result { let db = DatabaseConn::open_path(db_path)?; - let store = Self { db }; + let store = Self { + db, + snapshot_index: 0, + }; store.initialize(reset)?; Ok(store) } @@ -168,64 +219,141 @@ impl RibSqliteStore { if reset { self.db .conn - .execute("DROP TABLE IF EXISTS elems", []) - .map_err(|e| anyhow!("Failed to drop existing rib output elems table: {}", e))?; + .execute("DROP TABLE IF EXISTS ribs", []) + .map_err(|e| anyhow!("Failed to drop existing ribs table: {}", e))?; + self.db + .conn + .execute("DROP TABLE IF EXISTS updates", []) + .map_err(|e| anyhow!("Failed to drop existing updates table: {}", e))?; } self.db .conn .execute_batch( r#" - CREATE TABLE IF NOT EXISTS elems ( + -- Final reconstructed RIB states at each target timestamp + -- One row per (rib_ts, route_key) showing the routing table state + CREATE TABLE IF NOT EXISTS ribs ( rib_ts INTEGER NOT NULL, timestamp REAL NOT NULL, collector TEXT NOT NULL, peer_ip TEXT NOT NULL, peer_asn INTEGER NOT NULL, prefix TEXT NOT NULL, + path_id INTEGER, as_path TEXT, origin_asns TEXT ); + + -- Filtered BGP updates used to build 2nd and later RIB snapshots + -- Only populated when multiple rib_ts are requested + -- Shows incremental changes between consecutive RIB states + CREATE TABLE IF NOT EXISTS updates ( + rib_ts INTEGER NOT NULL, + timestamp REAL NOT NULL, + collector TEXT NOT NULL, + peer_ip TEXT NOT NULL, + peer_asn INTEGER NOT NULL, + prefix TEXT NOT NULL, + path_id INTEGER, + as_path TEXT, + origin_asns TEXT, + elem_type TEXT NOT NULL + ); "#, ) - .map_err(|e| anyhow!("Failed to initialize 
rib output SQLite schema: {}", e))?; + .map_err(|e| anyhow!("Failed to initialize RIB SQLite schema: {}", e))?; Ok(()) } - pub fn insert_snapshot(&mut self, rib_ts: i64, state_store: &RibStateStore) -> Result<()> { + /// Insert a RIB snapshot and its associated filtered updates. + /// + /// # Arguments + /// * `rib_ts` - The target RIB timestamp + /// * `state_store` - The final RIB state to store + /// * `filtered_updates` - Updates that contributed to this RIB (only stored for 2nd+ RIBs) + pub fn insert_snapshot( + &mut self, + rib_ts: i64, + state_store: &RibStateStore, + filtered_updates: &[StoredRibUpdate], + ) -> Result<()> { let tx = self .db .conn .unchecked_transaction() - .map_err(|e| anyhow!("Failed to begin rib output transaction: {}", e))?; - let mut stmt = tx - .prepare_cached( - r#" - INSERT INTO elems ( - rib_ts, timestamp, collector, peer_ip, peer_asn, prefix, as_path, origin_asns - ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) - "#, - ) - .map_err(|e| anyhow!("Failed to prepare rib output insert statement: {}", e))?; - - state_store.visit_entries(|entry| { - stmt.execute(params![ - rib_ts, - entry.timestamp, - entry.collector, - entry.peer_ip.to_string(), - entry.peer_asn, - entry.prefix, - entry.as_path, - entry.origin_asns_string(), - ]) - .map_err(|e| anyhow!("Failed to insert entry into rib output SQLite store: {}", e))?; - Ok(()) - })?; + .map_err(|e| anyhow!("Failed to begin RIB output transaction: {}", e))?; + + // Insert RIB entries into 'ribs' table (always populated) + { + let mut rib_stmt = tx + .prepare_cached( + r#" + INSERT INTO ribs ( + rib_ts, timestamp, collector, peer_ip, peer_asn, + prefix, path_id, as_path, origin_asns + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9) + "#, + ) + .map_err(|e| anyhow!("Failed to prepare ribs insert statement: {}", e))?; + + state_store.visit_entries(|entry| { + rib_stmt + .execute(params![ + rib_ts, + entry.timestamp, + entry.collector, + entry.peer_ip.to_string(), + entry.peer_asn, + entry.prefix, + 
entry.path_id, + entry.as_path, + entry.origin_asns_string(), + ]) + .map_err(|e| anyhow!("Failed to insert into ribs table: {}", e))?; + Ok(()) + })?; + } + + // Insert filtered updates into 'updates' table (only for 2nd and later RIBs) + if self.snapshot_index > 0 && !filtered_updates.is_empty() { + let mut update_stmt = tx + .prepare_cached( + r#" + INSERT INTO updates ( + rib_ts, timestamp, collector, peer_ip, peer_asn, + prefix, path_id, as_path, origin_asns, elem_type + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10) + "#, + ) + .map_err(|e| anyhow!("Failed to prepare updates insert statement: {}", e))?; + + for update in filtered_updates { + let elem_type_str = match update.elem_type { + ElemType::ANNOUNCE => "ANNOUNCE", + ElemType::WITHDRAW => "WITHDRAW", + }; + + update_stmt + .execute(params![ + update.rib_ts, + update.timestamp, + update.collector, + update.peer_ip.to_string(), + update.peer_asn, + update.prefix, + update.path_id, + update.as_path, + update.origin_asns_string(), + elem_type_str, + ]) + .map_err(|e| anyhow!("Failed to insert into updates table: {}", e))?; + } + } - drop(stmt); tx.commit() - .map_err(|e| anyhow!("Failed to commit rib output inserts: {}", e))?; + .map_err(|e| anyhow!("Failed to commit RIB output transaction: {}", e))?; + self.snapshot_index += 1; Ok(()) } @@ -234,13 +362,20 @@ impl RibSqliteStore { .conn .execute_batch( r#" - CREATE INDEX IF NOT EXISTS idx_rib_output_rib_ts ON elems(rib_ts); - CREATE INDEX IF NOT EXISTS idx_rib_output_rib_ts_prefix ON elems(rib_ts, prefix); - CREATE INDEX IF NOT EXISTS idx_rib_output_rib_ts_peer_asn ON elems(rib_ts, peer_asn); - CREATE INDEX IF NOT EXISTS idx_rib_output_rib_ts_collector ON elems(rib_ts, collector); + -- Indexes for ribs table (final RIB states) + CREATE INDEX IF NOT EXISTS idx_ribs_rib_ts ON ribs(rib_ts); + CREATE INDEX IF NOT EXISTS idx_ribs_rib_ts_prefix ON ribs(rib_ts, prefix); + CREATE INDEX IF NOT EXISTS idx_ribs_rib_ts_peer_asn ON ribs(rib_ts, peer_asn); + CREATE 
INDEX IF NOT EXISTS idx_ribs_rib_ts_collector ON ribs(rib_ts, collector); + + -- Indexes for updates table (intermediate changes) + CREATE INDEX IF NOT EXISTS idx_updates_rib_ts ON updates(rib_ts); + CREATE INDEX IF NOT EXISTS idx_updates_rib_ts_prefix ON updates(rib_ts, prefix); + CREATE INDEX IF NOT EXISTS idx_updates_timestamp ON updates(timestamp); + CREATE INDEX IF NOT EXISTS idx_updates_collector ON updates(collector); "#, ) - .map_err(|e| anyhow!("Failed to create rib output SQLite indexes: {}", e))?; + .map_err(|e| anyhow!("Failed to create RIB SQLite indexes: {}", e))?; Ok(()) } } @@ -250,6 +385,7 @@ mod tests { use super::*; use bgpkit_parser::models::{AsPath, AsPathSegment, ElemType, NetworkPrefix}; use std::net::{IpAddr, Ipv4Addr}; + use std::sync::Arc; fn test_elem() -> Result { Ok(BgpElem { @@ -279,7 +415,7 @@ mod tests { #[test] fn test_rib_state_store_round_trip() -> Result<()> { let mut store = RibStateStore::new_temp()?; - let entry = StoredRibEntry::from_elem("rrc00", test_elem()?); + let entry = StoredRibEntry::from_elem(Arc::from("rrc00"), test_elem()?); store.upsert_entry(entry.clone())?; assert!(store.route_exists(&entry.route_key())?); @@ -290,8 +426,63 @@ mod tests { })?; assert_eq!(visited.len(), 1); - assert_eq!(visited[0].collector, "rrc00"); + assert_eq!(visited[0].collector.as_ref(), "rrc00"); assert_eq!(visited[0].path_id, Some(7)); Ok(()) } + + #[test] + fn test_sqlite_store_two_tables() -> Result<()> { + use tempfile::NamedTempFile; + + let temp_file = NamedTempFile::new()?; + let path = temp_file.path().to_str().unwrap(); + + let mut store = RibSqliteStore::new(path, true)?; + + // Create first RIB snapshot (no updates should be stored) + let mut state1 = RibStateStore::new_temp()?; + let entry1 = StoredRibEntry::from_elem(Arc::from("rrc00"), test_elem()?); + state1.upsert_entry(entry1)?; + + // First RIB: no updates stored + store.insert_snapshot(1704067200, &state1, &[])?; + + // Create second RIB snapshot with updates + let mut 
state2 = RibStateStore::new_temp()?; + let entry2 = StoredRibEntry::from_elem(Arc::from("rrc00"), test_elem()?); + state2.upsert_entry(entry2)?; + + let update = StoredRibUpdate::from_elem( + 1704069000, + Arc::from("rrc00"), + test_elem()?, + ElemType::ANNOUNCE, + ); + + // Second RIB: updates should be stored + store.insert_snapshot(1704069000, &state2, &[update])?; + + store.finalize_indexes()?; + + // Verify tables exist and have correct data + let rib_count: i64 = store + .db + .conn + .query_row("SELECT COUNT(*) FROM ribs", [], |row| row.get(0)) + .map_err(|e| anyhow!("Failed to count ribs: {}", e))?; + + let update_count: i64 = store + .db + .conn + .query_row("SELECT COUNT(*) FROM updates", [], |row| row.get(0)) + .map_err(|e| anyhow!("Failed to count updates: {}", e))?; + + // Both RIBs stored in ribs table + assert_eq!(rib_count, 2); + // Only 2nd RIB has updates stored + assert_eq!(update_count, 1); + + Ok(()) + } } diff --git a/src/lens/rib/mod.rs b/src/lens/rib/mod.rs index e6254cf..c955408 100644 --- a/src/lens/rib/mod.rs +++ b/src/lens/rib/mod.rs @@ -7,6 +7,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::path::PathBuf; +use std::sync::Arc; use anyhow::{anyhow, Result}; use bgpkit_broker::{BgpkitBroker, BrokerItem}; @@ -17,7 +18,9 @@ use regex::Regex; use serde::{Deserialize, Serialize}; use crate::config::MonocleConfig; -use crate::database::{MonocleDatabase, RibRouteKey, RibStateStore, StoredRibEntry}; +use crate::database::{ + MonocleDatabase, RibRouteKey, RibStateStore, StoredRibEntry, StoredRibUpdate, +}; use crate::lens::country::CountryLens; use crate::lens::parse::ParseFilters; use crate::lens::time::TimeLens; @@ -28,7 +31,6 @@ use clap::Args; const FULL_FEED_V4_THRESHOLD: u32 = 800_000; const FULL_FEED_V6_THRESHOLD: u32 = 100_000; const RIB_LOOKBACK_HOURS: i64 = 24 * 30; -const UPDATES_LOOKAHEAD_HOURS: i64 = 2; type FullFeedAllowlists = HashMap>; @@ -183,6 +185,13 @@ impl<'a> RibLens<'a> { Self { db, config } } + 
/// Reconstruct RIB snapshots at specified timestamps. + /// + /// The `snapshot_visitor` callback is invoked for each snapshot with: + /// - `i64`: The target RIB timestamp + /// - `&RibStateStore`: The final reconstructed RIB state + /// - `&[StoredRibUpdate]`: Filtered updates that contributed to this snapshot + /// (empty for the first/base RIB, populated for subsequent RIBs) pub fn reconstruct_snapshots( &self, args: &RibArgs, @@ -190,7 +199,7 @@ impl<'a> RibLens<'a> { mut snapshot_visitor: F, ) -> Result where - F: FnMut(i64, &RibStateStore) -> Result<()>, + F: FnMut(i64, &RibStateStore, &[StoredRibUpdate]) -> Result<()>, { let normalized_ts = args.validate()?; let country_asns = self.resolve_country_asns(args.filters.country.as_deref(), no_update)?; @@ -223,16 +232,44 @@ impl<'a> RibLens<'a> { allowlists.get(group.collector.as_str()), )?; - self.replay_updates( - &mut state_store, - group, - args, - country_asns.as_ref(), - origin_filter.as_ref(), - as_path_regex.as_ref(), - allowlists.get(group.collector.as_str()), - &mut snapshot_visitor, - )?; + // If the first target timestamp equals the RIB time, emit it immediately + // with empty updates (it's the base RIB, not built from updates) + let rib_ts = group.rib_item.ts_start.and_utc().timestamp(); + if group + .rib_ts + .first() + .map(|&ts| ts == rib_ts) + .unwrap_or(false) + { + snapshot_visitor(group.rib_ts[0], &state_store, &[])?; + // Create a new group with remaining timestamps for replay + let remaining_ts: Vec = group.rib_ts.iter().skip(1).copied().collect(); + if !remaining_ts.is_empty() { + let mut new_group = group.clone(); + new_group.rib_ts = remaining_ts; + self.replay_updates( + &mut state_store, + &new_group, + args, + country_asns.as_ref(), + origin_filter.as_ref(), + as_path_regex.as_ref(), + allowlists.get(group.collector.as_str()), + &mut snapshot_visitor, + )?; + } + } else { + self.replay_updates( + &mut state_store, + group, + args, + country_asns.as_ref(), + origin_filter.as_ref(), + 
as_path_regex.as_ref(), + allowlists.get(group.collector.as_str()), + &mut snapshot_visitor, + )?; + } } let collector_count = groups @@ -502,14 +539,13 @@ impl<'a> RibLens<'a> { group_max_ts: i64, ) -> Result> { let rib_ts = rib_item.ts_start.and_utc().timestamp(); - let query_end = group_max_ts + Duration::hours(UPDATES_LOOKAHEAD_HOURS).num_seconds(); let mut broker = self .base_broker(args) .collector_id(collector) .data_type("updates") .ts_start(Self::timestamp_to_broker_string(rib_ts)?) - .ts_end(Self::timestamp_to_broker_string(query_end)?); + .ts_end(Self::timestamp_to_broker_string(group_max_ts)?); if let Some(project) = &args.filters.project { broker = broker.project(project); @@ -523,10 +559,11 @@ impl<'a> RibLens<'a> { ) })?; + // Only keep update files that contain data up to and including the target timestamp. + // An update file with ts_end <= group_max_ts has all elements with timestamp <= group_max_ts. updates.retain(|item| { - let item_start = item.ts_start.and_utc().timestamp(); let item_end = item.ts_end.and_utc().timestamp(); - item_start <= group_max_ts && item_end > rib_ts + item_end > rib_ts && item_end <= group_max_ts }); updates.sort_by_key(|item| item.ts_start); Ok(updates) @@ -586,6 +623,7 @@ impl<'a> RibLens<'a> { ) })?; + let collector_arc = Arc::from(collector); let mut batch = Vec::new(); for elem in parser { if elem.elem_type != ElemType::ANNOUNCE { @@ -599,7 +637,7 @@ impl<'a> RibLens<'a> { as_path_regex, full_feed_allowlist, ) { - batch.push(StoredRibEntry::from_elem(collector, elem)); + batch.push(StoredRibEntry::from_elem(Arc::clone(&collector_arc), elem)); } } @@ -620,10 +658,15 @@ impl<'a> RibLens<'a> { snapshot_visitor: &mut F, ) -> Result<()> where - F: FnMut(i64, &RibStateStore) -> Result<()>, + F: FnMut(i64, &RibStateStore, &[StoredRibUpdate]) -> Result<()>, { let mut pending = HashMap::::new(); let mut next_snapshot_index = 0usize; + let collector_arc = Arc::from(group.collector.as_str()); + + // Track filtered updates for 
the current snapshot interval + // These are updates that matched filters and affected the RIB state + let mut filtered_updates: Vec = Vec::new(); for update in &group.updates { let safe_filters = self.safe_parse_filters( @@ -647,56 +690,90 @@ impl<'a> RibLens<'a> { && elem.timestamp > group.rib_ts[next_snapshot_index] as f64 { self.flush_pending(state_store, &mut pending)?; - snapshot_visitor(group.rib_ts[next_snapshot_index], state_store)?; + // For the first RIB (index 0), pass empty updates + // For subsequent RIBs, pass the collected filtered updates + snapshot_visitor( + group.rib_ts[next_snapshot_index], + state_store, + &filtered_updates, + )?; + // Clear updates after emitting snapshot (they belong to this snapshot) + filtered_updates.clear(); next_snapshot_index += 1; } - self.apply_update_to_delta( + // Apply update and track if it was filtered/matched + let was_applied = self.apply_update_to_delta( &mut pending, state_store, - &group.collector, - elem, + Arc::clone(&collector_arc), + &elem, country_asns, origin_filter, as_path_regex, full_feed_allowlist, )?; + + // If the update was applied (matched filters), track it for the updates table + if was_applied { + let elem_type = elem.elem_type; + let update_record = StoredRibUpdate::from_elem( + group.rib_ts[next_snapshot_index.min(group.rib_ts.len() - 1)], + Arc::clone(&collector_arc), + elem, + elem_type, + ); + filtered_updates.push(update_record); + } } } while next_snapshot_index < group.rib_ts.len() { self.flush_pending(state_store, &mut pending)?; - snapshot_visitor(group.rib_ts[next_snapshot_index], state_store)?; + snapshot_visitor( + group.rib_ts[next_snapshot_index], + state_store, + &filtered_updates, + )?; + filtered_updates.clear(); next_snapshot_index += 1; } Ok(()) } + /// Apply an update to the pending delta and return whether it matched filters. 
+ /// + /// Returns `true` if the update matched filters and was recorded in the delta, + /// `false` if it was filtered out (doesn't mean it won't affect state - withdraws + /// always check for existing routes). #[allow(clippy::too_many_arguments)] fn apply_update_to_delta( &self, pending: &mut HashMap, state_store: &RibStateStore, - collector: &str, - elem: BgpElem, + collector: Arc, + elem: &BgpElem, country_asns: Option<&HashSet>, origin_filter: Option<&OriginFilter>, as_path_regex: Option<&Regex>, full_feed_allowlist: Option<&HashSet<(String, u32)>>, - ) -> Result<()> { - let route_key = RibRouteKey::from_elem(collector, &elem); + ) -> Result { + let route_key = RibRouteKey::from_elem(Arc::clone(&collector), elem); match elem.elem_type { ElemType::WITHDRAW => { if self.route_exists_in_state_or_delta(&route_key, state_store, pending)? { pending.insert(route_key.clone(), DeltaOp::Delete(route_key)); + Ok(true) + } else { + Ok(false) } } ElemType::ANNOUNCE => { let matches = self.announce_matches( - collector, - &elem, + &collector, + elem, country_asns, origin_filter, as_path_regex, @@ -706,15 +783,17 @@ impl<'a> RibLens<'a> { if matches { pending.insert( route_key, - DeltaOp::Upsert(StoredRibEntry::from_elem(collector, elem)), + DeltaOp::Upsert(StoredRibEntry::from_elem(collector, elem.clone())), ); + Ok(true) } else if self.route_exists_in_state_or_delta(&route_key, state_store, pending)? { pending.insert(route_key.clone(), DeltaOp::Delete(route_key)); + Ok(true) + } else { + Ok(false) } } } - - Ok(()) } fn route_exists_in_state_or_delta(