Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
d0bf5dc
refactor(query): add per-thread compact hash join for hash shuffle
zhang2014 Mar 16, 2026
1f9f2f3
refactor(query): streaming CompactProbeStream + per-join-type JoinStr…
zhang2014 Mar 16, 2026
3551243
z
zhang2014 Mar 16, 2026
f4b332b
z
zhang2014 Mar 22, 2026
7d9a2e3
Merge remote-tracking branch 'upstream/main' into refactor/partitione…
zhang2014 Mar 22, 2026
5e06fae
z
zhang2014 Mar 22, 2026
1036f52
z
zhang2014 Mar 22, 2026
8d29c42
z
zhang2014 Mar 22, 2026
bc7aae7
z
zhang2014 Mar 22, 2026
221d591
z
zhang2014 Mar 23, 2026
e2b965f
z
zhang2014 Mar 24, 2026
36d7c78
z
zhang2014 Mar 24, 2026
381b2e8
z
zhang2014 Mar 25, 2026
f801b4b
z
zhang2014 Mar 25, 2026
53a8957
z
zhang2014 Mar 25, 2026
cf692d8
z
zhang2014 Mar 26, 2026
4c985d7
z
zhang2014 Mar 26, 2026
2109310
z
zhang2014 Mar 27, 2026
f900c1d
z
zhang2014 Mar 27, 2026
0f3361e
z
zhang2014 Mar 27, 2026
abd629f
z
zhang2014 Mar 27, 2026
4ad0955
z
zhang2014 Mar 27, 2026
e076cc8
z
zhang2014 Mar 27, 2026
4872d06
z
zhang2014 Mar 31, 2026
2b9fbcb
Merge branch 'main' into refactor/partitioned-hash-join
zhang2014 Mar 31, 2026
a364fcf
z
zhang2014 Mar 31, 2026
b2b13b4
Merge branch 'refactor/partitioned-hash-join' of github.com:zhang2014…
zhang2014 Mar 31, 2026
7504545
z
zhang2014 Apr 1, 2026
5f1c088
z
zhang2014 Apr 1, 2026
b17090c
z
zhang2014 Apr 1, 2026
de4bd6e
z
zhang2014 Apr 1, 2026
471a2c9
z
zhang2014 Apr 1, 2026
363ba44
z
zhang2014 Apr 1, 2026
fe0d920
z
zhang2014 Apr 1, 2026
510dc4f
z
zhang2014 Apr 2, 2026
e12d53c
z
zhang2014 Apr 2, 2026
394bfe8
z
zhang2014 Apr 2, 2026
4be08eb
z
zhang2014 Apr 2, 2026
f6ce846
z
zhang2014 Apr 2, 2026
9941d22
z
zhang2014 Apr 6, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

148 changes: 141 additions & 7 deletions src/query/catalog/src/sbbf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,27 @@
//! [sbbf-paper]: https://arxiv.org/pdf/2101.01719
//! [bf-formulae]: http://tfk.mit.edu/pdf/bloom.pdf

use core::simd::Simd;
use core::simd::cmp::SimdPartialEq;
// Use NEON intrinsics on aarch64 for better performance
#[cfg(target_arch = "aarch64")]
use std::arch::aarch64::*;
use std::mem::size_of;
// Use portable SIMD on other platforms
#[cfg(not(target_arch = "aarch64"))]
use std::simd::Simd;
#[cfg(not(target_arch = "aarch64"))]
use std::simd::cmp::SimdPartialEq;
use std::sync::Arc;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;

use databend_common_base::runtime::Runtime;

