From d84807a20513c21b9295f3bdc7f6af00d1f4c362 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Mon, 6 Apr 2026 14:53:30 +0800 Subject: [PATCH 1/2] fix(query): fix hang when union and limit --- src/common/base/src/base/barrier.rs | 8 +++++++- .../transforms/new_hash_join/runtime_filter.rs | 12 ++++++++++++ .../transforms/new_hash_join/transform_hash_join.rs | 4 +++- 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/src/common/base/src/base/barrier.rs b/src/common/base/src/base/barrier.rs index a5d3b13d327eb..189037dc7bf85 100644 --- a/src/common/base/src/base/barrier.rs +++ b/src/common/base/src/base/barrier.rs @@ -85,7 +85,10 @@ impl Barrier { BarrierWaitResult(is_leader) } - pub fn reduce_quorum(&self, n: usize) { + /// Reduces the quorum by `n`. Returns `true` if the quorum reached zero + /// purely through reductions (no thread called `wait()`), meaning all + /// participants were removed without ever synchronizing. + pub fn reduce_quorum(&self, n: usize) -> bool { let locked = self.state.lock(); let mut state = locked.unwrap_or_else(PoisonError::into_inner); state.n -= n; @@ -95,9 +98,12 @@ impl Barrier { .waker .send(state.generation) .expect("there is at least one receiver"); + let all_reduced = state.arrived == 0; state.arrived = 0; state.generation += 1; + return all_reduced; } + false } } diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/runtime_filter.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/runtime_filter.rs index 670ec0a71b07d..7a7ded89d63d1 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/runtime_filter.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/runtime_filter.rs @@ -81,6 +81,18 @@ impl RuntimeFiltersDesc { })) } + /// Close the broadcast source channel and notify runtime filter watchers. + /// Called when all threads of a hash join are short-circuited (e.g., downstream + /// LIMIT satisfied via sequential UNION ALL) and no thread will call `globalization`. + pub fn close_broadcast(&self) { + if let Some(broadcast_id) = self.broadcast_id { + self.ctx.broadcast_source_sender(broadcast_id).close(); + } + for ready in &self.runtime_filters_ready { + let _ = ready.runtime_filter_watcher.send(None); + } + } + pub async fn globalization(&self, mut packet: JoinRuntimeFilterPacket) -> Result<()> { if let Some(broadcast_id) = self.broadcast_id { packet = get_global_runtime_filter_packet(broadcast_id, packet, &self.ctx).await?; diff --git a/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs b/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs index 0bc94bf6e99eb..d8051592155c9 100644 --- a/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs @@ -113,7 +113,9 @@ impl Processor for TransformHashJoin { std::mem::swap(&mut finished, &mut self.join); drop(finished); - self.stage_sync_barrier.reduce_quorum(1); + if self.stage_sync_barrier.reduce_quorum(1) { + self.rf_desc.close_broadcast(); + } } return Ok(Event::Finished); From 7e742b2a7b0264b1257713c8f230c0e367c9d758 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Mon, 6 Apr 2026 22:28:12 +0800 Subject: [PATCH 2/2] fix(query): fix hang when union and limit --- .../cluster/broadcast_join_short_circuit.test | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 tests/sqllogictests/suites/mode/cluster/broadcast_join_short_circuit.test diff --git a/tests/sqllogictests/suites/mode/cluster/broadcast_join_short_circuit.test b/tests/sqllogictests/suites/mode/cluster/broadcast_join_short_circuit.test new file mode 100644 index 0000000000000..7a4e4180f1a77 --- /dev/null +++ b/tests/sqllogictests/suites/mode/cluster/broadcast_join_short_circuit.test @@ -0,0 +1,51 @@ +statement ok +drop table if exists t_build + +statement ok +drop table if exists t_probe + +statement ok +create table t_build(a int not null, b string not null) + +statement ok +create table t_probe(a int not null, c string not null) + +statement ok +insert into t_build select number, concat('build_', to_string(number)) from numbers(1000) + +statement ok +insert into t_probe select number, concat('probe_', to_string(number)) from numbers(1000) + +statement ok +set enforce_shuffle_join = 1 + +statement ok +set enable_parallel_union_all = 0 + +query I +select count(*) from ( + select a, b from ( + select t_probe.a, t_build.b from t_probe inner join t_build on t_probe.a = t_build.a + union all + select t_probe.a, t_build.b from t_probe inner join t_build on t_probe.a = t_build.a + union all + select t_probe.a, t_build.b from t_probe inner join t_build on t_probe.a = t_build.a + union all + select t_probe.a, t_build.b from t_probe inner join t_build on t_probe.a = t_build.a + ) t + limit 5 +) +---- +5 + +statement ok +unset enforce_shuffle_join + +statement ok +unset enable_parallel_union_all + +statement ok +drop table if exists t_build + +statement ok +drop table if exists t_probe