diff --git a/CHANGELOG.md b/CHANGELOG.md index 92119c7..43649c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,40 @@ All notable changes to this project will be documented in this file. +## Unreleased changes + +### Breaking Changes + +* Changed `monocle rib --sqlite-path` output schema from single `elems` table to two tables: + * `ribs` table: stores final reconstructed RIB states at each target timestamp + * `updates` table: stores filtered BGP updates used to build 2nd and later RIB snapshots + * Updates table is only populated for RIBs after the first/base RIB + +### New Features + +* Added `monocle rib` for reconstructing RIB state at arbitrary timestamps + * Selects the latest RIB before each requested `rib_ts` and replays updates to the exact timestamp + * Supports stdout output by default and SQLite output via `--sqlite-path` + * Repeated timestamp operands require `--sqlite-path` and are written to one merged SQLite file keyed by `rib_ts` + * Aborts when no RIB exists at or before a requested `rib_ts` for a selected collector + * Supports `--country`, `--origin-asn`, `--prefix`, `--as-path`, `--peer-asn`, `--collector`, `--project`, and `--full-feed-only` + +### Performance Improvements + +* Reduced string allocations in RIB reconstruction by using `Arc` for collector and prefix fields +* Removed unnecessary per-snapshot sorting that was `O(n log n)` on all entries +* Reduced updates query window from +2 hours lookahead to exact target timestamp + * Results in 33% fewer update files downloaded for typical requests + +### Code Improvements + +* Added `StoredRibUpdate` struct to track filtered BGP updates during reconstruction +* Added a session-backed SQLite store for merged reconstructed RIB export +* Updated `monocle rib` reconstruction to keep the working RIB state in memory + * Removes SQLite lookups and writes from the replay hot path + * Keeps `path_id` only for internal route identity during add-path reconstruction + * Narrows reconstructed 
RIB entries and SQLite export rows to collector, timestamp, peer_ip, peer_asn, prefix, as_path, and origin_asns + ## v1.2.0 - 2026-02-28 ### New Features diff --git a/Cargo.lock b/Cargo.lock index 06fd17d..e45f46f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1673,6 +1673,7 @@ dependencies = [ "oneio", "radar-rs", "rayon", + "regex", "rusqlite", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 20f841c..195b746 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,6 +88,7 @@ lib = [ # Database "dep:oneio", "dep:ipnet", + "dep:tempfile", # Lenses "dep:chrono-humanize", "dep:dateparser", @@ -98,6 +99,7 @@ lib = [ "dep:itertools", "dep:radar-rs", "dep:rayon", + "dep:regex", # Display (always included with lib) "dep:tabled", "dep:json_to_table", @@ -151,6 +153,7 @@ tracing = "0.1" # Database ipnet = { version = "2.10", features = ["json"], optional = true } oneio = { version = "0.20.1", default-features = false, features = ["https", "gz", "bz", "json"], optional = true } +tempfile = { version = "3", optional = true } # Lenses chrono-humanize = { version = "0.2", optional = true } @@ -162,6 +165,7 @@ bgpkit-commons = { version = "0.10.2", features = ["asinfo", "rpki", "countries" itertools = { version = "0.14", optional = true } radar-rs = { version = "0.1.0", optional = true } rayon = { version = "1.8", optional = true } +regex = { version = "1.11", optional = true } # Display tabled = { version = "0.20", optional = true } diff --git a/README.md b/README.md index 18d14c4..7360397 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ See through all Border Gateway Protocol (BGP) data with a monocle. 
- [`monocle parse`](#monocle-parse) - [Output Format](#output-format) - [`monocle search`](#monocle-search) + - [`monocle rib`](#monocle-rib) - [`monocle time`](#monocle-time) - [`monocle inspect`](#monocle-inspect) - [`monocle country`](#monocle-country) @@ -229,6 +230,7 @@ Subcommands: - `parse`: parse individual MRT files - `search`: search for matching messages from all available public MRT files +- `rib`: reconstruct final RIB state at one or more arbitrary timestamps - `server`: start a WebSocket server for programmatic access - `inspect`: unified AS and prefix information lookup - `country`: utility to look up country name and code @@ -259,6 +261,7 @@ Usage: monocle [OPTIONS] Commands: parse Parse individual MRT files given a file path, local or remote search Search BGP messages from all available public MRT files + rib Reconstruct final RIB state at one or more arbitrary timestamps server Start the WebSocket server (ws://
<host>:<port>/ws, health: http://<host>
:/health) inspect Unified AS and prefix information lookup country Country name and code lookup utilities @@ -701,6 +704,110 @@ Use `--broker-files` to see the list of MRT files that would be queried without -c rrc00 --broker-files ``` +### `monocle rib` + +Reconstruct final RIB state at one or more arbitrary timestamps by loading the latest RIB at or before each requested `rib_ts` and replaying updates up to the exact timestamp. + +```text +➜ monocle rib --help +Reconstruct final RIB state at one or more arbitrary timestamps + +Usage: monocle rib [OPTIONS] ... + +Arguments: + ... Target RIB timestamp operand. Repeat to request multiple snapshots + +Options: + -o, --origin-asn Filter by origin AS Number(s), comma-separated. Prefix with ! to exclude + -C, --country Filter by origin ASN registration country + --debug Print debug information + --format Output format: table, markdown, json, json-pretty, json-line, psv (default varies by command) + -p, --prefix Filter by network prefix(es), comma-separated. Prefix with ! to exclude + --json Output as JSON objects (shortcut for --format json-pretty) + -s, --include-super Include super-prefixes when filtering + --no-update Disable automatic database updates (use existing cached data only) + -S, --include-sub Include sub-prefixes when filtering + -J, --peer-asn Filter by peer ASN(s), comma-separated. Prefix with ! to exclude + -a, --as-path Filter by AS path regex string + -c, --collector Filter by collector, e.g., rrc00 or route-views2 + -P, --project Filter by route collection project, i.e. riperis or routeviews + --full-feed-only Keep only full-feed peers based on broker peer metadata + --sqlite-path SQLite output file path + -h, --help Print help + -V, --version Print version +``` + +Behavior: + +- A single timestamp operand writes to stdout by default. +- Stdout output is the reconstructed final route set, not an MRT/table-dump export. 
+- Stdout follows the normal streaming formatter, so the default is `psv` unless `--format` or `--json` is provided. +- Repeated timestamp operands require `--sqlite-path` and are written to one merged SQLite file. +- Providing `--sqlite-path` writes the reconstructed results to that SQLite file instead of stdout. +- If any selected collector has no RIB at or before a requested `rib_ts`, the command aborts instead of producing a partial result. +- `--country` uses local ASInfo registration data, and `--full-feed-only` keeps only peers with at least 800k IPv4 prefixes or 100k IPv6 prefixes in broker peer metadata. + +SQLite Output Schema: + +When using `--sqlite-path`, the output contains two tables: + +**`ribs` table** - Final reconstructed RIB states: +```sql +CREATE TABLE ribs ( + rib_ts INTEGER NOT NULL, -- Target RIB timestamp (the time you requested) + timestamp REAL NOT NULL, -- Actual BGP message timestamp + collector TEXT NOT NULL, -- Route collector name (e.g., 'rrc00') + peer_ip TEXT NOT NULL, -- Peer IP address + peer_asn INTEGER NOT NULL, -- Peer AS number + prefix TEXT NOT NULL, -- Network prefix + path_id INTEGER, -- BGP path identifier (for add-path) + as_path TEXT, -- AS path string + origin_asns TEXT -- Origin AS numbers (space-separated) +); +``` +- Contains one row per (rib_ts, route) showing the final routing table state at each requested timestamp. 
+- Query example: `SELECT * FROM ribs WHERE rib_ts = 1704067200 AND prefix = '1.1.1.0/24';` + +**`updates` table** - Filtered BGP updates (2nd and later RIBs only): +```sql +CREATE TABLE updates ( + rib_ts INTEGER NOT NULL, -- Target RIB timestamp this update contributed to + timestamp REAL NOT NULL, -- When the update message was received + collector TEXT NOT NULL, -- Route collector name + peer_ip TEXT NOT NULL, -- Peer IP address + peer_asn INTEGER NOT NULL, -- Peer AS number + prefix TEXT NOT NULL, -- Network prefix + path_id INTEGER, -- BGP path identifier + as_path TEXT, -- AS path string + origin_asns TEXT, -- Origin AS numbers + elem_type TEXT NOT NULL -- 'ANNOUNCE' or 'WITHDRAW' +); +``` +- Contains filtered updates that were applied to build 2nd and later RIB snapshots. +- **Not populated for the first/base RIB** (loaded directly from RIB dump file). +- Shows the incremental changes between consecutive RIB states. +- Useful for understanding what changed between snapshots. +- Query example: `SELECT * FROM updates WHERE rib_ts = 1704090000 ORDER BY timestamp;` + +Examples: + +```bash +# Print the reconstructed RIB for a single timestamp to stdout +monocle rib 2025-09-01T12:00:00Z -c rrc00 -o 13335 + +# Write multiple timestamps to one merged SQLite file in the current directory +monocle rib \ + 2025-09-01T12:00:00Z \ + 2025-09-01T18:00:00Z \ + --sqlite-path /tmp/rrc00-us.sqlite3 \ + -c rrc00 \ + --country US \ + --full-feed-only + +# Write a single reconstructed snapshot to SQLite +monocle rib 2025-09-01T12:00:00Z --sqlite-path /tmp/route-views2.sqlite3 -c route-views2 +``` + ### `monocle time` Parse and convert time strings between various formats. 
diff --git a/src/bin/commands/elem_format.rs b/src/bin/commands/elem_format.rs index edc4d20..452029d 100644 --- a/src/bin/commands/elem_format.rs +++ b/src/bin/commands/elem_format.rs @@ -17,7 +17,9 @@ pub const AVAILABLE_FIELDS: &[&str] = &[ "peer_ip", "peer_asn", "prefix", + "path_id", "as_path", + "origin_asns", "origin", "next_hop", "local_pref", @@ -174,11 +176,26 @@ pub fn get_field_value_with_time_format( "peer_ip" => elem.peer_ip.to_string(), "peer_asn" => elem.peer_asn.to_string(), "prefix" => elem.prefix.to_string(), + "path_id" => elem + .prefix + .path_id + .map(|path_id| path_id.to_string()) + .unwrap_or_default(), "as_path" => elem .as_path .as_ref() .map(|p| p.to_string()) .unwrap_or_default(), + "origin_asns" => elem + .origin_asns + .as_ref() + .map(|asns| { + asns.iter() + .map(|asn| asn.to_string()) + .collect::>() + .join(" ") + }) + .unwrap_or_default(), "origin" => elem .origin .as_ref() @@ -288,10 +305,18 @@ pub fn build_json_object( "peer_ip" => json!(elem.peer_ip.to_string()), "peer_asn" => json!(elem.peer_asn), "prefix" => json!(elem.prefix.to_string()), + "path_id" => match elem.prefix.path_id { + Some(path_id) => json!(path_id), + None => serde_json::Value::Null, + }, "as_path" => match &elem.as_path { Some(p) => json!(p.to_string()), None => serde_json::Value::Null, }, + "origin_asns" => match &elem.origin_asns { + Some(asns) => json!(asns.iter().map(|asn| asn.to_string()).collect::>()), + None => serde_json::Value::Null, + }, "origin" => match &elem.origin { Some(o) => json!(o.to_string()), None => serde_json::Value::Null, diff --git a/src/bin/commands/mod.rs b/src/bin/commands/mod.rs index 8f7a72e..4f5d0f0 100644 --- a/src/bin/commands/mod.rs +++ b/src/bin/commands/mod.rs @@ -6,6 +6,7 @@ pub mod inspect; pub mod ip; pub mod parse; pub mod pfx2as; +pub mod rib; pub mod rpki; pub mod search; pub mod time; diff --git a/src/bin/commands/rib.rs b/src/bin/commands/rib.rs new file mode 100644 index 0000000..49b0beb --- /dev/null +++ 
b/src/bin/commands/rib.rs @@ -0,0 +1,241 @@ +use std::fs; +use std::io::{BufWriter, Write}; +use std::path::Path; + +use anyhow::{anyhow, Result}; +use monocle::database::{MonocleDatabase, RibSqliteStore, StoredRibEntry}; +use monocle::lens::rib::RibLens; +use monocle::utils::{OutputFormat, TimestampFormat}; +use monocle::MonocleConfig; +use serde_json::json; +use tabled::builder::Builder; +use tabled::settings::Style; + +use super::elem_format::get_header; + +pub use monocle::lens::rib::RibArgs; + +const DEFAULT_FIELDS_RIB: &[&str] = &[ + "collector", + "timestamp", + "peer_ip", + "peer_asn", + "prefix", + "as_path", + "origin_asns", +]; + +pub fn run(config: &MonocleConfig, args: RibArgs, output_format: OutputFormat, no_update: bool) { + if let Err(error) = run_inner(config, args, output_format, no_update) { + eprintln!("ERROR: {}", error); + std::process::exit(1); + } +} + +fn run_inner( + config: &MonocleConfig, + args: RibArgs, + output_format: OutputFormat, + no_update: bool, +) -> Result<()> { + let sqlite_path = config.sqlite_path(); + let db = MonocleDatabase::open(&sqlite_path) + .map_err(|e| anyhow!("Failed to open database '{}': {}", sqlite_path, e))?; + let lens = RibLens::new(&db, config); + + if args.sqlite_path.is_some() { + run_sqlite_output(&lens, &args, no_update) + } else { + run_stdout(&lens, &args, output_format, no_update) + } +} + +fn run_stdout( + lens: &RibLens<'_>, + args: &RibArgs, + output_format: OutputFormat, + no_update: bool, +) -> Result<()> { + let stdout = std::io::stdout(); + let mut stdout = BufWriter::new(stdout.lock()); + + if output_format == OutputFormat::Table { + let mut entries = Vec::::new(); + lens.reconstruct_snapshots( + args, + no_update, + |_rib_ts, state_store, _filtered_updates| { + state_store.visit_entries(|entry| { + entries.push(entry.clone()); + Ok(()) + }) + }, + )?; + + if !entries.is_empty() { + writeln!( + stdout, + "{}", + format_entries_table(&entries, DEFAULT_FIELDS_RIB) + ) + .map_err(|e| 
anyhow!("Failed to write table output: {}", e))?; + } + return Ok(()); + } + + let mut header_written = false; + lens.reconstruct_snapshots( + args, + no_update, + |_rib_ts, state_store, _filtered_updates| { + if !header_written { + if let Some(header) = get_header(output_format, DEFAULT_FIELDS_RIB) { + writeln!(stdout, "{}", header) + .map_err(|e| anyhow!("Failed to write output header: {}", e))?; + } + header_written = true; + } + + state_store.visit_entries(|entry| { + if let Some(line) = format_entry(entry, output_format, DEFAULT_FIELDS_RIB) { + writeln!(stdout, "{}", line) + .map_err(|e| anyhow!("Failed to write reconstructed RIB row: {}", e))?; + } + Ok(()) + }) + }, + )?; + + Ok(()) +} + +fn run_sqlite_output(lens: &RibLens<'_>, args: &RibArgs, no_update: bool) -> Result<()> { + args.validate()?; + let output_path = args + .sqlite_path + .as_deref() + .ok_or_else(|| anyhow!("Missing --sqlite-path for SQLite output"))?; + + remove_existing_file(output_path)?; + + let mut sqlite_store = RibSqliteStore::new(path_to_str(output_path)?, true)?; + let summary = + lens.reconstruct_snapshots(args, no_update, |rib_ts, state_store, filtered_updates| { + sqlite_store.insert_snapshot(rib_ts, state_store, filtered_updates) + })?; + sqlite_store.finalize_indexes()?; + + eprintln!( + "wrote {} reconstructed RIB snapshot(s) to {}", + summary.rib_ts.len(), + output_path.display() + ); + Ok(()) +} + +fn format_entries_table(entries: &[StoredRibEntry], fields: &[&str]) -> String { + let mut builder = Builder::default(); + builder.push_record(fields.iter().copied()); + + for entry in entries { + let row = fields + .iter() + .map(|field| entry_field_value(entry, field)) + .collect::>(); + builder.push_record(row); + } + + let mut table = builder.build(); + table.with(Style::rounded()); + table.to_string() +} + +fn format_entry( + entry: &StoredRibEntry, + output_format: OutputFormat, + fields: &[&str], +) -> Option { + match output_format { + OutputFormat::Json | 
OutputFormat::JsonLine => { + Some(serde_json::to_string(&build_json_object(entry, fields)).unwrap_or_default()) + } + OutputFormat::JsonPretty => Some( + serde_json::to_string_pretty(&build_json_object(entry, fields)).unwrap_or_default(), + ), + OutputFormat::Psv => Some( + fields + .iter() + .map(|field| entry_field_value(entry, field)) + .collect::>() + .join("|"), + ), + OutputFormat::Table => None, + OutputFormat::Markdown => Some(format!( + "| {} |", + fields + .iter() + .map(|field| entry_field_value(entry, field)) + .collect::>() + .join(" | ") + )), + } +} + +fn build_json_object(entry: &StoredRibEntry, fields: &[&str]) -> serde_json::Value { + let mut obj = serde_json::Map::new(); + + for field in fields { + let value = match *field { + "collector" => json!(entry.collector.to_string()), + "timestamp" => json!(entry.timestamp), + "peer_ip" => json!(entry.peer_ip.to_string()), + "peer_asn" => json!(entry.peer_asn), + "prefix" => json!(entry.prefix.to_string()), + "as_path" => entry + .as_path + .as_ref() + .map_or(serde_json::Value::Null, |value| json!(value)), + "origin_asns" => entry + .origin_asns + .as_ref() + .map_or(serde_json::Value::Null, |values| { + json!(values.iter().map(u32::to_string).collect::>()) + }), + _ => serde_json::Value::Null, + }; + + obj.insert((*field).to_string(), value); + } + + serde_json::Value::Object(obj) +} + +fn entry_field_value(entry: &StoredRibEntry, field: &str) -> String { + match field { + "collector" => entry.collector.to_string(), + "timestamp" => TimestampFormat::Unix.format_timestamp(entry.timestamp), + "peer_ip" => entry.peer_ip.to_string(), + "peer_asn" => entry.peer_asn.to_string(), + "prefix" => entry.prefix.to_string(), + "as_path" => entry.as_path.clone().unwrap_or_default(), + "origin_asns" => entry.origin_asns_string().unwrap_or_default(), + _ => String::new(), + } +} + +fn remove_existing_file(path: &Path) -> Result<()> { + match fs::remove_file(path) { + Ok(()) => Ok(()), + Err(error) if error.kind() == 
std::io::ErrorKind::NotFound => Ok(()), + Err(error) => Err(anyhow!( + "Failed to remove existing output file '{}': {}", + path.display(), + error + )), + } +} + +fn path_to_str(path: &Path) -> Result<&str> { + path.to_str() + .ok_or_else(|| anyhow!("Path '{}' contains invalid UTF-8", path.display())) +} diff --git a/src/bin/monocle.rs b/src/bin/monocle.rs index 20dac06..c5cf360 100644 --- a/src/bin/monocle.rs +++ b/src/bin/monocle.rs @@ -17,6 +17,7 @@ use commands::inspect::InspectArgs; use commands::ip::IpArgs; use commands::parse::ParseArgs; use commands::pfx2as::Pfx2asArgs; +use commands::rib::RibArgs; use commands::rpki::RpkiCommands; use commands::search::SearchArgs; use commands::time::TimeArgs; @@ -57,6 +58,9 @@ enum Commands { /// Search BGP messages from all available public MRT files. Search(SearchArgs), + /// Reconstruct final RIB state at one or more arbitrary timestamps. + Rib(RibArgs), + /// Start the WebSocket server (ws://
<host>:<port>/ws, health: http://<host>
:/health) /// /// Note: This requires building with the `server` feature enabled. @@ -176,6 +180,9 @@ fn main() { match cli.command { Commands::Parse(args) => commands::parse::run(args, streaming_output_format), Commands::Search(args) => commands::search::run(&config, args, streaming_output_format), + Commands::Rib(args) => { + commands::rib::run(&config, args, streaming_output_format, cli.no_update) + } Commands::Server(args) => { // The server requires the `server` feature (axum + tokio). Keep the CLI diff --git a/src/database/mod.rs b/src/database/mod.rs index ed01451..06b70ac 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -125,6 +125,8 @@ pub use monocle::{ // Requires lib feature because MsgStore depends on bgpkit_parser::BgpElem #[cfg(feature = "lib")] pub use session::MsgStore; +#[cfg(feature = "lib")] +pub use session::{RibRouteKey, RibSqliteStore, RibStateStore, StoredRibEntry, StoredRibUpdate}; // ============================================================================= // Helper function diff --git a/src/database/session/mod.rs b/src/database/session/mod.rs index d69b267..11881fb 100644 --- a/src/database/session/mod.rs +++ b/src/database/session/mod.rs @@ -15,6 +15,10 @@ #[cfg(feature = "lib")] mod msg_store; +#[cfg(feature = "lib")] +mod rib_store; #[cfg(feature = "lib")] pub use msg_store::MsgStore; +#[cfg(feature = "lib")] +pub use rib_store::{RibRouteKey, RibSqliteStore, RibStateStore, StoredRibEntry, StoredRibUpdate}; diff --git a/src/database/session/rib_store.rs b/src/database/session/rib_store.rs new file mode 100644 index 0000000..364cb46 --- /dev/null +++ b/src/database/session/rib_store.rs @@ -0,0 +1,488 @@ +//! Working-state storage and SQLite export for reconstructed RIB snapshots. 
+ +use std::collections::HashMap; +use std::net::IpAddr; +use std::sync::Arc; + +use anyhow::{anyhow, Result}; +use bgpkit_parser::models::ElemType; +use bgpkit_parser::BgpElem; +use rusqlite::params; + +use crate::database::core::DatabaseConn; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct RibRouteKey { + pub collector: Arc, + pub peer_ip: IpAddr, + pub peer_asn: u32, + pub prefix: Arc, + pub path_id: Option, +} + +impl RibRouteKey { + pub fn from_elem(collector: Arc, elem: &BgpElem) -> Self { + Self { + collector, + peer_ip: elem.peer_ip, + peer_asn: elem.peer_asn.to_u32(), + prefix: Arc::from(elem.prefix.prefix.to_string().into_boxed_str()), + path_id: elem.prefix.path_id, + } + } + + pub fn from_entry(entry: &StoredRibEntry) -> Self { + Self { + collector: Arc::clone(&entry.collector), + peer_ip: entry.peer_ip, + peer_asn: entry.peer_asn, + prefix: Arc::clone(&entry.prefix), + path_id: entry.path_id, + } + } +} + +#[derive(Debug, Clone)] +pub struct StoredRibEntry { + pub collector: Arc, + pub timestamp: f64, + pub peer_ip: IpAddr, + pub peer_asn: u32, + pub prefix: Arc, + pub path_id: Option, + pub as_path: Option, + pub origin_asns: Option>, +} + +impl StoredRibEntry { + pub fn from_elem(collector: Arc, elem: BgpElem) -> Self { + Self { + collector, + timestamp: elem.timestamp, + peer_ip: elem.peer_ip, + peer_asn: elem.peer_asn.to_u32(), + prefix: Arc::from(elem.prefix.prefix.to_string().into_boxed_str()), + path_id: elem.prefix.path_id, + as_path: elem.as_path.map(|path| path.to_string()), + origin_asns: elem + .origin_asns + .map(|asns| asns.into_iter().map(|asn| asn.to_u32()).collect::>()), + } + } + + pub fn route_key(&self) -> RibRouteKey { + RibRouteKey::from_entry(self) + } + + pub fn origin_asns_string(&self) -> Option { + self.origin_asns.as_ref().map(|asns| { + asns.iter() + .map(u32::to_string) + .collect::>() + .join(" ") + }) + } +} + +/// Represents a filtered BGP update message that contributed to RIB reconstruction. 
+/// Stored in the `updates` table for 2nd and later RIB snapshots. +#[derive(Debug, Clone)] +pub struct StoredRibUpdate { + /// The target RIB timestamp this update contributed to + pub rib_ts: i64, + /// When the update message was received + pub timestamp: f64, + pub collector: Arc, + pub peer_ip: IpAddr, + pub peer_asn: u32, + pub prefix: Arc, + pub path_id: Option, + pub as_path: Option, + pub origin_asns: Option>, + /// The type of BGP message (ANNOUNCE or WITHDRAW) + pub elem_type: ElemType, +} + +impl StoredRibUpdate { + pub fn from_elem(rib_ts: i64, collector: Arc, elem: BgpElem, elem_type: ElemType) -> Self { + Self { + rib_ts, + collector, + timestamp: elem.timestamp, + peer_ip: elem.peer_ip, + peer_asn: elem.peer_asn.to_u32(), + prefix: Arc::from(elem.prefix.prefix.to_string().into_boxed_str()), + path_id: elem.prefix.path_id, + as_path: elem.as_path.map(|path| path.to_string()), + origin_asns: elem + .origin_asns + .map(|asns| asns.into_iter().map(|asn| asn.to_u32()).collect::>()), + elem_type, + } + } + + pub fn origin_asns_string(&self) -> Option { + self.origin_asns.as_ref().map(|asns| { + asns.iter() + .map(u32::to_string) + .collect::>() + .join(" ") + }) + } +} + +pub struct RibStateStore { + entries: HashMap, +} + +impl RibStateStore { + pub fn new_temp() -> Result { + Ok(Self { + entries: HashMap::new(), + }) + } + + pub fn count(&self) -> Result { + Ok(self.entries.len() as u64) + } + + pub fn route_exists(&self, key: &RibRouteKey) -> Result { + Ok(self.entries.contains_key(key)) + } + + pub fn upsert_entry(&mut self, entry: StoredRibEntry) -> Result<()> { + self.upsert_entries(vec![entry]) + } + + pub fn upsert_entries(&mut self, entries: I) -> Result<()> + where + I: IntoIterator, + { + for entry in entries { + self.entries.insert(entry.route_key(), entry); + } + Ok(()) + } + + pub fn delete_key(&mut self, key: &RibRouteKey) -> Result<()> { + self.entries.remove(key); + Ok(()) + } + + pub fn delete_keys(&mut self, keys: I) -> Result<()> + 
where + I: IntoIterator, + { + for key in keys { + self.entries.remove(&key); + } + Ok(()) + } + + pub fn visit_entries(&self, mut visitor: F) -> Result<()> + where + F: FnMut(&StoredRibEntry) -> Result<()>, + { + for entry in self.entries.values() { + visitor(entry)?; + } + Ok(()) + } +} + +/// SQLite storage for RIB reconstruction output with two tables: +/// +/// - `ribs`: Stores final reconstructed RIB states at each target timestamp. +/// Contains one row per (rib_ts, route) showing the routing table state. +/// +/// - `updates`: Stores filtered BGP update messages that were applied to build +/// subsequent RIB snapshots. Only populated for 2nd and later RIBs. +/// Shows the incremental changes between snapshots. +pub struct RibSqliteStore { + db: DatabaseConn, + /// Tracks which snapshot index we're processing (0 = first RIB) + snapshot_index: usize, +} + +impl RibSqliteStore { + pub fn new(db_path: &str, reset: bool) -> Result { + let db = DatabaseConn::open_path(db_path)?; + let store = Self { + db, + snapshot_index: 0, + }; + store.initialize(reset)?; + Ok(store) + } + + fn initialize(&self, reset: bool) -> Result<()> { + if reset { + self.db + .conn + .execute("DROP TABLE IF EXISTS ribs", []) + .map_err(|e| anyhow!("Failed to drop existing ribs table: {}", e))?; + self.db + .conn + .execute("DROP TABLE IF EXISTS updates", []) + .map_err(|e| anyhow!("Failed to drop existing updates table: {}", e))?; + } + + self.db + .conn + .execute_batch( + r#" + -- Final reconstructed RIB states at each target timestamp + -- One row per (rib_ts, route_key) showing the routing table state + CREATE TABLE IF NOT EXISTS ribs ( + rib_ts INTEGER NOT NULL, + timestamp REAL NOT NULL, + collector TEXT NOT NULL, + peer_ip TEXT NOT NULL, + peer_asn INTEGER NOT NULL, + prefix TEXT NOT NULL, + path_id INTEGER, + as_path TEXT, + origin_asns TEXT + ); + + -- Filtered BGP updates used to build 2nd and later RIB snapshots + -- Only populated when multiple rib_ts are requested + -- Shows 
incremental changes between consecutive RIB states + CREATE TABLE IF NOT EXISTS updates ( + rib_ts INTEGER NOT NULL, + timestamp REAL NOT NULL, + collector TEXT NOT NULL, + peer_ip TEXT NOT NULL, + peer_asn INTEGER NOT NULL, + prefix TEXT NOT NULL, + path_id INTEGER, + as_path TEXT, + origin_asns TEXT, + elem_type TEXT NOT NULL + ); + "#, + ) + .map_err(|e| anyhow!("Failed to initialize RIB SQLite schema: {}", e))?; + Ok(()) + } + + /// Insert a RIB snapshot and its associated filtered updates. + /// + /// # Arguments + /// * `rib_ts` - The target RIB timestamp + /// * `state_store` - The final RIB state to store + /// * `filtered_updates` - Updates that contributed to this RIB (only stored for 2nd+ RIBs) + pub fn insert_snapshot( + &mut self, + rib_ts: i64, + state_store: &RibStateStore, + filtered_updates: &[StoredRibUpdate], + ) -> Result<()> { + let tx = self + .db + .conn + .unchecked_transaction() + .map_err(|e| anyhow!("Failed to begin RIB output transaction: {}", e))?; + + // Insert RIB entries into 'ribs' table (always populated) + { + let mut rib_stmt = tx + .prepare_cached( + r#" + INSERT INTO ribs ( + rib_ts, timestamp, collector, peer_ip, peer_asn, + prefix, path_id, as_path, origin_asns + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9) + "#, + ) + .map_err(|e| anyhow!("Failed to prepare ribs insert statement: {}", e))?; + + state_store.visit_entries(|entry| { + rib_stmt + .execute(params![ + rib_ts, + entry.timestamp, + entry.collector, + entry.peer_ip.to_string(), + entry.peer_asn, + entry.prefix, + entry.path_id, + entry.as_path, + entry.origin_asns_string(), + ]) + .map_err(|e| anyhow!("Failed to insert into ribs table: {}", e))?; + Ok(()) + })?; + } + + // Insert filtered updates into 'updates' table (only for 2nd and later RIBs) + if self.snapshot_index > 0 && !filtered_updates.is_empty() { + let mut update_stmt = tx + .prepare_cached( + r#" + INSERT INTO updates ( + rib_ts, timestamp, collector, peer_ip, peer_asn, + prefix, path_id, as_path, 
origin_asns, elem_type + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10) + "#, + ) + .map_err(|e| anyhow!("Failed to prepare updates insert statement: {}", e))?; + + for update in filtered_updates { + let elem_type_str = match update.elem_type { + ElemType::ANNOUNCE => "ANNOUNCE", + ElemType::WITHDRAW => "WITHDRAW", + }; + + update_stmt + .execute(params![ + update.rib_ts, + update.timestamp, + update.collector, + update.peer_ip.to_string(), + update.peer_asn, + update.prefix, + update.path_id, + update.as_path, + update.origin_asns_string(), + elem_type_str, + ]) + .map_err(|e| anyhow!("Failed to insert into updates table: {}", e))?; + } + } + + tx.commit() + .map_err(|e| anyhow!("Failed to commit RIB output transaction: {}", e))?; + self.snapshot_index += 1; + Ok(()) + } + + pub fn finalize_indexes(&self) -> Result<()> { + self.db + .conn + .execute_batch( + r#" + -- Indexes for ribs table (final RIB states) + CREATE INDEX IF NOT EXISTS idx_ribs_rib_ts ON ribs(rib_ts); + CREATE INDEX IF NOT EXISTS idx_ribs_rib_ts_prefix ON ribs(rib_ts, prefix); + CREATE INDEX IF NOT EXISTS idx_ribs_rib_ts_peer_asn ON ribs(rib_ts, peer_asn); + CREATE INDEX IF NOT EXISTS idx_ribs_rib_ts_collector ON ribs(rib_ts, collector); + + -- Indexes for updates table (intermediate changes) + CREATE INDEX IF NOT EXISTS idx_updates_rib_ts ON updates(rib_ts); + CREATE INDEX IF NOT EXISTS idx_updates_rib_ts_prefix ON updates(rib_ts, prefix); + CREATE INDEX IF NOT EXISTS idx_updates_timestamp ON updates(timestamp); + CREATE INDEX IF NOT EXISTS idx_updates_collector ON updates(collector); + "#, + ) + .map_err(|e| anyhow!("Failed to create RIB SQLite indexes: {}", e))?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bgpkit_parser::models::{AsPath, AsPathSegment, ElemType, NetworkPrefix}; + use std::net::{IpAddr, Ipv4Addr}; + use std::sync::Arc; + + fn test_elem() -> Result { + Ok(BgpElem { + timestamp: 1234.0, + elem_type: ElemType::ANNOUNCE, + peer_ip: 
IpAddr::V4(Ipv4Addr::new(192, 0, 2, 1)), + peer_asn: 64496.into(), + prefix: NetworkPrefix::new("203.0.113.0/24".parse()?, Some(7)), + next_hop: Some(IpAddr::V4(Ipv4Addr::new(192, 0, 2, 2))), + as_path: Some(AsPath { + segments: vec![AsPathSegment::AsSequence(vec![64496.into(), 64497.into()])], + }), + origin_asns: Some(vec![64497.into()]), + origin: None, + local_pref: Some(100), + med: Some(50), + communities: None, + atomic: false, + aggr_asn: None, + aggr_ip: None, + only_to_customer: None, + unknown: None, + deprecated: None, + }) + } + + #[test] + fn test_rib_state_store_round_trip() -> Result<()> { + let mut store = RibStateStore::new_temp()?; + let entry = StoredRibEntry::from_elem(Arc::from("rrc00"), test_elem()?); + store.upsert_entry(entry.clone())?; + assert!(store.route_exists(&entry.route_key())?); + + let mut visited = Vec::new(); + store.visit_entries(|entry| { + visited.push(entry.clone()); + Ok(()) + })?; + + assert_eq!(visited.len(), 1); + assert_eq!(visited[0].collector.as_ref(), "rrc00"); + assert_eq!(visited[0].path_id, Some(7)); + Ok(()) + } + + #[test] + fn test_sqlite_store_two_tables() -> Result<()> { + use tempfile::NamedTempFile; + + let temp_file = NamedTempFile::new()?; + let path = temp_file.path().to_str().unwrap(); + + let mut store = RibSqliteStore::new(path, true)?; + + // Create first RIB snapshot (no updates should be stored) + let mut state1 = RibStateStore::new_temp()?; + let entry1 = StoredRibEntry::from_elem(Arc::from("rrc00"), test_elem()?); + state1.upsert_entry(entry1)?; + + // First RIB: no updates stored + store.insert_snapshot(1704067200, &state1, &[])?; + + // Create second RIB snapshot with updates + let mut state2 = RibStateStore::new_temp()?; + let entry2 = StoredRibEntry::from_elem(Arc::from("rrc00"), test_elem()?); + state2.upsert_entry(entry2)?; + + let update = StoredRibUpdate::from_elem( + 1704069000, + Arc::from("rrc00"), + test_elem()?, + ElemType::ANNOUNCE, + ); + + // Second RIB: updates should be stored + 
store.insert_snapshot(1704069000, &state2, &[update])?; + + store.finalize_indexes()?; + + // Verify tables exist and have correct data + let rib_count: i64 = store + .db + .conn + .query_row("SELECT COUNT(*) FROM ribs", [], |row| row.get(0)) + .map_err(|e| anyhow!("Failed to count ribs: {}", e))?; + + let update_count: i64 = store + .db + .conn + .query_row("SELECT COUNT(*) FROM updates", [], |row| row.get(0)) + .map_err(|e| anyhow!("Failed to count updates: {}", e))?; + + // Both RIBs stored in ribs table + assert_eq!(rib_count, 2); + // Only 2nd RIB has updates stored + assert_eq!(update_count, 1); + + Ok(()) + } +} diff --git a/src/lens/mod.rs b/src/lens/mod.rs index 3deaa96..937b883 100644 --- a/src/lens/mod.rs +++ b/src/lens/mod.rs @@ -78,6 +78,10 @@ pub mod parse; #[cfg(feature = "lib")] pub mod search; +// RibLens - arbitrary timestamp RIB reconstruction +#[cfg(feature = "lib")] +pub mod rib; + // RpkiLens - RPKI validation and data #[cfg(feature = "lib")] pub mod rpki; diff --git a/src/lens/rib/mod.rs b/src/lens/rib/mod.rs new file mode 100644 index 0000000..c955408 --- /dev/null +++ b/src/lens/rib/mod.rs @@ -0,0 +1,1121 @@ +//! RIB reconstruction lens. +//! +//! This module reconstructs final RIB state at arbitrary timestamps by: +//! 1. Selecting the latest RIB before each target time +//! 2. Replaying overlapping updates up to the exact target time +//! 3. 
Materializing only the final route state for each requested `rib_ts`
+
+use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use anyhow::{anyhow, Result};
+use bgpkit_broker::{BgpkitBroker, BrokerItem};
+use bgpkit_parser::models::ElemType;
+use bgpkit_parser::BgpElem;
+use chrono::{DateTime, Duration};
+use regex::Regex;
+use serde::{Deserialize, Serialize};
+
+use crate::config::MonocleConfig;
+use crate::database::{
+    MonocleDatabase, RibRouteKey, RibStateStore, StoredRibEntry, StoredRibUpdate,
+};
+use crate::lens::country::CountryLens;
+use crate::lens::parse::ParseFilters;
+use crate::lens::time::TimeLens;
+
+#[cfg(feature = "cli")]
+use clap::Args;
+
+const FULL_FEED_V4_THRESHOLD: u32 = 800_000;
+const FULL_FEED_V6_THRESHOLD: u32 = 100_000;
+const RIB_LOOKBACK_HOURS: i64 = 24 * 30;
+
+type FullFeedAllowlists = HashMap<String, HashSet<(String, u32)>>;
+
+#[derive(Debug, Clone, Default, Serialize, Deserialize)]
+#[cfg_attr(feature = "cli", derive(Args))]
+pub struct RibFilters {
+    /// Filter by origin AS Number(s), comma-separated. Prefix with ! to exclude.
+    #[cfg_attr(feature = "cli", clap(short = 'o', long, value_delimiter = ','))]
+    #[serde(default)]
+    pub origin_asn: Vec<String>,
+
+    /// Filter by origin ASN registration country.
+    #[cfg_attr(feature = "cli", clap(short = 'C', long))]
+    pub country: Option<String>,
+
+    /// Filter by network prefix(es), comma-separated. Prefix with ! to exclude.
+    #[cfg_attr(feature = "cli", clap(short = 'p', long, value_delimiter = ','))]
+    #[serde(default)]
+    pub prefix: Vec<String>,
+
+    /// Include super-prefixes when filtering.
+    #[cfg_attr(feature = "cli", clap(short = 's', long))]
+    #[serde(default)]
+    pub include_super: bool,
+
+    /// Include sub-prefixes when filtering.
+    #[cfg_attr(feature = "cli", clap(short = 'S', long))]
+    #[serde(default)]
+    pub include_sub: bool,
+
+    /// Filter by peer ASN(s), comma-separated. Prefix with ! to exclude.
+ #[cfg_attr(feature = "cli", clap(short = 'J', long, value_delimiter = ','))] + #[serde(default)] + pub peer_asn: Vec, + + /// Filter by AS path regex string. + #[cfg_attr(feature = "cli", clap(short = 'a', long))] + pub as_path: Option, + + /// Filter by collector, e.g., rrc00 or route-views2. + #[cfg_attr(feature = "cli", clap(short = 'c', long))] + pub collector: Option, + + /// Filter by route collection project, i.e. riperis or routeviews. + #[cfg_attr(feature = "cli", clap(short = 'P', long))] + pub project: Option, + + /// Keep only full-feed peers based on broker peer metadata. + #[cfg_attr(feature = "cli", clap(long))] + #[serde(default)] + pub full_feed_only: bool, +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[cfg_attr(feature = "cli", derive(Args))] +pub struct RibArgs { + /// Target RIB timestamp operand. Repeat to request multiple snapshots. + #[cfg_attr(feature = "cli", clap(value_name = "RIB_TS", required = true))] + #[serde(default)] + pub rib_ts: Vec, + + #[cfg_attr(feature = "cli", clap(flatten))] + #[serde(flatten)] + pub filters: RibFilters, + + /// SQLite output file path. + #[cfg_attr(feature = "cli", clap(long))] + pub sqlite_path: Option, +} + +impl RibArgs { + pub fn normalized_rib_ts(&self) -> Result> { + let time_lens = TimeLens::new(); + let mut timestamps = BTreeSet::new(); + + for value in &self.rib_ts { + let ts = time_lens + .parse_time_string(value) + .map_err(|e| anyhow!("Invalid RIB timestamp '{}': {}", value, e))? 
+ .timestamp(); + timestamps.insert(ts); + } + + if timestamps.is_empty() { + return Err(anyhow!("At least one RIB timestamp is required")); + } + + Ok(timestamps.into_iter().collect()) + } + + pub fn validate(&self) -> Result> { + let normalized_ts = self.normalized_rib_ts()?; + + let parse_filters = ParseFilters { + origin_asn: self.filters.origin_asn.clone(), + prefix: self.filters.prefix.clone(), + include_super: self.filters.include_super, + include_sub: self.filters.include_sub, + peer_asn: self.filters.peer_asn.clone(), + as_path: self.filters.as_path.clone(), + ..Default::default() + }; + parse_filters.validate()?; + + if let Some(as_path) = &self.filters.as_path { + Regex::new(as_path) + .map_err(|e| anyhow!("Invalid --as-path regex '{}': {}", as_path, e))?; + } + + if normalized_ts.len() > 1 && self.sqlite_path.is_none() { + return Err(anyhow!("Multiple RIB timestamps require --sqlite-path.")); + } + + Ok(normalized_ts) + } +} + +#[derive(Debug, Clone)] +pub struct RibRunSummary { + pub rib_ts: Vec, + pub collectors_processed: usize, + pub groups_processed: usize, +} + +#[derive(Debug, Clone)] +struct RibReplayGroup { + collector: String, + rib_item: BrokerItem, + rib_ts: Vec, + updates: Vec, +} + +#[derive(Debug, Clone)] +enum DeltaOp { + Upsert(StoredRibEntry), + Delete(RibRouteKey), +} + +#[derive(Debug, Clone)] +struct OriginFilter { + values: HashSet, + negated: bool, +} + +pub struct RibLens<'a> { + db: &'a MonocleDatabase, + config: &'a MonocleConfig, +} + +impl<'a> RibLens<'a> { + pub fn new(db: &'a MonocleDatabase, config: &'a MonocleConfig) -> Self { + Self { db, config } + } + + /// Reconstruct RIB snapshots at specified timestamps. 
+ /// + /// The `snapshot_visitor` callback is invoked for each snapshot with: + /// - `i64`: The target RIB timestamp + /// - `&RibStateStore`: The final reconstructed RIB state + /// - `&[StoredRibUpdate]`: Filtered updates that contributed to this snapshot + /// (empty for the first/base RIB, populated for subsequent RIBs) + pub fn reconstruct_snapshots( + &self, + args: &RibArgs, + no_update: bool, + mut snapshot_visitor: F, + ) -> Result + where + F: FnMut(i64, &RibStateStore, &[StoredRibUpdate]) -> Result<()>, + { + let normalized_ts = args.validate()?; + let country_asns = self.resolve_country_asns(args.filters.country.as_deref(), no_update)?; + let origin_filter = Self::parse_origin_filter(&args.filters.origin_asn)?; + let as_path_regex = Self::compile_as_path_regex(args.filters.as_path.as_deref())?; + let groups = self.resolve_replay_groups(args, &normalized_ts)?; + + let allowlists = if args.filters.full_feed_only { + self.build_full_feed_allowlists(&groups)? + } else { + HashMap::new() + }; + + for group in &groups { + let mut state_store = RibStateStore::new_temp()?; + let safe_base_filters = self.safe_parse_filters( + args, + group.rib_item.ts_start.and_utc().timestamp(), + group.rib_item.ts_end.and_utc().timestamp(), + ); + + self.load_base_rib( + &mut state_store, + &group.collector, + &group.rib_item, + &safe_base_filters, + country_asns.as_ref(), + origin_filter.as_ref(), + as_path_regex.as_ref(), + allowlists.get(group.collector.as_str()), + )?; + + // If the first target timestamp equals the RIB time, emit it immediately + // with empty updates (it's the base RIB, not built from updates) + let rib_ts = group.rib_item.ts_start.and_utc().timestamp(); + if group + .rib_ts + .first() + .map(|&ts| ts == rib_ts) + .unwrap_or(false) + { + snapshot_visitor(group.rib_ts[0], &state_store, &[])?; + // Create a new group with remaining timestamps for replay + let remaining_ts: Vec = group.rib_ts.iter().skip(1).copied().collect(); + if 
!remaining_ts.is_empty() { + let mut new_group = group.clone(); + new_group.rib_ts = remaining_ts; + self.replay_updates( + &mut state_store, + &new_group, + args, + country_asns.as_ref(), + origin_filter.as_ref(), + as_path_regex.as_ref(), + allowlists.get(group.collector.as_str()), + &mut snapshot_visitor, + )?; + } + } else { + self.replay_updates( + &mut state_store, + group, + args, + country_asns.as_ref(), + origin_filter.as_ref(), + as_path_regex.as_ref(), + allowlists.get(group.collector.as_str()), + &mut snapshot_visitor, + )?; + } + } + + let collector_count = groups + .iter() + .map(|group| group.collector.as_str()) + .collect::>() + .len(); + + Ok(RibRunSummary { + rib_ts: normalized_ts, + collectors_processed: collector_count, + groups_processed: groups.len(), + }) + } + + pub fn file_name_prefix(&self, args: &RibArgs, rib_ts: &[i64]) -> Result { + let base = if rib_ts.len() == 1 { + format!( + "monocle-rib-{}", + Self::format_rib_ts_for_filename(rib_ts[0])? + ) + } else { + format!( + "monocle-rib-{}-{}", + Self::format_rib_ts_for_filename( + *rib_ts + .first() + .ok_or_else(|| anyhow!("missing first rib_ts"))? + )?, + Self::format_rib_ts_for_filename( + *rib_ts + .last() + .ok_or_else(|| anyhow!("missing last rib_ts"))? + )?, + ) + }; + + let slug = self.filter_slug(&args.filters)?; + if slug.is_empty() { + Ok(base) + } else { + Ok(format!("{}-{}", base, slug)) + } + } + + fn resolve_country_asns( + &self, + country: Option<&str>, + no_update: bool, + ) -> Result>> { + let Some(country) = country else { + return Ok(None); + }; + + let country_code = self.resolve_country_code(country)?; + let asinfo = self.db.asinfo(); + + if asinfo.is_empty() { + if no_update { + return Err(anyhow!( + "ASInfo data is empty but --country was requested. Re-run without --no-update or refresh ASInfo first." 
+ )); + } + self.db + .refresh_asinfo() + .map_err(|e| anyhow!("Failed to refresh ASInfo data for country filter: {}", e))?; + } else if !no_update && asinfo.needs_refresh(self.config.asinfo_cache_ttl()) { + self.db.refresh_asinfo().map_err(|e| { + anyhow!( + "Failed to refresh stale ASInfo data for country filter: {}", + e + ) + })?; + } + + let mut asns = HashSet::new(); + let mut stmt = self + .db + .connection() + .prepare("SELECT asn FROM asinfo_core WHERE UPPER(country) = UPPER(?1) ORDER BY asn") + .map_err(|e| anyhow!("Failed to prepare ASInfo country lookup: {}", e))?; + let rows = stmt + .query_map([country_code.clone()], |row| row.get::<_, u32>(0)) + .map_err(|e| { + anyhow!( + "Failed to query ASInfo by country '{}': {}", + country_code, + e + ) + })?; + + for row in rows { + asns.insert(row.map_err(|e| anyhow!("Failed to decode ASInfo country row: {}", e))?); + } + + Ok(Some(asns)) + } + + fn resolve_country_code(&self, input: &str) -> Result { + let lens = CountryLens::new(); + let matches = lens.lookup(input); + + if matches.is_empty() { + if input.len() == 2 { + return Ok(input.to_uppercase()); + } + return Err(anyhow!("Unknown country filter '{}'", input)); + } + + let exact_name_matches: Vec<_> = matches + .iter() + .filter(|entry| entry.name.eq_ignore_ascii_case(input)) + .collect(); + if exact_name_matches.len() == 1 { + return Ok(exact_name_matches[0].code.clone()); + } + + let exact_code_matches: Vec<_> = matches + .iter() + .filter(|entry| entry.code.eq_ignore_ascii_case(input)) + .collect(); + if exact_code_matches.len() == 1 { + return Ok(exact_code_matches[0].code.clone()); + } + + if matches.len() == 1 { + return Ok(matches[0].code.clone()); + } + + Err(anyhow!( + "Country filter '{}' is ambiguous; matches: {}", + input, + matches + .iter() + .map(|entry| format!("{} ({})", entry.name, entry.code)) + .collect::>() + .join(", ") + )) + } + + fn parse_origin_filter(values: &[String]) -> Result> { + if values.is_empty() { + return Ok(None); + 
}
+
+        let negated = values
+            .first()
+            .map(|value| value.starts_with('!'))
+            .unwrap_or(false);
+        let mut parsed = HashSet::new();
+
+        for value in values {
+            let asn = value
+                .trim_start_matches('!')
+                .parse::<u32>()
+                .map_err(|e| anyhow!("Invalid origin ASN filter '{}': {}", value, e))?;
+            parsed.insert(asn);
+        }
+
+        Ok(Some(OriginFilter {
+            values: parsed,
+            negated,
+        }))
+    }
+
+    fn compile_as_path_regex(pattern: Option<&str>) -> Result<Option<Regex>> {
+        pattern
+            .map(|pattern| {
+                Regex::new(pattern)
+                    .map_err(|e| anyhow!("Invalid --as-path regex '{}': {}", pattern, e))
+            })
+            .transpose()
+    }
+
+    fn resolve_replay_groups(
+        &self,
+        args: &RibArgs,
+        normalized_ts: &[i64],
+    ) -> Result<Vec<RibReplayGroup>> {
+        let first_ts = *normalized_ts
+            .first()
+            .ok_or_else(|| anyhow!("Missing earliest rib_ts after validation"))?;
+        let last_ts = *normalized_ts
+            .last()
+            .ok_or_else(|| anyhow!("Missing latest rib_ts after validation"))?;
+
+        let ribs = self
+            .base_broker(args)
+            .data_type("rib")
+            .ts_start(Self::timestamp_to_broker_string(
+                first_ts - Duration::hours(RIB_LOOKBACK_HOURS).num_seconds(),
+            )?)
+            .ts_end(Self::timestamp_to_broker_string(last_ts)?)
+ .query() + .map_err(|e| anyhow!("Failed to query broker for candidate RIB files: {}", e))?; + + let mut ribs_by_collector: BTreeMap> = BTreeMap::new(); + for item in ribs { + ribs_by_collector + .entry(item.collector_id.clone()) + .or_default() + .push(item); + } + + let mut groups = Vec::new(); + for (collector, mut collector_ribs) in ribs_by_collector { + collector_ribs.sort_by_key(|item| item.ts_start); + + let mut timestamps_by_rib: BTreeMap)> = BTreeMap::new(); + for rib_ts in normalized_ts { + let selected_rib = collector_ribs + .iter() + .filter(|item| item.ts_start.and_utc().timestamp() <= *rib_ts) + .max_by_key(|item| item.ts_start); + + let Some(selected_rib) = selected_rib else { + return Err(anyhow!( + "No RIB file found at or before {} for collector {}", + Self::format_rib_ts_for_error(*rib_ts)?, + collector + )); + }; + + timestamps_by_rib + .entry(selected_rib.url.clone()) + .and_modify(|(_, timestamps)| timestamps.push(*rib_ts)) + .or_insert_with(|| (selected_rib.clone(), vec![*rib_ts])); + } + + for (_, (rib_item, mut group_ts)) in timestamps_by_rib { + group_ts.sort_unstable(); + let group_max_ts = *group_ts + .last() + .ok_or_else(|| anyhow!("Replay group was created without any rib_ts"))?; + let updates = + self.resolve_group_updates(args, &collector, &rib_item, group_max_ts)?; + + groups.push(RibReplayGroup { + collector: collector.clone(), + rib_item, + rib_ts: group_ts, + updates, + }); + } + } + + groups.sort_by(|a, b| { + a.collector + .cmp(&b.collector) + .then(a.rib_item.ts_start.cmp(&b.rib_item.ts_start)) + }); + + if groups.is_empty() { + return Err(anyhow!( + "No suitable RIB files were found for the requested timestamps and collector filters." 
+ )); + } + + Ok(groups) + } + + fn resolve_group_updates( + &self, + args: &RibArgs, + collector: &str, + rib_item: &BrokerItem, + group_max_ts: i64, + ) -> Result> { + let rib_ts = rib_item.ts_start.and_utc().timestamp(); + + let mut broker = self + .base_broker(args) + .collector_id(collector) + .data_type("updates") + .ts_start(Self::timestamp_to_broker_string(rib_ts)?) + .ts_end(Self::timestamp_to_broker_string(group_max_ts)?); + + if let Some(project) = &args.filters.project { + broker = broker.project(project); + } + + let mut updates = broker.query().map_err(|e| { + anyhow!( + "Failed to query broker for updates for {}: {}", + collector, + e + ) + })?; + + // Only keep update files that contain data up to and including the target timestamp. + // An update file with ts_end <= group_max_ts has all elements with timestamp <= group_max_ts. + updates.retain(|item| { + let item_end = item.ts_end.and_utc().timestamp(); + item_end > rib_ts && item_end <= group_max_ts + }); + updates.sort_by_key(|item| item.ts_start); + Ok(updates) + } + + fn build_full_feed_allowlists(&self, groups: &[RibReplayGroup]) -> Result { + let mut allowlists = HashMap::new(); + + for collector in groups + .iter() + .map(|group| group.collector.as_str()) + .collect::>() + { + let peers = BgpkitBroker::new() + .collector_id(collector) + .get_peers() + .map_err(|e| { + anyhow!( + "Failed to fetch broker peer metadata for {}: {}", + collector, + e + ) + })?; + + let allowed = peers + .into_iter() + .filter(|peer| { + peer.num_v4_pfxs >= FULL_FEED_V4_THRESHOLD + || peer.num_v6_pfxs >= FULL_FEED_V6_THRESHOLD + }) + .map(|peer| (peer.ip.to_string(), peer.asn)) + .collect::>(); + + allowlists.insert(collector.to_string(), allowed); + } + + Ok(allowlists) + } + + #[allow(clippy::too_many_arguments)] + fn load_base_rib( + &self, + state_store: &mut RibStateStore, + collector: &str, + rib_item: &BrokerItem, + safe_filters: &ParseFilters, + country_asns: Option<&HashSet>, + origin_filter: 
Option<&OriginFilter>,
+        as_path_regex: Option<&Regex>,
+        full_feed_allowlist: Option<&HashSet<(String, u32)>>,
+    ) -> Result<()> {
+        let parser = safe_filters.to_parser(&rib_item.url).map_err(|e| {
+            anyhow!(
+                "Failed to build parser for base RIB {}: {}",
+                rib_item.url,
+                e
+            )
+        })?;
+
+        let collector_arc = Arc::from(collector);
+        let mut batch = Vec::new();
+        for elem in parser {
+            if elem.elem_type != ElemType::ANNOUNCE {
+                continue;
+            }
+            if self.announce_matches(
+                collector,
+                &elem,
+                country_asns,
+                origin_filter,
+                as_path_regex,
+                full_feed_allowlist,
+            ) {
+                batch.push(StoredRibEntry::from_elem(Arc::clone(&collector_arc), elem));
+            }
+        }
+
+        state_store.upsert_entries(batch)?;
+        Ok(())
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    fn replay_updates<F>(
+        &self,
+        state_store: &mut RibStateStore,
+        group: &RibReplayGroup,
+        args: &RibArgs,
+        country_asns: Option<&HashSet<u32>>,
+        origin_filter: Option<&OriginFilter>,
+        as_path_regex: Option<&Regex>,
+        full_feed_allowlist: Option<&HashSet<(String, u32)>>,
+        snapshot_visitor: &mut F,
+    ) -> Result<()>
+    where
+        F: FnMut(i64, &RibStateStore, &[StoredRibUpdate]) -> Result<()>,
+    {
+        let mut pending = HashMap::<RibRouteKey, DeltaOp>::new();
+        let mut next_snapshot_index = 0usize;
+        let collector_arc = Arc::from(group.collector.as_str());
+
+        // Track filtered updates for the current snapshot interval
+        // These are updates that matched filters and affected the RIB state
+        let mut filtered_updates: Vec<StoredRibUpdate> = Vec::new();
+
+        for update in &group.updates {
+            let safe_filters = self.safe_parse_filters(
+                args,
+                group.rib_item.ts_start.and_utc().timestamp(),
+                *group
+                    .rib_ts
+                    .last()
+                    .ok_or_else(|| anyhow!("Replay group missing max rib_ts"))?,
+            );
+            let parser = safe_filters.to_parser(&update.url).map_err(|e| {
+                anyhow!(
+                    "Failed to build parser for updates file {}: {}",
+                    update.url,
+                    e
+                )
+            })?;
+
+            for elem in parser {
+                while next_snapshot_index < group.rib_ts.len()
+                    && elem.timestamp > group.rib_ts[next_snapshot_index] as f64
+
{ + self.flush_pending(state_store, &mut pending)?; + // For the first RIB (index 0), pass empty updates + // For subsequent RIBs, pass the collected filtered updates + snapshot_visitor( + group.rib_ts[next_snapshot_index], + state_store, + &filtered_updates, + )?; + // Clear updates after emitting snapshot (they belong to this snapshot) + filtered_updates.clear(); + next_snapshot_index += 1; + } + + // Apply update and track if it was filtered/matched + let was_applied = self.apply_update_to_delta( + &mut pending, + state_store, + Arc::clone(&collector_arc), + &elem, + country_asns, + origin_filter, + as_path_regex, + full_feed_allowlist, + )?; + + // If the update was applied (matched filters), track it for the updates table + if was_applied { + let elem_type = elem.elem_type; + let update_record = StoredRibUpdate::from_elem( + group.rib_ts[next_snapshot_index.min(group.rib_ts.len() - 1)], + Arc::clone(&collector_arc), + elem, + elem_type, + ); + filtered_updates.push(update_record); + } + } + } + + while next_snapshot_index < group.rib_ts.len() { + self.flush_pending(state_store, &mut pending)?; + snapshot_visitor( + group.rib_ts[next_snapshot_index], + state_store, + &filtered_updates, + )?; + filtered_updates.clear(); + next_snapshot_index += 1; + } + + Ok(()) + } + + /// Apply an update to the pending delta and return whether it matched filters. + /// + /// Returns `true` if the update matched filters and was recorded in the delta, + /// `false` if it was filtered out (doesn't mean it won't affect state - withdraws + /// always check for existing routes). 
+    #[allow(clippy::too_many_arguments)]
+    fn apply_update_to_delta(
+        &self,
+        pending: &mut HashMap<RibRouteKey, DeltaOp>,
+        state_store: &RibStateStore,
+        collector: Arc<str>,
+        elem: &BgpElem,
+        country_asns: Option<&HashSet<u32>>,
+        origin_filter: Option<&OriginFilter>,
+        as_path_regex: Option<&Regex>,
+        full_feed_allowlist: Option<&HashSet<(String, u32)>>,
+    ) -> Result<bool> {
+        let route_key = RibRouteKey::from_elem(Arc::clone(&collector), elem);
+
+        match elem.elem_type {
+            ElemType::WITHDRAW => {
+                if self.route_exists_in_state_or_delta(&route_key, state_store, pending)? {
+                    pending.insert(route_key.clone(), DeltaOp::Delete(route_key));
+                    Ok(true)
+                } else {
+                    Ok(false)
+                }
+            }
+            ElemType::ANNOUNCE => {
+                let matches = self.announce_matches(
+                    &collector,
+                    elem,
+                    country_asns,
+                    origin_filter,
+                    as_path_regex,
+                    full_feed_allowlist,
+                );
+
+                if matches {
+                    pending.insert(
+                        route_key,
+                        DeltaOp::Upsert(StoredRibEntry::from_elem(collector, elem.clone())),
+                    );
+                    Ok(true)
+                } else if self.route_exists_in_state_or_delta(&route_key, state_store, pending)?
{ + pending.insert(route_key.clone(), DeltaOp::Delete(route_key)); + Ok(true) + } else { + Ok(false) + } + } + } + } + + fn route_exists_in_state_or_delta( + &self, + route_key: &RibRouteKey, + state_store: &RibStateStore, + pending: &HashMap, + ) -> Result { + if let Some(delta) = pending.get(route_key) { + return Ok(matches!(delta, DeltaOp::Upsert(_))); + } + state_store.route_exists(route_key) + } + + fn flush_pending( + &self, + state_store: &mut RibStateStore, + pending: &mut HashMap, + ) -> Result<()> { + if pending.is_empty() { + return Ok(()); + } + + let mut upserts = Vec::new(); + let mut deletes = Vec::new(); + + for delta in pending.values() { + match delta { + DeltaOp::Upsert(entry) => upserts.push(entry.clone()), + DeltaOp::Delete(key) => deletes.push(key.clone()), + } + } + + if !upserts.is_empty() { + state_store.upsert_entries(upserts)?; + } + if !deletes.is_empty() { + state_store.delete_keys(deletes)?; + } + + pending.clear(); + Ok(()) + } + + fn announce_matches( + &self, + collector: &str, + elem: &BgpElem, + country_asns: Option<&HashSet>, + origin_filter: Option<&OriginFilter>, + as_path_regex: Option<&Regex>, + full_feed_allowlist: Option<&HashSet<(String, u32)>>, + ) -> bool { + if collector.is_empty() { + return false; + } + + if let Some(origin_filter) = origin_filter { + let matches_origin = elem + .origin_asns + .as_ref() + .map(|origins| { + origins + .iter() + .any(|asn| origin_filter.values.contains(&asn.to_u32())) + }) + .unwrap_or(false); + + if origin_filter.negated { + if matches_origin { + return false; + } + } else if !matches_origin { + return false; + } + } + + if let Some(country_asns) = country_asns { + let matches_country = elem + .origin_asns + .as_ref() + .map(|origins| { + origins + .iter() + .any(|asn| country_asns.contains(&asn.to_u32())) + }) + .unwrap_or(false); + if !matches_country { + return false; + } + } + + if let Some(as_path_regex) = as_path_regex { + let as_path = elem + .as_path + .as_ref() + .map(|path| 
path.to_string()) + .unwrap_or_default(); + if !as_path_regex.is_match(&as_path) { + return false; + } + } + + if let Some(full_feed_allowlist) = full_feed_allowlist { + let peer_key = (elem.peer_ip.to_string(), elem.peer_asn.to_u32()); + if !full_feed_allowlist.contains(&peer_key) { + return false; + } + } + + true + } + + fn safe_parse_filters(&self, args: &RibArgs, start_ts: i64, end_ts: i64) -> ParseFilters { + ParseFilters { + prefix: args.filters.prefix.clone(), + include_super: args.filters.include_super, + include_sub: args.filters.include_sub, + peer_asn: args.filters.peer_asn.clone(), + start_ts: Some(start_ts.to_string()), + end_ts: Some(end_ts.to_string()), + ..Default::default() + } + } + + fn base_broker(&self, args: &RibArgs) -> BgpkitBroker { + let mut broker = BgpkitBroker::new().page_size(1000); + if let Some(collector) = &args.filters.collector { + broker = broker.collector_id(collector); + } + if let Some(project) = &args.filters.project { + broker = broker.project(project); + } + broker + } + + fn timestamp_to_broker_string(ts: i64) -> Result { + let timestamp = DateTime::from_timestamp(ts, 0) + .ok_or_else(|| anyhow!("Invalid Unix timestamp {} for broker query", ts))?; + Ok(timestamp.format("%Y-%m-%dT%H:%M:%SZ").to_string()) + } + + fn format_rib_ts_for_filename(rib_ts: i64) -> Result { + let timestamp = DateTime::from_timestamp(rib_ts, 0) + .ok_or_else(|| anyhow!("Invalid Unix timestamp {} for file naming", rib_ts))?; + Ok(timestamp.format("%Y%m%dT%H%M%SZ").to_string()) + } + + fn format_rib_ts_for_error(rib_ts: i64) -> Result { + let timestamp = DateTime::from_timestamp(rib_ts, 0) + .ok_or_else(|| anyhow!("Invalid Unix timestamp {} for error reporting", rib_ts))?; + Ok(timestamp.format("%Y-%m-%dT%H:%M:%SZ").to_string()) + } + + fn filter_slug(&self, filters: &RibFilters) -> Result { + let mut parts = Vec::new(); + + if let Some(country) = &filters.country { + parts.push(format!( + "country-{}", + Self::sanitize_slug_component(country) + )); 
+ } + if !filters.origin_asn.is_empty() { + parts.push(format!( + "origin-{}", + Self::sanitize_list_component(&filters.origin_asn) + )); + } + if !filters.peer_asn.is_empty() { + parts.push(format!( + "peer-{}", + Self::sanitize_list_component(&filters.peer_asn) + )); + } + if let Some(collector) = &filters.collector { + let values = collector + .split(',') + .map(|value| value.trim().to_string()) + .collect::>(); + parts.push(format!( + "collector-{}", + Self::sanitize_list_component(&values) + )); + } + if let Some(project) = &filters.project { + parts.push(format!( + "project-{}", + Self::sanitize_slug_component(project) + )); + } + if !filters.prefix.is_empty() { + parts.push(format!("prefix-{}", Self::hash8(&filters.prefix.join(",")))); + } + if let Some(as_path) = &filters.as_path { + parts.push(format!("aspath-{}", Self::hash8(as_path))); + } + if filters.full_feed_only { + parts.push("fullfeed".to_string()); + } + + let slug = parts.join("-"); + if slug.len() <= 96 { + return Ok(slug); + } + + let truncated = slug + .chars() + .take(80) + .collect::() + .trim_end_matches('-') + .to_string(); + Ok(format!("{}-h{}", truncated, Self::hash8(&slug))) + } + + fn sanitize_list_component(values: &[String]) -> String { + let mut normalized = values + .iter() + .map(|value| Self::sanitize_slug_component(value)) + .collect::>(); + normalized.sort(); + normalized.join("+") + } + + fn sanitize_slug_component(input: &str) -> String { + input + .to_ascii_lowercase() + .chars() + .map(|ch| if ch.is_ascii_alphanumeric() { ch } else { '_' }) + .collect::() + .trim_matches('_') + .to_string() + } + + fn hash8(input: &str) -> String { + let mut hash = 0xcbf29ce484222325_u64; + for byte in input.as_bytes() { + hash ^= u64::from(*byte); + hash = hash.wrapping_mul(0x100000001b3); + } + format!("{:08x}", hash & 0xffff_ffff) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn base_args() -> RibArgs { + RibArgs { + rib_ts: vec!["2025-09-01T12:00:00Z".to_string()], + filters: 
RibFilters { + ..Default::default() + }, + sqlite_path: None, + } + } + + #[test] + fn test_validate_multi_ts_stdout_error() { + let mut args = base_args(); + args.rib_ts.push("2025-09-01T13:00:00Z".to_string()); + assert!(args.validate().is_err()); + } + + #[test] + fn test_validate_multi_ts_file_output_ok() -> Result<()> { + let mut args = base_args(); + args.rib_ts.push("2025-09-01T13:00:00Z".to_string()); + args.sqlite_path = Some(PathBuf::from("/tmp/monocle-rib.sqlite3")); + let values = args.validate()?; + assert_eq!(values.len(), 2); + Ok(()) + } + + #[test] + fn test_filter_slug_order() -> Result<()> { + let mut args = base_args(); + args.filters.country = Some("IR".to_string()); + args.filters.origin_asn = vec!["15169".to_string(), "13335".to_string()]; + args.filters.peer_asn = vec!["2914".to_string()]; + args.filters.collector = Some("rrc00,route-views2".to_string()); + args.filters.project = Some("riperis".to_string()); + args.filters.prefix = vec!["1.1.1.0/24".to_string()]; + args.filters.as_path = Some("^15169 ".to_string()); + args.filters.full_feed_only = true; + + let db = MonocleDatabase::open_in_memory()?; + let config = MonocleConfig::default(); + let lens = RibLens::new(&db, &config); + let slug = lens.filter_slug(&args.filters)?; + + assert!(slug + .starts_with("country-ir-origin-13335+15169-peer-2914-collector-route_views2+rrc00")); + assert!(slug.contains("-h")); + Ok(()) + } + + #[test] + fn test_hash8_is_stable() { + assert_eq!(RibLens::hash8("a"), RibLens::hash8("a")); + } + + #[test] + fn test_file_name_prefix_includes_filters() -> Result<()> { + let mut args = base_args(); + args.filters.country = Some("US".to_string()); + args.filters.origin_asn = vec!["13335".to_string()]; + args.filters.full_feed_only = true; + + let db = MonocleDatabase::open_in_memory()?; + let config = MonocleConfig::default(); + let lens = RibLens::new(&db, &config); + let file_name = format!( + "{}.sqlite3", + lens.file_name_prefix(&args, &[1_756_728_000])? 
+ ); + + assert_eq!( + file_name, + "monocle-rib-20250901T120000Z-country-us-origin-13335-fullfeed.sqlite3" + ); + Ok(()) + } +}