diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml new file mode 100644 index 0000000..bf8a101 --- /dev/null +++ b/.github/workflows/benchmark.yml @@ -0,0 +1,166 @@ +name: Benchmarks + +on: + push: + branches: ['**'] + pull_request: + branches: ['**'] + +env: + CARGO_TERM_COLOR: always + RUSTFLAGS: "-C target-cpu=native" + DATASET_BASE_URL: "https://static.wilsonl.in/embedding-datasets" + +jobs: + build: + name: Build + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Install Rust nightly + uses: dtolnay/rust-toolchain@nightly + + - name: Install dependencies + run: sudo apt-get update && sudo apt-get install -y clang + + - name: Cache cargo registry + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-cargo-bench-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo-bench- + + - name: Build benchmark binary + run: cargo build --release -p ci + + - name: Upload binary + uses: actions/upload-artifact@v4 + with: + name: ci-binary + path: target/release/ci + retention-days: 1 + + random: + name: Random Vectors + needs: build + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Download binary + uses: actions/download-artifact@v4 + with: + name: ci-binary + path: . + + - name: Run benchmarks + run: | + chmod +x ci + mkdir -p benchmark-results + ./ci --output benchmark-results/random.json + + - name: Upload results + uses: actions/upload-artifact@v4 + with: + name: results-random + path: benchmark-results/ + retention-days: 90 + + dataset: + name: ${{ matrix.dataset }} + needs: build + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + include: + - dataset: siftsmall + files: "info.toml vectors.bin queries.bin results.bin" + - dataset: sift-250k + files: "info.toml vectors.bin queries.bin results.bin" + - dataset: sift + files: "info.toml vectors.bin queries.bin results.bin" + - dataset: gist-250k + files: "info.toml vectors.bin queries.bin results.bin" + - dataset: gist + files: "info.toml vectors.bin queries.bin results.bin" + - dataset: bbcnews-nomicembed15 + files: "info.toml vectors.bin" + - dataset: bbcnews-static256 + files: "info.toml vectors.bin" + - dataset: steam-games + files: "info.toml vectors.bin" + - dataset: gdelt-us-news + files: "info.toml vectors.bin" + steps: + - uses: actions/checkout@v4 + + - name: Install aria2 + run: sudo apt-get update && sudo apt-get install -y aria2 + + - name: Download binary + uses: actions/download-artifact@v4 + with: + name: ci-binary + path: . + + - name: Cache dataset + uses: actions/cache@v4 + with: + path: ci/datasets/${{ matrix.dataset }} + key: dataset-${{ matrix.dataset }}-v1 + + - name: Download dataset + run: | + mkdir -p ci/datasets/${{ matrix.dataset }} + cd ci/datasets/${{ matrix.dataset }} + for f in ${{ matrix.files }}; do + [ -f "$f" ] || aria2c -x16 -s16 "$DATASET_BASE_URL/${{ matrix.dataset }}/$f" + done + + - name: Run benchmarks + run: | + chmod +x ci + mkdir -p benchmark-results + ./ci --output benchmark-results/${{ matrix.dataset }}.json + + - name: Upload results + uses: actions/upload-artifact@v4 + with: + name: results-${{ matrix.dataset }} + path: benchmark-results/ + retention-days: 90 + + summary: + name: Summary + needs: [random, dataset] + if: always() + runs-on: ubuntu-latest + steps: + - name: Download all results + uses: actions/download-artifact@v4 + with: + pattern: results-* + path: all-results/ + merge-multiple: true + + - name: Generate summary + run: | + echo "# CoreNN Benchmark Summary" >> $GITHUB_STEP_SUMMARY + echo "" >> $GITHUB_STEP_SUMMARY + echo "**Commit:** \`${{ github.sha }}\`" >> $GITHUB_STEP_SUMMARY + echo "**Branch:** \`${{ github.ref_name }}\`" >> $GITHUB_STEP_SUMMARY + echo "" >> $GITHUB_STEP_SUMMARY + for f in all-results/*.json 2>/dev/null; do + [ -f "$f" ] || continue + echo "### $(basename $f .json)" >> $GITHUB_STEP_SUMMARY + echo '```json' >> $GITHUB_STEP_SUMMARY + cat "$f" >> $GITHUB_STEP_SUMMARY + echo '```' >> $GITHUB_STEP_SUMMARY + echo "" >> $GITHUB_STEP_SUMMARY + done diff --git a/Cargo.toml b/Cargo.toml index e81128d..67be94e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "corenn", "corenn-py", "corenn-node", + "ci", ] [profile.release] diff --git a/ci/Cargo.toml b/ci/Cargo.toml new file mode 100644 index 0000000..d2805bd --- /dev/null +++ b/ci/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "ci" +version = "0.1.0" +edition = "2021" +publish = false + +[dependencies] +libcorenn = { path = "../libcorenn" } +chrono = { version = "0.4", default-features = false, features = ["std", "clock"] } +half = "2.4" +rand = "0.8" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +toml = "0.8" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["json"] } diff --git a/ci/src/main.rs b/ci/src/main.rs new file mode 100644 index 0000000..4fabf5c --- /dev/null +++ b/ci/src/main.rs @@ -0,0 +1,608 @@ +//! CI Benchmark Binary + +use libcorenn::cfg::Cfg; +use libcorenn::cfg::CompressionMode; +use libcorenn::metric::StdMetric; +use libcorenn::CoreNN; +use rand::rngs::StdRng; +use rand::Rng; +use rand::SeedableRng; +use serde::Deserialize; +use serde::Serialize; +use std::fs::File; +use std::io::BufReader; +use std::io::Read; +use std::io::Write; +use std::path::Path; +use std::path::PathBuf; +use std::time::Instant; +use tracing::info; + +const DATASETS_DIR: &str = env!("CARGO_MANIFEST_DIR"); + +#[derive(Debug, Deserialize)] +struct DatasetInfo { + dtype: String, + metric: String, + dim: usize, + n: usize, + #[serde(default)] + q: usize, + #[serde(default)] + k: usize, +} + +#[derive(Debug, Serialize)] +struct BenchmarkResult { + name: String, + dataset: String, + dimension: usize, + num_vectors: usize, + num_queries: usize, + k: usize, + metric: String, + insert_throughput_vps: f64, + insert_total_ms: f64, + #[serde(skip_serializing_if = "Option::is_none")] + query_throughput_qps: Option, + #[serde(skip_serializing_if = "Option::is_none")] + query_latency_mean_us: Option, + #[serde(skip_serializing_if = "Option::is_none")] + query_latency_p50_us: Option, + #[serde(skip_serializing_if = "Option::is_none")] + query_latency_p95_us: Option, + #[serde(skip_serializing_if = "Option::is_none")] + query_latency_p99_us: Option, + #[serde(skip_serializing_if = "Option::is_none")] + recall_at_k: Option, + config: BenchmarkConfig, +} + +#[derive(Debug, Serialize)] +struct BenchmarkConfig { + beam_width: usize, + max_edges: usize, + query_search_list_cap: usize, + #[serde(skip_serializing_if = "Option::is_none")] + compression: Option, +} + +#[derive(Debug, Serialize)] +struct BenchmarkReport { + commit: String, + timestamp: String, + results: Vec, +} + +fn random_f32_vec(rng: &mut StdRng, dim: usize) -> Vec { + (0..dim).map(|_| rng.gen::()).collect() +} + +fn load_f32_vectors(path: &Path, dim: usize, count: usize) -> Vec> { + let file = + File::open(path).unwrap_or_else(|e| panic!("Failed to open {}: {}", path.display(), e)); + let mut reader = BufReader::new(file); + let mut buffer = vec![0u8; dim * 4]; + let mut vectors = Vec::with_capacity(count); + + for _ in 0..count { + if reader.read_exact(&mut buffer).is_err() { + break; + } + let vec: Vec = buffer + .chunks(4) + .map(|b| f32::from_le_bytes([b[0], b[1], b[2], b[3]])) + .collect(); + vectors.push(vec); + } + + vectors +} + +fn load_f16_vectors(path: &Path, dim: usize, count: usize) -> Vec> { + let file = + File::open(path).unwrap_or_else(|e| panic!("Failed to open {}: {}", path.display(), e)); + let mut reader = BufReader::new(file); + let mut buffer = vec![0u8; dim * 2]; + let mut vectors = Vec::with_capacity(count); + + for _ in 0..count { + if reader.read_exact(&mut buffer).is_err() { + break; + } + let vec: Vec = buffer + .chunks(2) + .map(|b| { + let bits = u16::from_le_bytes([b[0], b[1]]); + half::f16::from_bits(bits).to_f32() + }) + .collect(); + vectors.push(vec); + } + + vectors +} + +fn load_groundtruth(path: &Path, q: usize, k: usize) -> Vec> { + let file = + File::open(path).unwrap_or_else(|e| panic!("Failed to open {}: {}", path.display(), e)); + let mut reader = BufReader::new(file); + let mut buffer = vec![0u8; k * 4]; + let mut results = Vec::with_capacity(q); + + for _ in 0..q { + if reader.read_exact(&mut buffer).is_err() { + break; + } + let ids: Vec = buffer + .chunks(4) + .map(|b| u32::from_le_bytes([b[0], b[1], b[2], b[3]])) + .collect(); + results.push(ids); + } + + results +} + +fn percentile(sorted: &[f64], p: f64) -> f64 { + if sorted.is_empty() { + return 0.0; + } + let idx = ((sorted.len() as f64) * p / 100.0).floor() as usize; + sorted[idx.min(sorted.len() - 1)] +} + +fn benchmark_insert(db: &CoreNN, vectors: &[Vec]) -> (f64, f64) { + let start = Instant::now(); + for (i, vec) in vectors.iter().enumerate() { + db.insert(&format!("vec_{}", i), vec); + } + let elapsed = start.elapsed(); + let throughput = vectors.len() as f64 / elapsed.as_secs_f64(); + (throughput, elapsed.as_secs_f64() * 1000.0) +} + +fn benchmark_queries( + db: &CoreNN, + queries: &[Vec], + k: usize, +) -> (f64, f64, f64, f64, f64, Vec>) { + let mut latencies = Vec::with_capacity(queries.len()); + let mut all_results = Vec::with_capacity(queries.len()); + + for query in queries { + let start = Instant::now(); + let results = db.query(query, k); + let elapsed = start.elapsed(); + latencies.push(elapsed.as_secs_f64() * 1_000_000.0); + all_results.push(results.into_iter().map(|(key, _dist)| key).collect()); + } + + latencies.sort_by(|a, b| a.partial_cmp(b).unwrap()); + let mean = latencies.iter().sum::() / latencies.len() as f64; + let p50 = percentile(&latencies, 50.0); + let p95 = percentile(&latencies, 95.0); + let p99 = percentile(&latencies, 99.0); + let total_time: f64 = latencies.iter().sum(); + let qps = queries.len() as f64 / (total_time / 1_000_000.0); + + (qps, mean, p50, p95, p99, all_results) +} + +fn compute_recall(results: &[Vec], groundtruth: &[Vec], k: usize) -> f64 { + let mut total_correct = 0; + let mut total = 0; + + for (result, gt) in results.iter().zip(groundtruth.iter()) { + let gt_set: std::collections::HashSet = gt.iter().take(k).copied().collect(); + for key in result.iter().take(k) { + if let Some(id_str) = key.strip_prefix("vec_") { + if let Ok(id) = id_str.parse::() { + if gt_set.contains(&id) { + total_correct += 1; + } + } + } + } + total += k.min(gt.len()); + } + + if total == 0 { + 0.0 + } else { + total_correct as f64 / total as f64 + } +} + +fn parse_metric(s: &str) -> StdMetric { + match s.to_lowercase().as_str() { + "l2" | "euclidean" => StdMetric::L2, + "cosine" => StdMetric::Cosine, + _ => StdMetric::L2, + } +} + +fn run_random_benchmarks() -> Vec { + let mut results = Vec::new(); + let mut rng = StdRng::seed_from_u64(42); + + let base_configs = [ + (128, 1000, "random_128d_1k"), + (128, 10000, "random_128d_10k"), + (128, 50000, "random_128d_50k"), + (384, 10000, "random_384d_10k"), + (768, 10000, "random_768d_10k"), + (1536, 5000, "random_1536d_5k"), + ]; + + let k = 10; + let num_queries = 100; + + for (dim, num_vectors, name) in base_configs { + info!(benchmark = name, "starting"); + + let vectors: Vec> = (0..num_vectors) + .map(|_| random_f32_vec(&mut rng, dim)) + .collect(); + let queries: Vec> = (0..num_queries) + .map(|_| random_f32_vec(&mut rng, dim)) + .collect(); + + let cfg = Cfg { + dim, + metric: StdMetric::L2, + beam_width: 4, + max_edges: 32, + query_search_list_cap: 128, + update_search_list_cap: 128, + compression_threshold: usize::MAX, + ..Default::default() + }; + let db = CoreNN::new_in_memory(cfg.clone()); + + let (insert_throughput, insert_total_ms) = benchmark_insert(&db, &vectors); + let (qps, mean, p50, p95, p99, _) = benchmark_queries(&db, &queries, k); + + info!( + benchmark = name, + insert_vps = insert_throughput, + query_qps = qps, + p50_us = p50, + "completed" + ); + + results.push(BenchmarkResult { + name: name.to_string(), + dataset: "random".to_string(), + dimension: dim, + num_vectors, + num_queries, + k, + metric: "L2".to_string(), + insert_throughput_vps: insert_throughput, + insert_total_ms, + query_throughput_qps: Some(qps), + query_latency_mean_us: Some(mean), + query_latency_p50_us: Some(p50), + query_latency_p95_us: Some(p95), + query_latency_p99_us: Some(p99), + recall_at_k: None, + config: BenchmarkConfig { + beam_width: cfg.beam_width, + max_edges: cfg.max_edges, + query_search_list_cap: cfg.query_search_list_cap, + compression: None, + }, + }); + } + + // Compression benchmarks on 768d/10k + let compression_configs: Vec<(CompressionMode, &str, usize)> = vec![ + (CompressionMode::PQ, "pq", 16), + (CompressionMode::SQ, "sq", 0), + ]; + + let dim = 768; + let num_vectors = 10000; + + let vectors: Vec> = (0..num_vectors) + .map(|_| random_f32_vec(&mut rng, dim)) + .collect(); + let queries: Vec> = (0..num_queries) + .map(|_| random_f32_vec(&mut rng, dim)) + .collect(); + + for (mode, mode_name, pq_subspaces) in compression_configs { + let name = format!("random_768d_10k_{}", mode_name); + info!(benchmark = %name, compression = mode_name, "starting"); + + let cfg = Cfg { + dim, + metric: StdMetric::L2, + beam_width: 4, + max_edges: 32, + query_search_list_cap: 128, + update_search_list_cap: 128, + compression_mode: mode, + compression_threshold: 0, + pq_subspaces: if pq_subspaces > 0 { pq_subspaces } else { 64 }, + ..Default::default() + }; + let db = CoreNN::new_in_memory(cfg.clone()); + + let (insert_throughput, insert_total_ms) = benchmark_insert(&db, &vectors); + let (qps, mean, p50, p95, p99, _) = benchmark_queries(&db, &queries, k); + + info!( + benchmark = %name, + insert_vps = insert_throughput, + query_qps = qps, + p50_us = p50, + "completed" + ); + + results.push(BenchmarkResult { + name: name.clone(), + dataset: "random".to_string(), + dimension: dim, + num_vectors, + num_queries, + k, + metric: "L2".to_string(), + insert_throughput_vps: insert_throughput, + insert_total_ms, + query_throughput_qps: Some(qps), + query_latency_mean_us: Some(mean), + query_latency_p50_us: Some(p50), + query_latency_p95_us: Some(p95), + query_latency_p99_us: Some(p99), + recall_at_k: None, + config: BenchmarkConfig { + beam_width: cfg.beam_width, + max_edges: cfg.max_edges, + query_search_list_cap: cfg.query_search_list_cap, + compression: Some(mode_name.to_string()), + }, + }); + } + + results +} + +fn run_dataset_benchmark(dataset_name: &str) -> Vec { + let mut results = Vec::new(); + let datasets_dir = PathBuf::from(DATASETS_DIR).join("datasets"); + let dir = datasets_dir.join(dataset_name); + + let info_path = dir.join("info.toml"); + let info_str = std::fs::read_to_string(&info_path) + .unwrap_or_else(|e| panic!("Failed to read {}: {}", info_path.display(), e)); + let info: DatasetInfo = toml::from_str(&info_str) + .unwrap_or_else(|e| panic!("Failed to parse {}: {}", info_path.display(), e)); + + info!( + dataset = dataset_name, + dtype = %info.dtype, + metric = %info.metric, + dim = info.dim, + n = info.n, + q = info.q, + k = info.k, + "loading dataset" + ); + + let vectors_path = dir.join("vectors.bin"); + let vectors = if info.dtype == "f16" { + load_f16_vectors(&vectors_path, info.dim, info.n) + } else { + load_f32_vectors(&vectors_path, info.dim, info.n) + }; + + info!(dataset = dataset_name, vectors = vectors.len(), "loaded vectors"); + + let metric = parse_metric(&info.metric); + let metric_str = info.metric.clone(); + + let has_groundtruth = info.q > 0 && info.k > 0; + + if has_groundtruth { + let queries_path = dir.join("queries.bin"); + let results_path = dir.join("results.bin"); + + let queries = if info.dtype == "f16" { + load_f16_vectors(&queries_path, info.dim, info.q) + } else { + load_f32_vectors(&queries_path, info.dim, info.q) + }; + let groundtruth = load_groundtruth(&results_path, info.q, info.k); + + info!( + dataset = dataset_name, + queries = queries.len(), + groundtruth = groundtruth.len(), + "loaded queries" + ); + + let k_values: Vec = vec![1, 10, info.k.min(100)]; + let search_caps = [128, 256]; + + for &k in &k_values { + for &search_cap in &search_caps { + let bench_name = format!("{}_k{}_cap{}", dataset_name, k, search_cap); + info!(benchmark = %bench_name, k = k, search_cap = search_cap, "starting"); + + let cfg = Cfg { + dim: info.dim, + metric, + beam_width: 4, + max_edges: 32, + query_search_list_cap: search_cap, + update_search_list_cap: search_cap, + compression_threshold: usize::MAX, + ..Default::default() + }; + let db = CoreNN::new_in_memory(cfg.clone()); + + let (insert_throughput, insert_total_ms) = benchmark_insert(&db, &vectors); + let (qps, mean, p50, p95, p99, query_results) = benchmark_queries(&db, &queries, k); + let recall = compute_recall(&query_results, &groundtruth, k); + + info!( + benchmark = %bench_name, + insert_vps = insert_throughput, + query_qps = qps, + recall = recall, + p50_us = p50, + "completed" + ); + + results.push(BenchmarkResult { + name: bench_name.clone(), + dataset: dataset_name.to_string(), + dimension: info.dim, + num_vectors: vectors.len(), + num_queries: queries.len(), + k, + metric: metric_str.clone(), + insert_throughput_vps: insert_throughput, + insert_total_ms, + query_throughput_qps: Some(qps), + query_latency_mean_us: Some(mean), + query_latency_p50_us: Some(p50), + query_latency_p95_us: Some(p95), + query_latency_p99_us: Some(p99), + recall_at_k: Some(recall), + config: BenchmarkConfig { + beam_width: cfg.beam_width, + max_edges: cfg.max_edges, + query_search_list_cap: cfg.query_search_list_cap, + compression: None, + }, + }); + } + } + } else { + let bench_name = format!("{}_insert_only", dataset_name); + info!(benchmark = %bench_name, "starting insert-only"); + + let cfg = Cfg { + dim: info.dim, + metric, + beam_width: 4, + max_edges: 32, + query_search_list_cap: 128, + update_search_list_cap: 128, + compression_threshold: usize::MAX, + ..Default::default() + }; + let db = CoreNN::new_in_memory(cfg.clone()); + + let (insert_throughput, insert_total_ms) = benchmark_insert(&db, &vectors); + + info!( + benchmark = %bench_name, + insert_vps = insert_throughput, + insert_ms = insert_total_ms, + "completed" + ); + + results.push(BenchmarkResult { + name: bench_name, + dataset: dataset_name.to_string(), + dimension: info.dim, + num_vectors: vectors.len(), + num_queries: 0, + k: 0, + metric: metric_str, + insert_throughput_vps: insert_throughput, + insert_total_ms, + query_throughput_qps: None, + query_latency_mean_us: None, + query_latency_p50_us: None, + query_latency_p95_us: None, + query_latency_p99_us: None, + recall_at_k: None, + config: BenchmarkConfig { + beam_width: cfg.beam_width, + max_edges: cfg.max_edges, + query_search_list_cap: cfg.query_search_list_cap, + compression: None, + }, + }); + } + + results +} + +fn discover_datasets() -> Vec { + let datasets_dir = PathBuf::from(DATASETS_DIR).join("datasets"); + let mut datasets = Vec::new(); + + if let Ok(entries) = std::fs::read_dir(&datasets_dir) { + for entry in entries.flatten() { + let path = entry.path(); + if path.is_dir() && path.join("info.toml").exists() && path.join("vectors.bin").exists() { + if let Some(name) = path.file_name().and_then(|s| s.to_str()) { + datasets.push(name.to_string()); + } + } + } + } + + datasets.sort(); + datasets +} + +fn main() { + tracing_subscriber::fmt().json().init(); + + let args: Vec = std::env::args().collect(); + + let mut output_path: Option = None; + + let mut i = 1; + while i < args.len() { + match args[i].as_str() { + "--output" => { + output_path = Some(PathBuf::from(&args[i + 1])); + i += 2; + } + _ => { + i += 1; + } + } + } + + let mut all_results = Vec::new(); + + info!("starting random benchmarks"); + all_results.extend(run_random_benchmarks()); + + let datasets = discover_datasets(); + if datasets.is_empty() { + info!("no datasets found"); + } else { + info!(count = datasets.len(), "starting dataset benchmarks"); + for dataset in datasets { + all_results.extend(run_dataset_benchmark(&dataset)); + } + } + + let report = BenchmarkReport { + commit: std::env::var("GITHUB_SHA").unwrap_or_else(|_| "unknown".to_string()), + timestamp: chrono::Utc::now().to_rfc3339(), + results: all_results, + }; + + let json = serde_json::to_string_pretty(&report).expect("Failed to serialize results"); + + if let Some(path) = output_path { + let mut file = File::create(&path).expect("Failed to create output file"); + file + .write_all(json.as_bytes()) + .expect("Failed to write output file"); + info!(path = %path.display(), "results written"); + } else { + println!("{}", json); + } +} diff --git a/corenn-node/src/lib.rs b/corenn-node/src/lib.rs index 8938961..911388a 100644 --- a/corenn-node/src/lib.rs +++ b/corenn-node/src/lib.rs @@ -1,12 +1,33 @@ -use libcorenn::{cfg::{Cfg, CompressionMode}, metric::StdMetric, CoreNN}; -use neon::{handle::Handle, object::Object, prelude::{Context, FunctionContext, ModuleContext}, result::NeonResult, types::{buffer::TypedArray, Finalize, JsArray, JsBox, JsNumber, JsObject, JsString, JsTypedArray, JsUndefined, Value}}; +use libcorenn::CoreNN; +use libcorenn::cfg::Cfg; +use libcorenn::cfg::CompressionMode; +use libcorenn::metric::StdMetric; +use neon::handle::Handle; +use neon::object::Object; +use neon::prelude::Context; +use neon::prelude::FunctionContext; +use neon::prelude::ModuleContext; +use neon::result::NeonResult; +use neon::types::Finalize; +use neon::types::JsArray; +use neon::types::JsBox; +use neon::types::JsNumber; +use neon::types::JsObject; +use neon::types::JsString; +use neon::types::JsTypedArray; +use neon::types::JsUndefined; +use neon::types::Value; +use neon::types::buffer::TypedArray; // Neon requires Finalize. struct CoreNNWrapper(CoreNN); impl Finalize for CoreNNWrapper {} -fn compression_mode_from_str(cx: &mut FunctionContext, s: Handle) -> NeonResult { +fn compression_mode_from_str( + cx: &mut FunctionContext, + s: Handle, +) -> NeonResult { let s = s.value(cx); match s.as_str() { "pq" => Ok(CompressionMode::PQ), @@ -35,16 +56,20 @@ fn as_usize(cx: &mut FunctionContext, v: Handle) -> NeonResult fn cfg_from_js(cx: &mut FunctionContext, cfg_js: &JsObject) -> NeonResult { let mut cfg = Cfg::default(); macro_rules! prop { - ($name:ident, $type:ty, $parser:expr) => { - let maybe = prop::<$type, _>(cx, &cfg_js, stringify!($name), $parser)?; - if let Some(v) = maybe { - cfg.$name = v; - } - }; + ($name:ident, $type:ty, $parser:expr) => { + let maybe = prop::<$type, _>(cx, &cfg_js, stringify!($name), $parser)?; + if let Some(v) = maybe { + cfg.$name = v; + } + }; } prop!(beam_width, JsNumber, |cx, v| as_usize(cx, v)); - prop!(compaction_threshold_deletes, JsNumber, |cx, v| as_usize(cx, v)); - prop!(compression_mode, JsString, |cx, v| compression_mode_from_str(cx, v)); + prop!(compaction_threshold_deletes, JsNumber, |cx, v| as_usize( + cx, v + )); + prop!(compression_mode, JsString, |cx, v| { + compression_mode_from_str(cx, v) + }); prop!(compression_threshold, JsNumber, |cx, v| as_usize(cx, v)); prop!(dim, JsNumber, |cx, v| as_usize(cx, v)); prop!(distance_threshold, JsNumber, |cx, v| Ok(v.value(cx))); @@ -59,7 +84,12 @@ fn cfg_from_js(cx: &mut FunctionContext, cfg_js: &JsObject) -> NeonResult { Ok(cfg) } -fn prop(cx: &mut FunctionContext, obj: &JsObject, key: &str, parser: impl FnOnce(&mut FunctionContext, Handle) -> NeonResult) -> NeonResult> { +fn prop( + cx: &mut FunctionContext, + obj: &JsObject, + key: &str, + parser: impl FnOnce(&mut FunctionContext, Handle) -> NeonResult, +) -> NeonResult> { let Some(prop) = obj.get_opt::(cx, key)? else { return Ok(None); }; diff --git a/docs/INTERNAL_ENGINEERING.md b/docs/INTERNAL_ENGINEERING.md new file mode 100644 index 0000000..c0005ad --- /dev/null +++ b/docs/INTERNAL_ENGINEERING.md @@ -0,0 +1,412 @@ +# CoreNN Internal Engineering Reference + +Billion-scale vector database for ANN search. DiskANN/Vamana graph algorithm with RocksDB persistence, PQ/SQ compression, SIMD distance computation. + +## Architecture + +``` +libcorenn/src/ +├── lib.rs Core CoreNN struct, search/insert logic +├── cfg.rs Configuration (hyperparameters) +├── cache.rs In-memory node caching +├── compaction.rs Graph maintenance, delete handling +├── common.rs Common types (Id) +├── util.rs Atomic utilities +├── vec.rs VecData (bf16/f16/f32/f64) +├── metric/ +│ ├── mod.rs Metric trait +│ ├── l2.rs L2 distance (SIMD) +│ └── cosine.rs Cosine distance (SIMD) +├── compressor/ +│ ├── mod.rs Compressor trait +│ ├── pq.rs Product Quantization +│ ├── scalar.rs Scalar Quantization +│ └── trunc.rs Truncation (Matryoshka) +└── store/ + ├── mod.rs Store trait + ├── rocksdb.rs RocksDB backend + ├── in_memory.rs In-memory backend + └── schema.rs DB schema (NODE, ADD_EDGES, etc.) +``` + +## Core Data Structures + +### DbNodeData (store/schema.rs) +```rust +pub struct DbNodeData { + pub version: u64, // Incremented on update, used for cache invalidation + pub neighbors: Vec, // Graph edges + pub vector: Arc, // Co-located with neighbors (one disk page read) +} +``` + +### VecData (vec.rs) +```rust +pub enum VecData { + BF16(Vec), + F16(Vec), + F32(Vec), + F64(Vec), +} +``` + +### State (lib.rs) +```rust +pub struct State { + add_edges: DashMap>, // Pending backedges (lazy pruning) + cfg: Cfg, // Immutable after creation + db: Arc, // RocksDB or InMemory + deleted: DashSet, // Soft-deleted IDs + mode: RwLock, // Uncompressed or Compressed + count: AtomUsz, // Vector count + next_id: AtomUsz, // ID allocator + // ...caches, locks +} +``` + +### Mode (lib.rs) +```rust +enum Mode { + Uncompressed(NodeCache), // Lazy cache of DbNodeData in-memory + Compressed(Arc, CVCache), // PQ/SQ/Trunc compressed vectors +} +``` + +### Database Schema (store/schema.rs) +``` +ADD_EDGES: Id → Vec Pending edges for lazy updates +CFG: () → Cfg Configuration +DELETED: Id → () Soft-deleted IDs +KEY_TO_ID: String → Id String key to numeric ID +ID_TO_KEY: Id → String Numeric ID to string key +NODE: Id → DbNodeData Graph nodes with vectors and edges +PQ_MODEL: () → ProductQuantizer Product Quantization model +SQ_MODEL: () → ScalarQuantizer Scalar Quantization model +``` + +## Configuration (cfg.rs) + +| Parameter | Default | Description | +|-----------|---------|-------------| +| `dim` | required | Vector dimensionality | +| `metric` | L2 | L2 or Cosine | +| `beam_width` | 4 | Nodes expanded per search iteration | +| `max_edges` | 64 | Max neighbors per node | +| `max_add_edges` | 128 | Pending edges before lazy pruning triggers | +| `distance_threshold` | 1.2 | α for Vamana RobustPrune (controls graph density) | +| `query_search_list_cap` | 128 | Search list size for queries | +| `update_search_list_cap` | 128 | Search list size for inserts | +| `compression_mode` | PQ | PQ, SQ, or Trunc | +| `compression_threshold` | 10M | Enable compression after N vectors | +| `pq_subspaces` | 64 | PQ subspace count | +| `pq_sample_size` | 10K | PQ training sample size | +| `trunc_dims` | 64 | Truncation dimensions (Matryoshka) | + +## Algorithms + +### Search (lib.rs) +Greedy beam search with HNSW-style early stopping: + +``` +1. Start from entry node (id=0, clone of first inserted vector) +2. Initialize lower_bound = entry.distance +3. Maintain search_list sorted by distance (max size: search_list_cap) +4. Loop: + a. Pop beam_width unexpanded nodes from search_list + b. Early stop: if best_unexpanded > lower_bound AND list is full: break + c. For each expanded node: + - Fetch neighbors from DB (NODE) + pending edges (add_edges) + - Add unseen neighbors to search_list (only if dist < lower_bound) + - Re-rank expanded node with full vector distance + d. Truncate search_list to search_list_cap + e. Update lower_bound = worst result distance +5. Return top-k from search_list +``` + +### Insert (lib.rs) +``` +1. Assign id = next_id++ +2. candidates = search(vector, k=1, update_search_list_cap) +3. neighbors = prune_candidates(vector, candidates) // Vamana RobustPrune +4. Save node (id, neighbors, vector) to DB +5. For each neighbor j: + - If j.add_edges.len >= max_add_edges: + j.neighbors = prune_candidates(j.vector, j.neighbors + j.add_edges) + Save j to DB + - Else: + j.add_edges.append(id) +``` + +### Vamana RobustPrune (lib.rs) +Algorithm 2 from DiskANN paper (Subramanya et al., NeurIPS 2019): + +``` +RobustPrune(p, V, α, R): + V ← (V ∪ Nout(p)) \ {p} // Merge with existing neighbors + Nout(p) ← ∅ + + while V ≠ ∅ do + p* ← argmin_{p' ∈ V} d(p, p') // Pick closest to node p + Nout(p) ← Nout(p) ∪ {p*} // Add to neighbors + if |Nout(p)| = R then break // Stop at max degree + + for p' ∈ V do + if α · d(p*, p') ≤ d(p, p') then // α-RNG condition + remove p' from V // Prune covered points +``` + +The α parameter (distance_threshold) is CRUCIAL: +- α = 1.0: Standard RNG, sparser graph, potentially larger diameter +- α > 1.0: Denser graph, **guarantees O(log n) diameter** for disk-based search +- α = 1.2: Recommended value (DiskANN paper) + +Each search step makes multiplicative progress: `d(query, next) ≤ d(query, current) / α` + +Complexity: O(R × |V|) where R = max_edges, |V| = candidates + +### Compaction (compaction.rs) +Handles deleted vectors. Iterates all nodes to remove edges to deleted nodes. Uses RocksDB snapshot for consistent iteration during concurrent updates. + +## Compression + +### Product Quantization (compressor/pq.rs) +Subspace decomposition: D dimensions → M subspaces × 256 centroids + +Training: Mini-Batch K-means via linfa-clustering on sampled vectors + +Encoding: Each subspace maps to 1-byte centroid index → M bytes per vector + +ADC (Asymmetric Distance Computation): +1. Query stays uncompressed +2. Precompute distance from query subvector to all 256 centroids per subspace +3. For each compressed vector: sum table lookups (O(M) vs O(D)) + +PQDistanceTable: +```rust +struct PQDistanceTable { + squared_distances: Vec<[f32; 256]>, // L2: query to centroids per subspace + dot_products: Vec<[f32; 256]>, // Cosine: for dot product computation + query_norms_sq: Vec, // Cosine: query norm per subspace + centroid_norms_sq: Vec<[f32; 256]>, // Cosine: centroid norms + metric: StdMetric, +} +``` + +### Scalar Quantization (compressor/scalar.rs) +Per-dimension quantization to int8: +``` +q = round((x - min) / (max - min) * 255) +``` + +Training: Compute per-dimension min/max from sample vectors + +4x memory reduction. SIMD-friendly (AVX-512, NEON). + +SQDistanceTable: +```rust +struct SQDistanceTable { + scaled_query: Vec, // (query - min) * scale per dimension + metric: StdMetric, + query_norm_sq: f32, // For cosine +} +``` + +### Truncation (compressor/trunc.rs) +For Matryoshka embeddings. Simply truncates to first N dimensions. + +## Distance Computation (metric/) + +Supported metrics: L2, Cosine + +SIMD implementations: +- AVX-512 (x86): 16 f32 simultaneously, VDPBF16PS for bf16 +- AVX-512 FP16: 32 f16 native +- NEON (ARM): 4 f32 + +Optimizations applied: +- 4x loop unrolling for L2 +- 2x loop unrolling for Cosine +- Software prefetch hints + +## Performance Benchmarks + +### Distance Computation (per call) +| Dimension | L2 (f32) | Cosine (f32) | +|-----------|----------|--------------| +| 128 | 10.0 ns | 9.7 ns | +| 384 | 13.0 ns | 33.4 ns | +| 768 | 30.4 ns | 39.9 ns | +| 1536 | 66.5 ns | 64.6 ns | + +### PQ ADC (768d, 64 subspaces) +| Method | Time | Speedup | +|--------|------|---------| +| ADC | 24.5 ns | 22.6x | +| Symmetric | 553.5 ns | baseline | + +### SQ ADC (768d) +| Method | Time | Speedup | +|--------|------|---------| +| SQ ADC | 50.6 ns | 13.4x | +| Dequantize+Compute | 676.7 ns | baseline | + +### Query Throughput (in-memory, uncompressed) +| Dataset | k | QPS | +|---------|---|-----| +| 128d, 100 vecs | 10 | 31.5K | +| 128d, 1K vecs | 10 | 8.4K | +| 128d, 10K vecs | 10 | 650 | +| 768d, 5K vecs | 10 | 537 | + +## RocksDB Configuration (store/rocksdb.rs) +- Block cache: 512MB +- Bloom filters enabled +- Point lookup optimization hint +- Increased parallelism +- No compression (vectors don't compress well) + +## CI Benchmarking + +### Workflow (.github/workflows/benchmark.yml) +Matrix-based parallel jobs for each dataset. + +### Datasets (hosted at https://static.wilsonl.in/embedding-datasets/) +| Dataset | Vectors | Dims | Metric | Ground Truth | +|---------|---------|------|--------|--------------| +| siftsmall | small | 128 | L2 | yes | +| sift-250k | 250K | 128 | L2 | yes | +| sift | 1M | 128 | L2 | yes | +| gist-250k | 250K | 960 | L2 | yes | +| gist | 1M | 960 | L2 | yes | +| bbcnews-nomicembed15 | varies | 768 | Cosine | no | +| bbcnews-static256 | varies | 256 | Cosine | no | +| steam-games | varies | varies | varies | no | +| gdelt-us-news | varies | varies | varies | no | + +Dataset format: +- info.toml: `{dtype, metric, dim, n, q, k}` +- vectors.bin: packed little-endian matrix (n × dim × sizeof(dtype)) +- queries.bin: packed queries (q × dim × sizeof(dtype)) +- results.bin: ground truth u32 indices (q × k × 4) + +Datasets without q/k run insert-only benchmarks. + +### CI Binary (ci/) +``` +ci --output results.json +``` + +Runs: +1. Random vector benchmarks (128d-1536d, 1K-50K vectors) +2. Compression benchmarks (PQ, SQ on 768d/10K) +3. Dataset benchmarks (auto-discovered from datasets/ folder) + +Output: JSON with insert throughput, query QPS, latency percentiles, recall@k + +## Vamana vs HNSW + +| Aspect | Vamana/DiskANN | HNSW | +|--------|----------------|------| +| Structure | Single layer | Multi-layer skip-list | +| Entry point | Fixed node 0 | Top layer node | +| Pruning condition | α · d(p*, p') ≤ d(p, p') | d(p*, p') < d(q, p') | +| α parameter | Yes (controls diameter) | No | +| Theoretical guarantee | O(log n) with α > 1 | No formal bound | +| Insert speed | ~2K/sec | ~10K/sec | +| Memory | ~0.8KB/vector (128d, R=64) | ~1.2KB/vector (128d, M=16) | +| Best for | Disk-based, read-heavy | In-memory, write-heavy | + +CoreNN uses Vamana because: +1. α parameter guarantees bounded latency for disk systems +2. Lower memory footprint +3. Single-layer simplifies persistence + +## Tuning Guide + +### For higher recall +- Increase `query_search_list_cap` (200-400 for 95%+, 400-600 for 99%+) +- Increase `beam_width` (8-16) +- Increase `max_edges` (96-128) +- Increase `distance_threshold` (α = 1.3-1.5) + +### For higher speed +- Decrease `query_search_list_cap` (64-100) +- Decrease `beam_width` (2-4) +- Decrease `max_edges` (32-48) +- Decrease `distance_threshold` (α = 1.1) + +### For memory reduction +- Use SQ (4x reduction) or PQ (16-32x reduction) +- Lower `max_edges` +- Lower `compression_threshold` + +## API + +### Rust +```rust +let db = CoreNN::create("/path/to/db", Cfg { dim: 768, ..Default::default() }); +db.insert("key", &vec); +let results = db.query(&query, 100); // Vec<(String, f64)> + +// Open existing +let db = CoreNN::open("/path/to/db"); +``` + +### Python +```python +from corenn_py import CoreNN +db = CoreNN.create("/path/to/db", {"dim": 768}) +db.insert_f32(keys, vectors) # vectors: numpy array +results = db.query_f32(queries, 100) # list of list of (key, dist) +``` + +### Node.js +```typescript +import { CoreNN } from "@corenn/node"; +const db = CoreNN.create("/path/to/db", { dim: 768 }); +db.insert([{ key: "k1", vector: new Float32Array([...]) }]); +const results = db.query(queryVec, 100); // { key, distance }[] +``` + +## Key Implementation Details + +### Lazy Pruning +`add_edges` accumulates backedges. Pruning triggers when `add_edges.len >= max_add_edges` (default 128 = 2x max_edges). Reduces write amplification. + +### Entry Point +Always node 0 (clone of first inserted vector). Static, unlike HNSW's dynamic top-layer entry. + +### Cache +Lazy in-memory cache of DbNodeData (uncompressed mode) or compressed vectors (compressed mode). Avoids DB roundtrips, not computation. + +### Visited Tracking +DashSet per search. TODO: visited list pool with generation counter for high QPS. + +### Concurrency +- `DashMap` for add_edges (lock-free reads) +- `ArbitraryLock` for node writes (per-node mutex) +- `RwLock` for mode transitions +- Atomic counters for count/next_id + +## References + +1. DiskANN (NeurIPS 2019): "Fast Accurate Billion-point Nearest Neighbor Search on a Single Node" +2. FreshDiskANN (2021): "Fast and Accurate Graph-Based ANN Index for Streaming Similarity Search" +3. HNSW (2016): "Efficient and robust approximate nearest neighbor search using Hierarchical Navigable Small World graphs" +4. Product Quantization (2010): "Product Quantization for Nearest Neighbor Search" +5. OPQ (2013): "Optimized Product Quantization for Approximate Nearest Neighbor Search" +6. ScaNN (2020): "Accelerating Large-Scale Inference with Anisotropic Vector Quantization" +7. NSG (2019): "Fast Approximate Nearest Neighbor Search with Navigating Spreading-out Graph" +8. SSG (2019): "Satellite System Graph" +9. RaBitQ (2024): "Quantizing High-Dimensional Vectors with a Theoretical Error Bound" +10. SPANN (2021): "Highly-efficient Billion-scale Approximate Nearest Neighbor Search" + +## Remaining Optimizations (TODO) + +1. Visited list pool (avoid DashSet allocation per search) +2. Lazy backedge updates (only prune when neighbor is truly full) +3. Memory-mapped mode for read-only workloads +4. Custom serialization (zero-copy for vectors) +5. Graph layout optimization (BFS ordering for cache locality) +6. Parallel beam expansion +7. Optional HNSW-style multi-layer mode diff --git a/hnswlib-rs/src/lib.rs b/hnswlib-rs/src/lib.rs index fe49fd5..7d6a904 100644 --- a/hnswlib-rs/src/lib.rs +++ b/hnswlib-rs/src/lib.rs @@ -340,12 +340,7 @@ impl<'a> HnswIndex<'a> { top_candidates } - pub fn search_knn( - &self, - query: &[f32], - k: usize, - metric: Metric, - ) -> Vec<(LabelType, f64)> { + pub fn search_knn(&self, query: &[f32], k: usize, metric: Metric) -> Vec<(LabelType, f64)> { let mut curr_obj = self.enter_point_node; let mut cur_dist = metric(query, &self.get_data_by_internal_id(curr_obj)); diff --git a/libcorenn/src/cfg.rs b/libcorenn/src/cfg.rs index 151bce0..af36a00 100644 --- a/libcorenn/src/cfg.rs +++ b/libcorenn/src/cfg.rs @@ -7,8 +7,10 @@ pub enum CompressionMode { // TODO Other options: // - PCA // - UMAP - // - Scalar quantization (int8/int4/int2/int1) + // Product Quantization: high compression, slower training. PQ, + // Scalar Quantization (int8): 4x compression, fast, simple. + SQ, // For Matryoshka embeddings. Trunc, } @@ -22,6 +24,11 @@ pub struct Cfg { pub compression_mode: CompressionMode, pub compression_threshold: usize, pub dim: usize, + /// Alpha parameter for Vamana's RobustPrune (α in the DiskANN paper). + /// Controls the tradeoff between graph sparsity and search path length: + /// - α = 1.0: Standard RNG pruning, sparser graph, potentially longer paths + /// - α > 1.0 (e.g., 1.2): More edges kept, guarantees O(log n) diameter + /// The paper recommends α = 1.2 for disk-based systems. pub distance_threshold: f64, pub max_add_edges: usize, pub max_edges: usize, @@ -42,8 +49,11 @@ impl Default for Cfg { compaction_threshold_deletes: 1_000_000, compression_mode: CompressionMode::PQ, compression_threshold: 10_000_000, - distance_threshold: 1.1, - max_add_edges: max_edges, + // α = 1.2 as recommended in DiskANN paper for disk-based systems + distance_threshold: 1.2, + // Lazy pruning: allow 2x edges before triggering pruning. + // This amortizes the cost of expensive pruning operations. + max_add_edges: max_edges * 2, max_edges, metric: StdMetric::L2, // L2 is the safe bet. pq_sample_size: 10_000, // Default: plenty, while fast to train. diff --git a/libcorenn/src/compressor/mod.rs b/libcorenn/src/compressor/mod.rs index d464eb2..750628e 100644 --- a/libcorenn/src/compressor/mod.rs +++ b/libcorenn/src/compressor/mod.rs @@ -7,17 +7,53 @@ use std::fmt::Debug; use std::sync::Arc; pub mod pq; +pub mod scalar; pub mod trunc; // Compressed vector. pub type CV = Arc; +/// Precomputed distance table for asymmetric distance computation (ADC). +/// Created once per query, reused for all distance computations. +pub type DistanceTable = Arc; + pub trait Compressor: Debug + Send + Sync { fn into_compressed(&self, v: VecData) -> CV; fn compress(&self, v: &VecData) -> CV { self.into_compressed(v.clone()) } fn dist(&self, metric: StdMetric, a: &CV, b: &CV) -> f64; + + /// Create a precomputed distance table for ADC (Asymmetric Distance Computation). + /// This is called once per query and enables fast distance computation. + /// Default implementation returns None (no ADC support). + fn create_distance_table(&self, _query: &VecData, _metric: StdMetric) -> Option { + None + } + + /// Compute distance using a precomputed table (ADC). + /// Returns None if ADC is not supported, in which case the caller should fall back to `dist`. + fn dist_with_table(&self, _table: &DistanceTable, _cv: &CV) -> Option { + None + } + + /// Fast distance from a raw query to a compressed vector using ADC if available. + /// Falls back to compressing the query and using symmetric distance. + fn dist_query( + &self, + query: &VecData, + cv: &CV, + metric: StdMetric, + table: Option<&DistanceTable>, + ) -> f64 { + if let Some(table) = table { + if let Some(dist) = self.dist_with_table(table, cv) { + return dist; + } + } + // Fallback: compress query and compute symmetric distance + self.dist(metric, &self.compress(query), cv) + } } impl CacheableTransformer for Arc { diff --git a/libcorenn/src/compressor/pq.rs b/libcorenn/src/compressor/pq.rs index ad7f492..32b52bd 100644 --- a/libcorenn/src/compressor/pq.rs +++ b/libcorenn/src/compressor/pq.rs @@ -14,6 +14,7 @@ use linfa_clustering::KMeans; use linfa_clustering::KMeansInit; use linfa_nn::distance::L2Dist; use ndarray::s; +use ndarray::Array1; use ndarray::Array2; use ndarray::ArrayView1; use ndarray::ArrayView2; @@ -26,6 +27,122 @@ use serde::Serialize; use std::cmp::min; use std::sync::Arc; +/// Precomputed distance lookup table for ADC (Asymmetric Distance Computation). +/// This stores the distance from a query subvector to all centroids for each subspace. +/// Created once per query, then used for fast distance computation to all quantized vectors. +#[derive(Debug)] +pub struct PQDistanceTable { + /// For L2: squared distances from query subvector to each centroid. + /// Shape: [num_subspaces][256] - 256 centroids per subspace. + pub squared_distances: Vec<[f32; 256]>, + /// For Cosine: dot products and norms needed for cosine computation. + /// We store dot products and the query's norm (per subspace). + pub dot_products: Vec<[f32; 256]>, + pub query_norms_sq: Vec, + pub centroid_norms_sq: Vec<[f32; 256]>, + pub metric: StdMetric, +} + +impl PQDistanceTable { + /// Compute distance to a quantized vector using the precomputed table. + /// This is O(M) where M = number of subspaces, vs O(M*D/M) = O(D) for full computation. + pub fn distance(&self, codes: &[u8]) -> f64 { + match self.metric { + StdMetric::L2 => self.distance_l2(codes), + StdMetric::Cosine => self.distance_cosine(codes), + } + } + + /// L2 distance using table lookup. Uses loop unrolling for better performance. + fn distance_l2(&self, codes: &[u8]) -> f64 { + let n = codes.len(); + let mut total_sq: f32 = 0.0; + let mut i = 0; + + // Unroll by 4 for better ILP + let limit_unrolled = n - (n % 4); + while i < limit_unrolled { + let c0 = codes[i] as usize; + let c1 = codes[i + 1] as usize; + let c2 = codes[i + 2] as usize; + let c3 = codes[i + 3] as usize; + + total_sq += self.squared_distances[i][c0]; + total_sq += self.squared_distances[i + 1][c1]; + total_sq += self.squared_distances[i + 2][c2]; + total_sq += self.squared_distances[i + 3][c3]; + + i += 4; + } + + // Handle remainder + while i < n { + total_sq += self.squared_distances[i][codes[i] as usize]; + i += 1; + } + + (total_sq as f64).sqrt() + } + + /// Cosine distance using table lookup. + fn distance_cosine(&self, codes: &[u8]) -> f64 { + let n = codes.len(); + let mut total_dot: f32 = 0.0; + let mut total_query_norm_sq: f32 = 0.0; + let mut total_centroid_norm_sq: f32 = 0.0; + + let mut i = 0; + let limit_unrolled = n - (n % 4); + + // Unroll by 4 + while i < limit_unrolled { + let c0 = codes[i] as usize; + let c1 = codes[i + 1] as usize; + let c2 = codes[i + 2] as usize; + let c3 = codes[i + 3] as usize; + + total_dot += self.dot_products[i][c0]; + total_dot += self.dot_products[i + 1][c1]; + total_dot += self.dot_products[i + 2][c2]; + total_dot += self.dot_products[i + 3][c3]; + + total_query_norm_sq += self.query_norms_sq[i]; + total_query_norm_sq += self.query_norms_sq[i + 1]; + total_query_norm_sq += self.query_norms_sq[i + 2]; + total_query_norm_sq += self.query_norms_sq[i + 3]; + + total_centroid_norm_sq += self.centroid_norms_sq[i][c0]; + total_centroid_norm_sq += self.centroid_norms_sq[i + 1][c1]; + total_centroid_norm_sq += self.centroid_norms_sq[i + 2][c2]; + total_centroid_norm_sq += self.centroid_norms_sq[i + 3][c3]; + + i += 4; + } + + // Handle remainder + while i < n { + let code = codes[i] as usize; + total_dot += self.dot_products[i][code]; + total_query_norm_sq += self.query_norms_sq[i]; + total_centroid_norm_sq += self.centroid_norms_sq[i][code]; + i += 1; + } + + const EPSILON: f32 = 1e-12; + if total_query_norm_sq < EPSILON || total_centroid_norm_sq < EPSILON { + return if total_query_norm_sq < EPSILON && total_centroid_norm_sq < EPSILON { + 0.0 + } else { + 1.0 + }; + } + + let denom = (total_query_norm_sq * total_centroid_norm_sq).sqrt(); + let cosine_sim = (total_dot / denom) as f64; + 1.0 - cosine_sim.clamp(-1.0, 1.0) + } +} + #[derive(Debug, Deserialize, Serialize)] pub struct ProductQuantizer { dims: usize, @@ -123,6 +240,78 @@ impl ProductQuantizer { } } +impl ProductQuantizer { + /// Create a distance lookup table for ADC (Asymmetric Distance Computation). + /// This precomputes distances from the query subvectors to all centroids, + /// enabling O(M) distance computation per quantized vector instead of O(D). + pub fn create_distance_table(&self, query: &Array1, metric: StdMetric) -> PQDistanceTable { + let subspaces = self.subspace_codebooks.len(); + let subdims = self.dims / subspaces; + + let mut squared_distances = Vec::with_capacity(subspaces); + let mut dot_products = Vec::with_capacity(subspaces); + let mut query_norms_sq = Vec::with_capacity(subspaces); + let mut centroid_norms_sq = Vec::with_capacity(subspaces); + + for (i, codebook) in self.subspace_codebooks.iter().enumerate() { + let query_sub = query.slice(s![i * subdims..(i + 1) * subdims]); + let centroids = codebook.centroids(); // Array2, shape [256, subdims] + + let mut sq_dists = [0.0f32; 256]; + let mut dots = [0.0f32; 256]; + let mut c_norms_sq = [0.0f32; 256]; + + // Query subvector norm (for cosine) + let q_norm_sq: f32 = query_sub.iter().map(|x| x * x).sum(); + + for j in 0..256 { + let centroid = centroids.row(j); + + match metric { + StdMetric::L2 => { + // Squared L2 distance: ||q - c||^2 + let mut sq_dist = 0.0f32; + for k in 0..subdims { + let diff = query_sub[k] - centroid[k]; + sq_dist += diff * diff; + } + sq_dists[j] = sq_dist; + } + StdMetric::Cosine => { + // Dot product and centroid norm for cosine + let mut dot = 0.0f32; + let mut c_norm_sq = 0.0f32; + for k in 0..subdims { + dot += query_sub[k] * centroid[k]; + c_norm_sq += centroid[k] * centroid[k]; + } + dots[j] = dot; + c_norms_sq[j] = c_norm_sq; + } + } + } + + squared_distances.push(sq_dists); + dot_products.push(dots); + query_norms_sq.push(q_norm_sq); + centroid_norms_sq.push(c_norms_sq); + } + + PQDistanceTable { + squared_distances, + dot_products, + query_norms_sq, + centroid_norms_sq, + metric, + } + } + + /// Fast distance computation using a precomputed table (ADC). + pub fn distance_with_table(&self, table: &PQDistanceTable, codes: &[u8]) -> f64 { + table.distance(codes) + } +} + impl Compressor for ProductQuantizer { fn into_compressed(&self, v: VecData) -> CV { let v = v.into_f32(); @@ -130,6 +319,22 @@ impl Compressor for ProductQuantizer { Arc::new(self.encode(&view)) } + fn create_distance_table( + &self, + query: &VecData, + metric: StdMetric, + ) -> Option { + let query_f32 = query.to_f32(); + let table = self.create_distance_table(&query_f32, metric); + Some(Arc::new(table)) + } + + fn dist_with_table(&self, table: &super::DistanceTable, cv: &CV) -> Option { + let table = table.downcast_ref::()?; + let codes = cv.downcast_ref::>()?; + Some(table.distance(codes)) + } + fn dist(&self, metric: StdMetric, a: &CV, b: &CV) -> f64 { let a_codes = a.downcast_ref::>().unwrap(); let b_codes = b.downcast_ref::>().unwrap(); diff --git a/libcorenn/src/compressor/scalar.rs b/libcorenn/src/compressor/scalar.rs new file mode 100644 index 0000000..3de939c --- /dev/null +++ b/libcorenn/src/compressor/scalar.rs @@ -0,0 +1,381 @@ +//! Scalar Quantization (SQ) Compressor +//! +//! Scalar quantization maps each float dimension to an 8-bit integer. +//! This provides 4x memory reduction with fast SIMD-friendly distance computation. +//! +//! The quantization formula is: +//! q = round((x - min) / (max - min) * 255) +//! +//! For L2 distance, we can compute in quantized space directly. +//! For cosine, we dequantize and compute (or use lookup tables). + +use super::Compressor; +use super::DistanceTable; +use super::CV; +use crate::metric::StdMetric; +use crate::vec::VecData; +use serde::Deserialize; +use serde::Serialize; +use std::sync::Arc; + +/// Scalar quantization parameters learned from training data. +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct ScalarQuantizer { + /// Number of dimensions + dims: usize, + /// Minimum value per dimension + mins: Vec, + /// Scale factor per dimension: 255 / (max - min) + scales: Vec, + /// Inverse scale for dequantization: (max - min) / 255 + inv_scales: Vec, +} + +/// Distance lookup table for asymmetric scalar quantization. +/// Precomputes (query[i] - min[i]) * scale[i] for fast distance computation. +#[derive(Debug)] +pub struct SQDistanceTable { + /// Query values scaled to quantized space: (query - min) * scale + /// These are f32 to allow fractional values for asymmetric distance. + scaled_query: Vec, + metric: StdMetric, + /// For cosine: precomputed query norm squared + query_norm_sq: f32, +} + +impl ScalarQuantizer { + /// Train scalar quantizer from sample vectors. + /// Computes per-dimension min/max from the training data. + pub fn train(samples: &[Vec]) -> Self { + assert!(!samples.is_empty(), "Need at least one sample"); + let dims = samples[0].len(); + + // Initialize with first sample + let mut mins: Vec = samples[0].clone(); + let mut maxs: Vec = samples[0].clone(); + + // Find min/max per dimension + for sample in samples.iter().skip(1) { + assert_eq!(sample.len(), dims); + for (i, &val) in sample.iter().enumerate() { + mins[i] = mins[i].min(val); + maxs[i] = maxs[i].max(val); + } + } + + // Compute scales with epsilon to avoid division by zero + let epsilon = 1e-10; + let mut scales = Vec::with_capacity(dims); + let mut inv_scales = Vec::with_capacity(dims); + + for i in 0..dims { + let range = (maxs[i] - mins[i]).max(epsilon); + scales.push(255.0 / range); + inv_scales.push(range / 255.0); + } + + ScalarQuantizer { + dims, + mins, + scales, + inv_scales, + } + } + + /// Train from CoreNN database by sampling vectors. + pub fn train_from_corenn(corenn: &crate::CoreNN) -> Self { + use crate::store::schema::NODE; + use rand::seq::IteratorRandom; + + let sample_size = corenn.cfg.pq_sample_size; + let mut rng = rand::thread_rng(); + + // Sample vectors from the database + let samples: Vec> = NODE + .iter(&corenn.db) + .choose_multiple(&mut rng, sample_size) + .into_iter() + .map(|(_, node)| { + let vec = node.vector; + match vec.as_ref() { + VecData::BF16(v) => v.iter().map(|x| x.to_f32()).collect(), + VecData::F16(v) => v.iter().map(|x| x.to_f32()).collect(), + VecData::F32(v) => v.clone(), + VecData::F64(v) => v.iter().map(|x| *x as f32).collect(), + } + }) + .collect(); + + if samples.is_empty() { + panic!("Cannot train SQ: no vectors in database"); + } + + Self::train(&samples) + } + + /// Quantize a vector to u8 values. + pub fn quantize(&self, vec: &[f32]) -> Vec { + assert_eq!(vec.len(), self.dims); + vec + .iter() + .zip(self.mins.iter()) + .zip(self.scales.iter()) + .map(|((&v, &min), &scale)| { + let q = ((v - min) * scale).round(); + q.clamp(0.0, 255.0) as u8 + }) + .collect() + } + + /// Dequantize u8 values back to f32 (lossy). + pub fn dequantize(&self, quantized: &[u8]) -> Vec { + quantized + .iter() + .zip(self.mins.iter()) + .zip(self.inv_scales.iter()) + .map(|((&q, &min), &inv_scale)| min + (q as f32) * inv_scale) + .collect() + } + + /// Create distance table for asymmetric distance computation. + pub fn create_distance_table(&self, query: &[f32], metric: StdMetric) -> SQDistanceTable { + assert_eq!(query.len(), self.dims); + + // Scale query to quantized space (but keep as f32 for precision) + let scaled_query: Vec = query + .iter() + .zip(self.mins.iter()) + .zip(self.scales.iter()) + .map(|((&v, &min), &scale)| (v - min) * scale) + .collect(); + + let query_norm_sq = if metric == StdMetric::Cosine { + query.iter().map(|x| x * x).sum() + } else { + 0.0 + }; + + SQDistanceTable { + scaled_query, + metric, + query_norm_sq, + } + } + + /// Compute L2 distance using the distance table. + /// This is asymmetric: query is not quantized, target is quantized. + pub fn distance_l2(&self, table: &SQDistanceTable, quantized: &[u8]) -> f64 { + #[cfg(any(target_arch = "x86", target_arch = "x86_64"))] + { + if is_x86_feature_detected!("avx512f") { + return unsafe { self.distance_l2_avx512(table, quantized) }; + } + } + + #[cfg(target_arch = "aarch64")] + { + if std::arch::is_aarch64_feature_detected!("neon") { + return unsafe { self.distance_l2_neon(table, quantized) }; + } + } + + self.distance_l2_scalar(table, quantized) + } + + /// Scalar fallback for L2 distance. + fn distance_l2_scalar(&self, table: &SQDistanceTable, quantized: &[u8]) -> f64 { + let mut original_sum_sq: f32 = 0.0; + for (i, &q) in quantized.iter().enumerate() { + let scaled_diff = table.scaled_query[i] - (q as f32); + let original_diff = scaled_diff * self.inv_scales[i]; + original_sum_sq += original_diff * original_diff; + } + (original_sum_sq as f64).sqrt() + } + + /// AVX-512 optimized L2 distance. + #[cfg(any(target_arch = "x86", target_arch = "x86_64"))] + #[target_feature(enable = "avx512f")] + unsafe fn distance_l2_avx512(&self, table: &SQDistanceTable, quantized: &[u8]) -> f64 { + use std::arch::x86_64::*; + + let n = quantized.len(); + let mut i = 0; + + // Process 16 elements at a time with AVX-512 + let mut acc = _mm512_setzero_ps(); + + while i + 16 <= n { + // Load 16 u8 values and convert to f32 + let q_bytes = _mm_loadu_si128(quantized.as_ptr().add(i) as *const _); + let q_i32 = _mm512_cvtepu8_epi32(q_bytes); + let q_f32 = _mm512_cvtepi32_ps(q_i32); + + // Load scaled query and inv_scales + let sq = _mm512_loadu_ps(table.scaled_query.as_ptr().add(i)); + let inv_s = _mm512_loadu_ps(self.inv_scales.as_ptr().add(i)); + + // Compute (scaled_query - quantized) * inv_scale + let diff = _mm512_sub_ps(sq, q_f32); + let orig_diff = _mm512_mul_ps(diff, inv_s); + + // Accumulate squared differences + acc = _mm512_fmadd_ps(orig_diff, orig_diff, acc); + + i += 16; + } + + // Horizontal sum + let mut sum_sq = _mm512_reduce_add_ps(acc); + + // Handle remaining elements + for j in i..n { + let scaled_diff = table.scaled_query[j] - (quantized[j] as f32); + let original_diff = scaled_diff * self.inv_scales[j]; + sum_sq += original_diff * original_diff; + } + + (sum_sq as f64).sqrt() + } + + /// NEON optimized L2 distance for ARM. + #[cfg(target_arch = "aarch64")] + #[target_feature(enable = "neon")] + unsafe fn distance_l2_neon(&self, table: &SQDistanceTable, quantized: &[u8]) -> f64 { + use std::arch::aarch64::*; + + let n = quantized.len(); + let mut i = 0; + + let mut acc = vdupq_n_f32(0.0); + + while i + 4 <= n { + // Load 4 u8 values + let q_u8 = vld1_lane_u8::<0>(quantized.as_ptr().add(i), vdup_n_u8(0)); + let q_u8 = vld1_lane_u8::<1>(quantized.as_ptr().add(i + 1), q_u8); + let q_u8 = vld1_lane_u8::<2>(quantized.as_ptr().add(i + 2), q_u8); + let q_u8 = vld1_lane_u8::<3>(quantized.as_ptr().add(i + 3), q_u8); + + // Convert to f32 + let q_u16 = vmovl_u8(q_u8); + let q_u32 = vmovl_u16(vget_low_u16(q_u16)); + let q_f32 = vcvtq_f32_u32(q_u32); + + // Load scaled query and inv_scales + let sq = vld1q_f32(table.scaled_query.as_ptr().add(i)); + let inv_s = vld1q_f32(self.inv_scales.as_ptr().add(i)); + + // Compute (scaled_query - quantized) * inv_scale + let diff = vsubq_f32(sq, q_f32); + let orig_diff = vmulq_f32(diff, inv_s); + + // Accumulate squared differences + acc = vfmaq_f32(acc, orig_diff, orig_diff); + + i += 4; + } + + // Horizontal sum + let mut sum_sq = vaddvq_f32(acc); + + // Handle remaining elements + for j in i..n { + let scaled_diff = table.scaled_query[j] - (quantized[j] as f32); + let original_diff = scaled_diff * self.inv_scales[j]; + sum_sq += original_diff * original_diff; + } + + (sum_sq as f64).sqrt() + } + + /// Compute cosine distance using dequantization. + pub fn distance_cosine(&self, table: &SQDistanceTable, quantized: &[u8]) -> f64 { + // Dequantize and compute cosine + let dequantized = self.dequantize(quantized); + + let mut dot_product: f32 = 0.0; + let mut target_norm_sq: f32 = 0.0; + + // Compute original query values from scaled + for (i, &q) in dequantized.iter().enumerate() { + let query_val = table.scaled_query[i] * self.inv_scales[i] + self.mins[i]; + dot_product += query_val * q; + target_norm_sq += q * q; + } + + let denom = (table.query_norm_sq * target_norm_sq).sqrt(); + if denom < 1e-10 { + return if table.query_norm_sq < 1e-10 && target_norm_sq < 1e-10 { + 0.0 + } else { + 1.0 + }; + } + + let cosine_sim = (dot_product / denom) as f64; + 1.0 - cosine_sim.clamp(-1.0, 1.0) + } +} + +impl Compressor for ScalarQuantizer { + fn into_compressed(&self, v: VecData) -> CV { + let v_f32: Vec = match v { + VecData::BF16(v) => v.into_iter().map(|x| x.to_f32()).collect(), + VecData::F16(v) => v.into_iter().map(|x| x.to_f32()).collect(), + VecData::F32(v) => v, + VecData::F64(v) => v.into_iter().map(|x| x as f32).collect(), + }; + Arc::new(self.quantize(&v_f32)) + } + + fn create_distance_table(&self, query: &VecData, metric: StdMetric) -> Option { + let query_f32: Vec = match query { + VecData::BF16(v) => v.iter().map(|x| x.to_f32()).collect(), + VecData::F16(v) => v.iter().map(|x| x.to_f32()).collect(), + VecData::F32(v) => v.clone(), + VecData::F64(v) => v.iter().map(|x| *x as f32).collect(), + }; + Some(Arc::new(self.create_distance_table(&query_f32, metric))) + } + + fn dist_with_table(&self, table: &DistanceTable, cv: &CV) -> Option { + let table = table.downcast_ref::()?; + let quantized = cv.downcast_ref::>()?; + + Some(match table.metric { + StdMetric::L2 => self.distance_l2(table, quantized), + StdMetric::Cosine => self.distance_cosine(table, quantized), + }) + } + + fn dist(&self, metric: StdMetric, a: &CV, b: &CV) -> f64 { + let a_q = a.downcast_ref::>().unwrap(); + let b_q = b.downcast_ref::>().unwrap(); + + // Dequantize and compute distance + let a_f = self.dequantize(a_q); + let b_f = self.dequantize(b_q); + + match metric { + StdMetric::L2 => { + let sum_sq: f32 = a_f + .iter() + .zip(b_f.iter()) + .map(|(a, b)| (a - b) * (a - b)) + .sum(); + (sum_sq as f64).sqrt() + } + StdMetric::Cosine => { + let dot: f32 = a_f.iter().zip(b_f.iter()).map(|(a, b)| a * b).sum(); + let norm_a: f32 = a_f.iter().map(|x| x * x).sum(); + let norm_b: f32 = b_f.iter().map(|x| x * x).sum(); + let denom = (norm_a * norm_b).sqrt(); + if denom < 1e-10 { + 1.0 + } else { + 1.0 - ((dot / denom) as f64).clamp(-1.0, 1.0) + } + } + } + } +} diff --git a/libcorenn/src/lib.rs b/libcorenn/src/lib.rs index a933e39..575b2b9 100644 --- a/libcorenn/src/lib.rs +++ b/libcorenn/src/lib.rs @@ -1,16 +1,11 @@ -#![feature(avx512_target_feature)] +// Note: avx512_target_feature, path_add_extension, stdarch_x86_avx512 are now stable #![feature(duration_millis_float)] #![feature(f16)] -#![feature(path_add_extension)] #![cfg_attr(target_arch = "aarch64", feature(stdarch_neon_f16))] #![cfg_attr( any(target_arch = "x86", target_arch = "x86_64"), feature(stdarch_x86_avx512_f16) )] -#![cfg_attr( - any(target_arch = "x86", target_arch = "x86_64"), - feature(stdarch_x86_avx512) -)] use ahash::HashSet; use ahash::HashSetExt; @@ -25,8 +20,10 @@ use common::nan_to_num; use common::Id; use compaction::compact; use compressor::pq::ProductQuantizer; +use compressor::scalar::ScalarQuantizer; use compressor::trunc::TruncCompressor; use compressor::Compressor; +use compressor::DistanceTable; use compressor::CV; use dashmap::DashMap; use dashmap::DashSet; @@ -37,7 +34,6 @@ use ordered_float::OrderedFloat; use parking_lot::Mutex; use parking_lot::RwLock; use std::cmp::max; -use std::collections::VecDeque; use std::convert::identity; use std::iter::zip; use std::ops::Deref; @@ -55,6 +51,7 @@ use store::schema::ID_TO_KEY; use store::schema::KEY_TO_ID; use store::schema::NODE; use store::schema::PQ_MODEL; +use store::schema::SQ_MODEL; use store::Store; use tracing::debug; use util::AtomUsz; @@ -112,10 +109,16 @@ impl Point { } } + #[allow(dead_code)] pub fn dist_query(&self, query: &VecData) -> f64 { + self.dist_query_with_table(query, None) + } + + /// Compute distance to query, using ADC table if available for faster computation. + pub fn dist_query_with_table(&self, query: &VecData, table: Option<&DistanceTable>) -> f64 { match &self.vec { PointVec::Uncompressed(v) => (self.metric)(v, query), - PointVec::Compressed(c, cv) => c.dist(self.metric_type, cv, &c.compress(query)), + PointVec::Compressed(c, cv) => c.dist_query(query, cv, self.metric_type, table), } } } @@ -180,6 +183,16 @@ impl CoreNN { &'a self, ids: &'a [Id], query: Option<&'a VecData>, + ) -> impl Iterator> + 'a { + self.get_points_with_table(ids, query, None) + } + + /// Get points with optional ADC distance table for faster compressed distance computation. + fn get_points_with_table<'a>( + &'a self, + ids: &'a [Id], + query: Option<&'a VecData>, + dist_table: Option<&'a DistanceTable>, ) -> impl Iterator> + 'a { // Hold lock across all reads. Getting some compressed nodes and others uncompressed breaks all code that uses this data. let vecs = match &*self.mode.read() { @@ -207,101 +220,168 @@ impl CoreNN { dist: OrderedFloat(f64::INFINITY), }; if let Some(q) = query { - node.dist.0 = node.dist_query(q); + node.dist.0 = node.dist_query_with_table(q, dist_table); } Some(node) }) } + #[allow(dead_code)] fn get_point(&self, id: Id, query: Option<&VecData>) -> Option { self.get_points(&[id], query).exactly_one().ok().unwrap() } + /// Select diverse neighbors using Vamana's RobustPrune algorithm. + /// + /// This is Algorithm 2 from the DiskANN paper (Subramanya et al., NeurIPS 2019): + /// + /// ```text + /// RobustPrune(p, V, α, R): + /// V ← (V ∪ Nout(p)) \ {p} + /// Nout(p) ← ∅ + /// while V ≠ ∅ do + /// p* ← argmin_{p' ∈ V} d(p, p') // Pick closest to node + /// Nout(p) ← Nout(p) ∪ {p*} // Add to neighbors + /// if |Nout(p)| = R then break // Stop at max degree + /// for p' ∈ V do + /// if α · d(p*, p') ≤ d(p, p') then // α-RNG condition + /// remove p' from V // Prune covered points + /// ``` + /// + /// The α parameter (distance_threshold) is CRUCIAL: + /// - α = 1: Standard RNG, may have large diameter + /// - α > 1 (e.g., 1.2): Guarantees O(log n) diameter for disk-based search + /// because each step makes multiplicative progress toward query + /// + /// Complexity: O(R × |V|) where R = max_edges, |V| = candidates fn prune_candidates(&self, node: &VecData, candidate_ids: &[Id]) -> Vec { let max_edges = self.cfg.max_edges; - let dist_thresh = self.cfg.distance_threshold; + let alpha = self.cfg.distance_threshold; - let mut candidates = self + // Get all candidates sorted by distance to node (closest first) + let candidates: Vec = self .get_points(candidate_ids, Some(node)) - .filter_map(|n| n) + .flatten() .sorted_unstable_by_key(|s| s.dist) - .collect::>(); + .collect(); - let mut new_neighbors = Vec::new(); - // Even though the algorithm in the paper doesn't actually pop, the later pruning of the candidates at the end of the loop guarantees it will always be removed because d(p*, p') will always be zero for itself (p* == p'). - while let Some(p_star) = candidates.pop_front() { - new_neighbors.push(p_star.id); - if new_neighbors.len() == max_edges { + if candidates.is_empty() { + return Vec::new(); + } + + // If fewer candidates than max_edges, keep all + if candidates.len() <= max_edges { + return candidates.into_iter().map(|p| p.id).collect(); + } + + // Vamana RobustPrune: iteratively select closest and prune covered candidates + use std::collections::VecDeque; + let mut selected: Vec = Vec::with_capacity(max_edges); + let mut remaining: VecDeque = candidates.into(); + + while let Some(p_star) = remaining.pop_front() { + // p* is the closest remaining candidate to node + selected.push(p_star.id); + + if selected.len() >= max_edges { break; } - candidates.retain(|s| { - let cand_dist_to_node = s.dist.0; - let cand_dist_to_pick = p_star.dist(s); - cand_dist_to_node <= cand_dist_to_pick * dist_thresh + + // Remove candidates that are "covered" by p* using α-RNG condition: + // Remove p' if α · d(p*, p') ≤ d(node, p') + // Keep p' if α · d(p*, p') > d(node, p') + // Equivalently: keep if d(node, p') < α · d(p*, p') + remaining.retain(|p_prime| { + let dist_node_to_candidate = p_prime.dist.0; + let dist_selected_to_candidate = p_star.dist(p_prime); + // Keep if node is closer to candidate than α × selected-to-candidate + dist_node_to_candidate < alpha * dist_selected_to_candidate }); } - new_neighbors + + selected } fn search(&self, query: &VecData, k: usize, search_list_cap: usize) -> (Vec, DashSet) { - // NOTE: This is intentionally simple over optimized. - // Not the most optimal data structures or avoiding of malloc/memcpy. - // And that's OK — simple makes this easier to understand and maintain. - // The performance is still extremely fast — and probably fits in cache better and branches less. + // HNSW-style beam search with lowerBound early stopping. + // Uses a sorted list for simplicity (could use BinaryHeap for slight speedup). assert!( search_list_cap >= k, "search list capacity must be greater than or equal to k" ); - // Our list of candidate nodes, always sorted by distance. - // This is our result list, but also the candidate list for expansion. + + // Create ADC distance table for fast compressed distance computation. + let dist_table: Option = match &*self.mode.read() { + Mode::Compressed(compressor, _) => compressor.create_distance_table(query, self.cfg.metric), + Mode::Uncompressed(_) => None, + }; + let dist_table_ref = dist_table.as_ref(); + + // Results: best candidates found so far, sorted by distance let mut search_list = Vec::::new(); - // Seen != expansion. We just want to prevent duplicate nodes from being added to the search list. - // Use DashSet as we'll insert from for_each_concurrent. + // Visited set to prevent duplicates let seen = DashSet::new(); - // There's no need to expand the same node more than once. + // Expanded set - no need to expand twice let mut expanded = HashSet::new(); // Start with the entry node. - let Some(entry) = self.get_point(0, Some(query)) else { - // No entry node, empty DB. + let Some(entry) = self + .get_points_with_table(&[0], Some(query), dist_table_ref) + .next() + .flatten() + else { return Default::default(); }; + // lowerBound: distance to worst result in search_list + // HNSW stops when best unexpanded candidate > lowerBound + let mut lower_bound = entry.dist.0; search_list.push(entry); seen.insert(0); loop { - // Pop and mark beam_width nodes for expansion. - // We pop as we'll later re-rank then re-insert with updated dists. - let to_expand = search_list + // Find best unexpanded candidate + let to_expand: Vec = search_list .extract_if(.., |p| expanded.insert(p.id)) .take(self.cfg.beam_width) .collect_vec(); + if to_expand.is_empty() { break; - }; + } + + // HNSW-style early stopping: + // If best unexpanded candidate is worse than our worst result, stop + let best_unexpanded_dist = to_expand.first().map(|p| p.dist.0).unwrap_or(f64::INFINITY); + if best_unexpanded_dist > lower_bound && search_list.len() >= search_list_cap { + // Re-insert the candidates we extracted (they weren't expanded) + for p in to_expand { + expanded.remove(&p.id); + let pos = search_list + .binary_search_by_key(&p.dist, |s| s.dist) + .map_or_else(identity, identity); + search_list.insert(pos, p); + } + break; + } let fetched = self.get_nodes(&to_expand.iter().map(|p| p.id).collect_vec()); - // Add expanded neighbors to search list. let mut to_add = Vec::::new(); let mut neighbor_ids = Vec::::new(); + for (mut point, node) in zip(to_expand, fetched) { - // Node doesn't exist anymore. let Some(node) = node else { continue; }; - // Collect its neighbors to total set of neighbors. + // Collect neighbors for &neighbor in node.neighbors.iter() { - // We've seen this node in a previous search iteration, - // or in this iteration — but from another node's expansion. if !seen.insert(neighbor) { continue; } neighbor_ids.push(neighbor); } - // There may be additional neighbors. if let Some(add) = self.add_edges.get(&point.id) { for &neighbor in add.iter() { if !seen.insert(neighbor) { @@ -311,22 +391,25 @@ impl CoreNN { } }; - // Re-rank using full vector. + // Re-rank using full vector point.dist.0 = (self.metric)(&node.vector, query); to_add.push(point); } - // Get all neighbors at once. - for p in self.get_points(&neighbor_ids, Some(query)) { - if let Some(p) = p { + + // Get neighbors with distance computation + for p in self + .get_points_with_table(&neighbor_ids, Some(query), dist_table_ref) + .flatten() + { + // HNSW optimization: only add if could improve results + if search_list.len() < search_list_cap || p.dist.0 < lower_bound { to_add.push(p); } } - // WARNING: If you want to optimize by batching inserts, be careful: - // Two source values to add could be inserted at the same position but between themselves are not sorted. - // Remember to handle this scenario. + // Insert new candidates in sorted order for point in to_add { - // Remove soft-deleted if already expanded. We still need to expand soft-deleted to traverse the graph accurately. + // Skip soft-deleted if already expanded if self.deleted.contains(&point.id) && expanded.contains(&point.id) { continue; } @@ -336,8 +419,14 @@ impl CoreNN { search_list.insert(pos, point); } - // Without truncation each iteration, we'll search the entire graph. + // Truncate to search_list_cap search_list.truncate(search_list_cap); + + // Update lowerBound (distance to worst result) + // This is used for HNSW early stopping + if !search_list.is_empty() { + lower_bound = search_list.last().unwrap().dist.0; + } } // We use `seen` as candidates for new neighbors, so we should remove soft-deleted here too to avoid new edges to them. @@ -407,6 +496,10 @@ impl CoreNN { let compressor: Arc = Arc::new(pq); compressor }), + CompressionMode::SQ => SQ_MODEL.read(&db, ()).map(|sq| { + let compressor: Arc = Arc::new(sq); + compressor + }), CompressionMode::Trunc => Some(Arc::new(TruncCompressor::new(cfg.trunc_dims))), }; match compressor { @@ -514,6 +607,11 @@ impl CoreNN { PQ_MODEL.put(&corenn.db, (), &pq); Arc::new(pq) } + CompressionMode::SQ => { + let sq = ScalarQuantizer::train_from_corenn(&corenn); + SQ_MODEL.put(&corenn.db, (), &sq); + Arc::new(sq) + } CompressionMode::Trunc => Arc::new(TruncCompressor::new(corenn.cfg.trunc_dims)), }; *corenn.mode.write() = Mode::Compressed( diff --git a/libcorenn/src/metric/cosine.rs b/libcorenn/src/metric/cosine.rs index dd8a139..cbb826a 100644 --- a/libcorenn/src/metric/cosine.rs +++ b/libcorenn/src/metric/cosine.rs @@ -148,25 +148,48 @@ unsafe fn dist_cosine_f16_avx512(a: &[half::f16], b: &[half::f16]) -> f64 { #[target_feature(enable = "avx512f")] unsafe fn dist_cosine_f32_avx512(a: &[f32], b: &[f32]) -> f64 { let len = a.len(); + let ptr_a = a.as_ptr(); + let ptr_b = b.as_ptr(); - let mut dot_product_sum_vec = _mm512_setzero_ps(); - let mut a_norm_sq_sum_vec = _mm512_setzero_ps(); - let mut b_norm_sq_sum_vec = _mm512_setzero_ps(); + // Use 4 accumulators for better ILP + let mut dot0 = _mm512_setzero_ps(); + let mut dot1 = _mm512_setzero_ps(); + let mut a_norm0 = _mm512_setzero_ps(); + let mut a_norm1 = _mm512_setzero_ps(); + let mut b_norm0 = _mm512_setzero_ps(); + let mut b_norm1 = _mm512_setzero_ps(); let mut i = 0; - let vec_width = 16; // 16 f32 elements - while i + vec_width <= len { - let a_vec = _mm512_loadu_ps(a.as_ptr().add(i) as *const f32); - let b_vec = _mm512_loadu_ps(b.as_ptr().add(i) as *const f32); + // 2x unrolled loop (32 elements per iteration) + let limit_unrolled = len - (len % 32); + while i < limit_unrolled { + // Prefetch next cache lines (stay within allocation) + if i + 64 <= len { + _mm_prefetch(ptr_a.add(i + 64) as *const i8, _MM_HINT_T0); + _mm_prefetch(ptr_b.add(i + 64) as *const i8, _MM_HINT_T0); + } - dot_product_sum_vec = _mm512_fmadd_ps(a_vec, b_vec, dot_product_sum_vec); - a_norm_sq_sum_vec = _mm512_fmadd_ps(a_vec, a_vec, a_norm_sq_sum_vec); - b_norm_sq_sum_vec = _mm512_fmadd_ps(b_vec, b_vec, b_norm_sq_sum_vec); + let a0 = _mm512_loadu_ps(ptr_a.add(i)); + let b0 = _mm512_loadu_ps(ptr_b.add(i)); + let a1 = _mm512_loadu_ps(ptr_a.add(i + 16)); + let b1 = _mm512_loadu_ps(ptr_b.add(i + 16)); - i += vec_width; + dot0 = _mm512_fmadd_ps(a0, b0, dot0); + dot1 = _mm512_fmadd_ps(a1, b1, dot1); + a_norm0 = _mm512_fmadd_ps(a0, a0, a_norm0); + a_norm1 = _mm512_fmadd_ps(a1, a1, a_norm1); + b_norm0 = _mm512_fmadd_ps(b0, b0, b_norm0); + b_norm1 = _mm512_fmadd_ps(b1, b1, b_norm1); + + i += 32; } + // Combine accumulators + let dot_product_sum_vec = _mm512_add_ps(dot0, dot1); + let a_norm_sq_sum_vec = _mm512_add_ps(a_norm0, a_norm1); + let b_norm_sq_sum_vec = _mm512_add_ps(b_norm0, b_norm1); + let mut dot_product_sum = _mm512_reduce_add_ps(dot_product_sum_vec) as f64; let mut a_norm_sq_sum = _mm512_reduce_add_ps(a_norm_sq_sum_vec) as f64; let mut b_norm_sq_sum = _mm512_reduce_add_ps(b_norm_sq_sum_vec) as f64; diff --git a/libcorenn/src/metric/l2.rs b/libcorenn/src/metric/l2.rs index e01b18d..d3b5763 100644 --- a/libcorenn/src/metric/l2.rs +++ b/libcorenn/src/metric/l2.rs @@ -139,27 +139,67 @@ unsafe fn dist_l2_f16_avx512(a_slice: &[f16], b_slice: &[f16]) -> f64 { #[cfg(any(target_arch = "x86", target_arch = "x86_64"))] unsafe fn dist_l2_f32_avx512(a_slice: &[f32], b_slice: &[f32]) -> f64 { let len = a_slice.len(); - let mut acc_sum_ps = _mm512_setzero_ps(); // Accumulator for sum of squares (16 f32s) + // Use 4 accumulators for better instruction-level parallelism + let mut acc0 = _mm512_setzero_ps(); + let mut acc1 = _mm512_setzero_ps(); + let mut acc2 = _mm512_setzero_ps(); + let mut acc3 = _mm512_setzero_ps(); let ptr_a = a_slice.as_ptr(); let ptr_b = b_slice.as_ptr(); let mut i = 0; - // Process chunks of 16 f32 elements - let limit_avx512 = len - (len % 16); + // Process chunks of 64 f32 elements (4x unrolled) + let limit_unrolled = len - (len % 64); + + while i < limit_unrolled { + // Prefetch next cache lines (within allocation bounds) + if i + 64 <= len { + _mm_prefetch(ptr_a.add(i + 64) as *const i8, _MM_HINT_T0); + _mm_prefetch(ptr_b.add(i + 64) as *const i8, _MM_HINT_T0); + } + if i + 80 <= len { + _mm_prefetch(ptr_a.add(i + 80) as *const i8, _MM_HINT_T0); + _mm_prefetch(ptr_b.add(i + 80) as *const i8, _MM_HINT_T0); + } + + // Load 16 f32s at a time, 4x unrolled + let v_a0 = _mm512_loadu_ps(ptr_a.add(i)); + let v_b0 = _mm512_loadu_ps(ptr_b.add(i)); + let v_a1 = _mm512_loadu_ps(ptr_a.add(i + 16)); + let v_b1 = _mm512_loadu_ps(ptr_b.add(i + 16)); + let v_a2 = _mm512_loadu_ps(ptr_a.add(i + 32)); + let v_b2 = _mm512_loadu_ps(ptr_b.add(i + 32)); + let v_a3 = _mm512_loadu_ps(ptr_a.add(i + 48)); + let v_b3 = _mm512_loadu_ps(ptr_b.add(i + 48)); + + // Compute differences + let diff0 = _mm512_sub_ps(v_a0, v_b0); + let diff1 = _mm512_sub_ps(v_a1, v_b1); + let diff2 = _mm512_sub_ps(v_a2, v_b2); + let diff3 = _mm512_sub_ps(v_a3, v_b3); + + // Square and accumulate using FMA + acc0 = _mm512_fmadd_ps(diff0, diff0, acc0); + acc1 = _mm512_fmadd_ps(diff1, diff1, acc1); + acc2 = _mm512_fmadd_ps(diff2, diff2, acc2); + acc3 = _mm512_fmadd_ps(diff3, diff3, acc3); + + i += 64; + } + // Combine accumulators + let acc01 = _mm512_add_ps(acc0, acc1); + let acc23 = _mm512_add_ps(acc2, acc3); + let mut acc_sum_ps = _mm512_add_ps(acc01, acc23); + + // Process remaining 16-element chunks + let limit_avx512 = len - (len % 16); while i < limit_avx512 { - // Load 16 f32s from a and b let v_a_ps = _mm512_loadu_ps(ptr_a.add(i)); let v_b_ps = _mm512_loadu_ps(ptr_b.add(i)); - - // Subtract let v_diff_ps = _mm512_sub_ps(v_a_ps, v_b_ps); - - // Square and accumulate: acc_sum_ps = acc_sum_ps + (v_diff_ps * v_diff_ps) - // Using FMA (fused multiply-add) acc_sum_ps = _mm512_fmadd_ps(v_diff_ps, v_diff_ps, acc_sum_ps); - i += 16; } diff --git a/libcorenn/src/store/rocksdb.rs b/libcorenn/src/store/rocksdb.rs index 31fea15..0f6fa14 100644 --- a/libcorenn/src/store/rocksdb.rs +++ b/libcorenn/src/store/rocksdb.rs @@ -15,12 +15,25 @@ pub fn rocksdb_options(create_if_missing: bool, error_if_exists: bool) -> Option let mut opt = Options::default(); opt.create_if_missing(create_if_missing); opt.set_error_if_exists(error_if_exists); - opt.set_max_background_jobs(num_cpus::get() as i32 * 2); + + // Parallelism settings + let num_cpus = num_cpus::get() as i32; + opt.set_max_background_jobs(num_cpus * 2); + opt.increase_parallelism(num_cpus); + + // Write settings opt.set_bytes_per_sync(1024 * 1024 * 4); opt.set_write_buffer_size(1024 * 1024 * 128); + + // No compression for vectors - they don't compress well and it adds CPU overhead opt.set_compression_type(rocksdb::DBCompressionType::None); - let cache = Cache::new_lru_cache(1024 * 1024 * 128); + // Optimize for point lookups (most common operation during search) + opt.optimize_for_point_lookup(256); // 256MB block cache + + // Use larger block cache - this is critical for vector workloads + // Vectors are frequently accessed and caching helps significantly + let cache = Cache::new_lru_cache(1024 * 1024 * 512); // 512MB cache // https://github.com/facebook/rocksdb/wiki/Block-Cache. let mut bbt_opt = BlockBasedOptions::default(); @@ -30,6 +43,10 @@ pub fn rocksdb_options(create_if_missing: bool, error_if_exists: bool) -> Option bbt_opt.set_cache_index_and_filter_blocks(true); bbt_opt.set_pin_l0_filter_and_index_blocks_in_cache(true); bbt_opt.set_format_version(6); + + // Add bloom filter for faster point lookups + bbt_opt.set_bloom_filter(10.0, false); + opt.set_block_based_table_factory(&bbt_opt); opt } diff --git a/libcorenn/src/store/schema.rs b/libcorenn/src/store/schema.rs index 2df8576..cf10f8c 100644 --- a/libcorenn/src/store/schema.rs +++ b/libcorenn/src/store/schema.rs @@ -3,6 +3,7 @@ use super::WriteOp; use crate::cfg::Cfg; use crate::common::Id; use crate::compressor::pq::ProductQuantizer; +use crate::compressor::scalar::ScalarQuantizer; use crate::vec::VecData; use rmp_serde::to_vec_named; use serde::de::DeserializeOwned; @@ -140,6 +141,7 @@ db_ent!(KEY_TO_ID, 4, String, Id); db_ent!(ID_TO_KEY, 5, Id, String); db_ent!(NODE, 6, Id, DbNodeData); db_ent!(PQ_MODEL, 7, (), ProductQuantizer); +db_ent!(SQ_MODEL, 8, (), ScalarQuantizer); // We store both in one DB entry to leverage one disk page read to get both, as specified in the DiskANN paper. (If we store them as separate DB entries, they are unlikely to be stored in the same disk page.) #[derive(Clone, Debug, Deserialize, Serialize)]