/// Salt values as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach).
const SALT: [u32; 8] = [
/// 32-byte aligned for optimal SIMD load performance.
#[repr(C, align(32))]
struct AlignedSalt([u32; 8]);

static SALT: AlignedSalt = AlignedSalt([
0x47b6137b_u32,
0x44974d91_u32,
0x8824ad5b_u32,
Expand All @@ -92,21 +102,52 @@ const SALT: [u32; 8] = [
0x2df1424b_u32,
0x9efc4947_u32,
0x5c6bfb31_u32,
];
]);

/// Shift amount for extracting bit index: (hash * salt) >> 27 gives 5 bits (0-31)
const SHIFT_NUM: i32 = 27;

/// Each block is 256 bits, broken up into eight contiguous "words", each consisting of 32 bits.
/// Each word is thought of as an array of bits; each bit is either "set" or "not set".
#[derive(Debug, Copy, Clone)]
#[repr(transparent)]
struct Block([u32; 8]);

#[cfg(not(target_arch = "aarch64"))]
type U32x8 = Simd<u32, 8>;

impl Block {
const ZERO: Block = Block([0; 8]);

/// takes as its argument a single unsigned 32-bit integer and returns a block in which each
/// word has exactly one bit set.
#[cfg(target_arch = "aarch64")]
#[inline]
fn mask(x: u32) -> Self {
unsafe {
let (mask_lo, mask_hi) = Self::mask_neon(x);
let mut result = [0u32; 8];
vst1q_u32_x2(result.as_mut_ptr(), uint32x4x2_t(mask_lo, mask_hi));
Self(result)
}
}

#[cfg(target_arch = "aarch64")]
#[inline(always)]
unsafe fn mask_neon(x: u32) -> (uint32x4_t, uint32x4_t) {
unsafe {
let ones = vdupq_n_u32(1);
let hash_data = vdupq_n_u32(x);
let salt = vld1q_u32_x2(SALT.0.as_ptr());
let bit_index_lo =
vreinterpretq_s32_u32(vshrq_n_u32::<SHIFT_NUM>(vmulq_u32(salt.0, hash_data)));
let bit_index_hi =
vreinterpretq_s32_u32(vshrq_n_u32::<SHIFT_NUM>(vmulq_u32(salt.1, hash_data)));
(vshlq_u32(ones, bit_index_lo), vshlq_u32(ones, bit_index_hi))
}
}

#[cfg(not(target_arch = "aarch64"))]
fn mask(x: u32) -> Self {
Self(Self::mask_simd(x).to_array())
}
Expand All @@ -132,6 +173,18 @@ impl Block {
}

/// Setting every bit in the block that was also set in the result from mask
#[cfg(target_arch = "aarch64")]
#[inline]
fn insert(&mut self, hash: u32) {
unsafe {
let (mask_lo, mask_hi) = Self::mask_neon(hash);
let data = vld1q_u32_x2(self.0.as_ptr());
let result = uint32x4x2_t(vorrq_u32(data.0, mask_lo), vorrq_u32(data.1, mask_hi));
vst1q_u32_x2(self.0.as_mut_ptr(), result);
}
}

#[cfg(not(target_arch = "aarch64"))]
fn insert(&mut self, hash: u32) {
let mask = Self::mask(hash);
for i in 0..8 {
Expand All @@ -140,16 +193,30 @@ impl Block {
}

/// Returns true when every bit that is set in the result of mask is also set in the block.
#[cfg(target_arch = "aarch64")]
#[inline]
fn check(&self, hash: u32) -> bool {
unsafe {
let (mask_lo, mask_hi) = Self::mask_neon(hash);
let data = vld1q_u32_x2(self.0.as_ptr());
// vbicq_u32(a, b) = a & !b: bits set in mask but not in data
let miss = vorrq_u32(vbicq_u32(mask_lo, data.0), vbicq_u32(mask_hi, data.1));
vmaxvq_u32(miss) == 0
}
}

#[cfg(not(target_arch = "aarch64"))]
fn check(&self, hash: u32) -> bool {
let mask = Self::mask_simd(hash);
let block_vec = U32x8::from_array(self.0);
(block_vec & mask).simd_ne(U32x8::splat(0)).all()
}

#[cfg(not(target_arch = "aarch64"))]
#[inline(always)]
fn mask_simd(x: u32) -> U32x8 {
let hash_vec = U32x8::splat(x);
let salt_vec = U32x8::from_array(SALT);
let salt_vec = U32x8::from_array(SALT.0);
let bit_index = (hash_vec * salt_vec) >> U32x8::splat(27);
U32x8::splat(1) << bit_index
}
Expand Down Expand Up @@ -199,7 +266,7 @@ pub struct Sbbf(Vec<Block>);
pub struct SbbfAtomic(Vec<BlockAtomic>);

pub(crate) const BITSET_MIN_LENGTH: usize = 32;
pub(crate) const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;
pub(crate) const BITSET_MAX_LENGTH: usize = 64 * 1024 * 1024;

#[inline]
fn hash_to_block_index_for_blocks(hash: u64, num_blocks: usize) -> usize {
Expand Down Expand Up @@ -306,6 +373,28 @@ impl Sbbf {
pub fn estimated_memory_size(&self) -> usize {
self.0.capacity() * std::mem::size_of::<Block>()
}

/// Zero-copy serialize to Vec<u32>, consuming self.
pub fn into_u32s(self) -> Vec<u32> {
let mut blocks = std::mem::ManuallyDrop::new(self.0);
let ptr = blocks.as_mut_ptr() as *mut u32;
let len = blocks.len() * 8;
let cap = blocks.capacity() * 8;
unsafe { Vec::from_raw_parts(ptr, len, cap) }
}

/// Zero-copy deserialize from Vec<u32>.
/// Returns None if length is not a multiple of 8 (one Block = 8 x u32).
pub fn from_u32s(words: Vec<u32>) -> Option<Self> {
if words.is_empty() || !words.len().is_multiple_of(8) {
return None;
}
let mut words = std::mem::ManuallyDrop::new(words);
let len = words.len() / 8;
let cap = words.capacity() / 8;
let ptr = words.as_mut_ptr() as *mut Block;
Some(Self(unsafe { Vec::from_raw_parts(ptr, len, cap) }))
}
}

impl SbbfAtomic {
Expand Down Expand Up @@ -497,7 +586,7 @@ mod tests {
(33, 64),
(99, 128),
(1024, 1024),
(999_000_000, 128 * 1024 * 1024),
(999_000_000, 64 * 1024 * 1024),
] {
assert_eq!(*expected, optimal_num_of_bytes(*input));
}
Expand Down Expand Up @@ -529,4 +618,49 @@ mod tests {
assert_eq!(*num_bits, num_of_bits_from_ndv_fpp(*ndv, *fpp) as u64);
}
}

#[test]
fn test_sbbf_to_bytes_from_bytes_roundtrip() {
let mut filter = Sbbf::new_with_ndv_fpp(1000, 0.01).unwrap();
let hashes: Vec<u64> = (0..500).collect();
filter.insert_hash_batch(&hashes);

let words = filter.into_u32s();
let restored = Sbbf::from_u32s(words).unwrap();

for hash in &hashes {
assert!(restored.check_hash(*hash));
}
}

#[test]
fn test_sbbf_from_u32s_invalid() {
assert!(Sbbf::from_u32s(vec![]).is_none());
assert!(Sbbf::from_u32s(vec![0; 7]).is_none());
assert!(Sbbf::from_u32s(vec![0; 9]).is_none());
assert!(Sbbf::from_u32s(vec![0; 8]).is_some());
assert!(Sbbf::from_u32s(vec![0; 16]).is_some());
}

#[test]
fn test_sbbf_union_after_serialization() {
let mut f1 = Sbbf::new_with_ndv_fpp(100, 0.01).unwrap();
for i in 0..50 {
f1.insert_hash(i);
}
let mut f2 = Sbbf::new_with_ndv_fpp(100, 0.01).unwrap();
for i in 50..100 {
f2.insert_hash(i);
}

let words1 = f1.into_u32s();
let words2 = f2.into_u32s();
let mut restored1 = Sbbf::from_u32s(words1).unwrap();
let restored2 = Sbbf::from_u32s(words2).unwrap();
restored1.union(&restored2);

for i in 0..100 {
assert!(restored1.check_hash(i));
}
}
}
2 changes: 1 addition & 1 deletion src/query/expression/src/aggregate/group_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub fn group_hash_entries(entries: ProjectedBlock, values: &mut [u64]) {
}
}

fn combine_group_hash_column<const IS_FIRST: bool>(c: &Column, values: &mut [u64]) {
pub fn combine_group_hash_column<const IS_FIRST: bool>(c: &Column, values: &mut [u64]) {
HashVisitor::<IS_FIRST> { values }
.visit_column(c.clone())
.unwrap()
Expand Down
1 change: 1 addition & 0 deletions src/query/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ serde_urlencoded = { workspace = true }
sha2 = { workspace = true }
socket2 = { workspace = true }
sqlx = { workspace = true }
strength_reduce = { workspace = true }
sysinfo = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use databend_common_sql::StreamContext;
use databend_common_sql::Symbol;
use databend_common_sql::Visibility;
use databend_common_sql::evaluator::BlockOperator;
use databend_common_sql::executor::physical_plans::DataDistribution;
use databend_common_sql::plans::BoundColumnRef;
use databend_common_sql::plans::ConstantExpr;
use databend_common_sql::plans::FunctionCall;
Expand Down Expand Up @@ -69,6 +70,10 @@ impl IPhysicalPlan for AddStreamColumn {
&mut self.meta
}

fn output_data_distribution(&self) -> DataDistribution {
self.input.output_data_distribution()
}

fn children<'a>(&'a self) -> Box<dyn Iterator<Item = &'a PhysicalPlan> + 'a> {
Box::new(std::iter::once(&self.input))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use databend_common_expression::types::DataType;
use databend_common_expression::types::NumberDataType;
use databend_common_pipeline_transforms::TransformPipelineHelper;
use databend_common_sql::Symbol;
use databend_common_sql::executor::physical_plans::DataDistribution;
use databend_common_sql::plans::GroupingSets;

use crate::physical_plans::explain::PlanStatsInfo;
Expand Down Expand Up @@ -58,6 +59,10 @@ impl IPhysicalPlan for AggregateExpand {
&mut self.meta
}

fn output_data_distribution(&self) -> DataDistribution {
self.input.output_data_distribution()
}

#[recursive::recursive]
fn output_schema(&self) -> Result<DataSchemaRef> {
let input_schema = self.input.output_schema()?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use databend_common_sql::ScalarExpr;
use databend_common_sql::Symbol;
use databend_common_sql::executor::physical_plans::AggregateFunctionDesc;
use databend_common_sql::executor::physical_plans::AggregateFunctionSignature;
use databend_common_sql::executor::physical_plans::DataDistribution;
use databend_common_sql::executor::physical_plans::SortDesc;
use databend_common_sql::optimizer::ir::SExpr;
use databend_common_sql::plans::Aggregate;
Expand Down Expand Up @@ -111,6 +112,13 @@ impl IPhysicalPlan for AggregateFinal {
Ok(AggregateFinalFormatter::create(self))
}

fn output_data_distribution(&self) -> DataDistribution {
match self.group_by.is_empty() {
true => DataDistribution::Serial,
false => DataDistribution::Random,
}
}

fn get_desc(&self) -> Result<String> {
Ok(self.agg_funcs.iter().map(|x| x.display.clone()).join(", "))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use databend_common_pipeline_transforms::TransformPipelineHelper;
use databend_common_pipeline_transforms::sorts::TransformRankLimitSort;
use databend_common_sql::Symbol;
use databend_common_sql::executor::physical_plans::AggregateFunctionDesc;
use databend_common_sql::executor::physical_plans::DataDistribution;
use databend_common_sql::executor::physical_plans::SortDesc;
use databend_common_storage::DataOperator;
use itertools::Itertools;
Expand Down Expand Up @@ -81,6 +82,10 @@ impl IPhysicalPlan for AggregatePartial {
&mut self.meta
}

fn output_data_distribution(&self) -> DataDistribution {
self.input.output_data_distribution()
}

#[recursive::recursive]
fn output_schema(&self) -> Result<DataSchemaRef> {
let input_schema = self.input.output_schema()?;
Expand Down
5 changes: 5 additions & 0 deletions src/query/service/src/physical_plans/physical_async_func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use databend_common_pipeline_transforms::TransformPipelineHelper;
use databend_common_sql::ColumnSet;
use databend_common_sql::ScalarExpr;
use databend_common_sql::binder::AsyncFunctionDesc;
use databend_common_sql::executor::physical_plans::DataDistribution;
use databend_common_sql::optimizer::ir::SExpr;
use itertools::Itertools;

Expand Down Expand Up @@ -59,6 +60,10 @@ impl IPhysicalPlan for AsyncFunction {
&mut self.meta
}

fn output_data_distribution(&self) -> DataDistribution {
self.input.output_data_distribution()
}

#[recursive::recursive]
fn output_schema(&self) -> Result<DataSchemaRef> {
let input_schema = self.input.output_schema()?;
Expand Down
9 changes: 9 additions & 0 deletions src/query/service/src/physical_plans/physical_broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::any::Any;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_expression::DataSchemaRef;
use databend_common_sql::executor::physical_plans::DataDistribution;
use databend_common_sql::executor::physical_plans::FragmentKind;

use super::Exchange;
Expand Down Expand Up @@ -48,6 +49,10 @@ impl IPhysicalPlan for BroadcastSource {
&mut self.meta
}

fn output_data_distribution(&self) -> DataDistribution {
DataDistribution::Random
}

fn derive(&self, children: Vec<PhysicalPlan>) -> PhysicalPlan {
assert!(children.is_empty());
PhysicalPlan::new(BroadcastSource {
Expand Down Expand Up @@ -88,6 +93,10 @@ impl IPhysicalPlan for BroadcastSink {
&mut self.meta
}

fn output_data_distribution(&self) -> DataDistribution {
DataDistribution::Serial
}

#[recursive::recursive]
fn output_schema(&self) -> Result<DataSchemaRef> {
Ok(DataSchemaRef::default())
Expand Down
Loading
Loading