Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions src/query/service/src/physical_plans/physical_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,22 @@ enum PhysicalJoinType {
},
}

fn asof_hash_join_type(join_type: JoinType) -> JoinType {
match join_type {
JoinType::Asof => JoinType::Inner,
// ASOF rewrite swaps children to:
// probe = window(original right)
// build = original left
//
// HashJoin preserves the probe side for LEFT joins and the build side
// for RIGHT joins, so outer ASOF joins need the opposite mapping here
// to preserve the original SQL null-preserving side.
JoinType::LeftAsof => JoinType::Right,
JoinType::RightAsof => JoinType::Left,
_ => join_type,
}
}

// Choose physical join type by join conditions
fn physical_join(join: &Join, s_expr: &SExpr) -> Result<PhysicalJoinType> {
if join.equi_conditions.is_empty() && join.join_type.is_any_join() {
Expand Down Expand Up @@ -131,6 +147,31 @@ impl PhysicalPlanBuilder {
// 2. Build physical plan.
// Choose physical join type by join conditions
if join.join_type.is_asof_join() {
if !join.equi_conditions.is_empty() {
// Binder rewrites ASOF into:
// 1. the original inequality; and
// 2. a window-derived boundary that guarantees at most one build row
// matches inside each equi-key partition.
//
// When equi conditions are present, we can therefore reuse the existing
// hash join path to first shrink candidates by the equi keys, then apply
// the ASOF residual predicates as post-join filters.
let mut hash_join = join.clone();
hash_join.join_type = asof_hash_join_type(hash_join.join_type);

return self
.build_hash_join(
&hash_join,
s_expr,
required,
others_required,
left_required,
right_required,
stat_info,
)
.await;
}

let left_prop = s_expr.left_child().derive_relational_prop()?;
let right_prop = s_expr.right_child().derive_relational_prop()?;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# name: test/sql/join/asof/test_asof_join_eq_hash_fast_path.test
# description: Validate ASOF joins with equality keys still return correct rows
# group: [asof]

statement ok
drop table if exists asof_eq_prices

statement ok
drop table if exists asof_eq_trades

statement ok
create table asof_eq_prices(
wh timestamp,
symbol int,
price int
)

statement ok
insert into asof_eq_prices values
('2020-01-01 00:00:00', 1, 10),
('2020-01-01 00:00:05', 1, 11),
('2020-01-01 00:00:02', 2, 20),
('2020-01-01 00:00:06', 2, 21)

statement ok
create table asof_eq_trades(
id int,
wh timestamp,
symbol int
)

statement ok
insert into asof_eq_trades values
(1, '2020-01-01 00:00:01', 1),
(2, '2020-01-01 00:00:03', 2),
(3, '2020-01-01 00:00:05', 2),
(4, '2020-01-01 00:00:06', 1),
(5, '2020-01-01 00:00:07', 3),
(6, '2019-12-31 23:59:59', 1)

# Inner ASOF should keep only matching rows, choose the nearest predecessor,
# and never cross the equality partition.
query II
select t.id, p.price
from asof_eq_trades t
asof join asof_eq_prices p
on t.symbol = p.symbol and t.wh >= p.wh
order by t.id
----
1 10
2 20
3 20
4 11

# Left ASOF should preserve unmatched rows as NULL while keeping the same
# nearest-predecessor semantics for matched rows.
query II
select t.id, p.price
from asof_eq_trades t
asof left join asof_eq_prices p
on t.symbol = p.symbol and t.wh >= p.wh
order by t.id
----
1 10
2 20
3 20
4 11
5 NULL
6 NULL

# Right ASOF should preserve unmatched build-side rows from the original
# `prices` table after the ASOF rewrite swaps the physical children.
query II
select p.price, t.id
from asof_eq_trades t
asof right join asof_eq_prices p
on t.symbol = p.symbol and t.wh >= p.wh
order by p.price, t.id
----
10 1
11 4
20 2
20 3
21 NULL

statement ok
drop table asof_eq_trades

statement ok
drop table asof_eq_prices
Loading