Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: fast path for left-related join if build side is empty #11865

Merged
merged 1 commit into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use common_exception::Result;
use common_expression::DataBlock;
use common_expression::DataSchemaRef;
use common_sql::plans::JoinType;

use super::ProbeState;
use crate::pipelines::processors::transforms::hash_join::desc::JoinState;
Expand Down Expand Up @@ -106,4 +107,7 @@ pub trait HashJoinState: Send + Sync {

/// Get `merged_schema` which is `probe_schema` + `build_schema`
fn merged_schema(&self) -> Result<DataSchemaRef>;

/// Get join type
fn join_type(&self) -> JoinType;
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,9 @@ impl HashJoinState for JoinHashTable {
}
// Fast path for hash join
if row_num == 0
&& matches!(
&& !matches!(
self.hash_join_desc.join_type,
JoinType::Inner
| JoinType::Right
| JoinType::Cross
| JoinType::RightAnti
| JoinType::RightSemi
JoinType::LeftMark | JoinType::RightMark
)
&& self.ctx.get_cluster().is_empty()
{
Expand Down Expand Up @@ -793,4 +789,8 @@ impl HashJoinState for JoinHashTable {
.collect::<Vec<_>>();
Ok(DataSchemaRefExt::create(merged_fields))
}

fn join_type(&self) -> JoinType {
self.hash_join_desc.join_type.clone()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use crate::pipelines::processors::transforms::hash_join::desc::HashJoinDesc;
use crate::pipelines::processors::transforms::hash_join::row::RowSpace;
use crate::pipelines::processors::transforms::hash_join::util::build_schema_wrap_nullable;
use crate::pipelines::processors::transforms::hash_join::util::probe_schema_wrap_nullable;
use crate::pipelines::processors::HashJoinState;
use crate::sessions::QueryContext;
use crate::sessions::TableContext;

Expand Down Expand Up @@ -247,6 +248,15 @@ impl JoinHashTable {
probe_state.valids = valids;
}

if self.fast_return()?
&& matches!(
self.hash_join_desc.join_type,
JoinType::Left | JoinType::Single | JoinType::Full | JoinType::LeftAnti
)
{
return self.left_fast_return(&input);
}

let hash_table = unsafe { &*self.hash_table.get() };
with_join_hash_method!(|T| match hash_table {
HashJoinHashTable::T(table) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use std::iter::TrustedLen;

use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::BlockEntry;
use common_expression::DataBlock;
use common_expression::Scalar;
use common_expression::Value;
use common_hashtable::HashJoinHashtableLike;

use super::JoinHashTable;
Expand Down Expand Up @@ -117,4 +120,24 @@ impl JoinHashTable {
))),
}
}

pub(crate) fn left_fast_return(&self, input: &DataBlock) -> Result<Vec<DataBlock>> {
if self.hash_join_desc.join_type == JoinType::LeftAnti {
return Ok(vec![input.clone()]);
}
let null_build_block = DataBlock::new(
self.row_space
.data_schema
.fields()
.iter()
.map(|df| BlockEntry {
data_type: df.data_type().clone(),
value: Value::Scalar(Scalar::Null),
})
.collect(),
input.num_rows(),
);

Ok(vec![self.merge_eq_block(&null_build_block, input)?])
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::any::Any;
use std::collections::VecDeque;
use std::sync::Arc;

use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::DataBlock;
use common_sql::plans::JoinType;
Expand Down Expand Up @@ -218,15 +219,37 @@ impl Processor for TransformHashJoinProbe {
HashJoinStep::Build => {
self.join_state.wait_finalize_finish().await?;
if self.join_state.fast_return()? {
self.step = HashJoinStep::Finished;
match self.join_state.join_type() {
JoinType::Inner
| JoinType::Right
| JoinType::Cross
| JoinType::RightAnti
| JoinType::RightSemi
| JoinType::LeftSemi => {
self.step = HashJoinStep::Finished;
}
JoinType::Left | JoinType::Full | JoinType::Single | JoinType::LeftAnti => {
self.step = HashJoinStep::Probe;
}
_ => {
return Err(ErrorCode::Internal(format!(
"Join type: {:?} is unexpected",
self.join_state.join_type()
)));
}
}
return Ok(());
}
self.step = HashJoinStep::Probe;
}
HashJoinStep::Finalize => unreachable!(),
HashJoinStep::Probe => {
self.join_state.wait_probe_finish().await?;
self.step = HashJoinStep::OuterScan;
if self.join_state.fast_return()? {
self.step = HashJoinStep::Finished;
} else {
self.step = HashJoinStep::OuterScan;
}
}
HashJoinStep::OuterScan | HashJoinStep::Finished => unreachable!(),
};
Expand Down
110 changes: 105 additions & 5 deletions tests/sqllogictests/suites/query/join.test
Original file line number Diff line number Diff line change
Expand Up @@ -238,15 +238,15 @@ create table t2(c smallint unsigned null)
statement ok
insert into t2 values(1),(2),(null)

query IF
query I
select TRY_CAST((c + 1) AS Int64 NULL) from t2;
----
2
3
NULL


query IF
query IR
select * from t inner join t1 on t.a = t1.b order by a, b
----
1 1.0
Expand Down Expand Up @@ -275,7 +275,7 @@ select * from t inner join t2 on t.a = t2.c + 1 and t.a - 1 = t2.c order by a, c
3 2


query FI
query RI
select * from t1 inner join t on t.a = t1.b order by a, b
----
1.0 1
Expand Down Expand Up @@ -491,8 +491,24 @@ INSERT INTO t2(c1float) VALUES (0.9702655076980591);
statement ok
INSERT INTO t2(c1float, c2varchar) VALUES (0.5340723991394043, '02'), (0.4661566913127899, '1261837');

statement ok
SELECT t0.c0boolean, t1.c0boolean, t1.c1float FROM t0, t1 RIGHT JOIN t2 ON t1.c0boolean;
query IIR
SELECT t0.c0boolean, t1.c0boolean, t1.c1float FROM t0, t1 RIGHT JOIN t2 ON t1.c0boolean order by t0.c0boolean;
----
0 NULL NULL
0 NULL NULL
0 NULL NULL
0 NULL NULL
0 NULL NULL
0 NULL NULL
0 NULL NULL
0 NULL NULL
0 NULL NULL
1 NULL NULL
1 NULL NULL
1 NULL NULL
1 NULL NULL
1 NULL NULL
1 NULL NULL


statement ok
Expand Down Expand Up @@ -530,3 +546,87 @@ select * from t1 join t2 on t1.a < t2.b order by t1.a;
----
1 3
2 3

statement ok
drop table t1;

statement ok
drop table t2;

statement ok
create table t1 (a int);

# right join with empty build side
query II
select * from (select * from numbers(100)) n right join t1 on n.number = t1.a;
----

# inner join with empty build side
query II
select * from (select * from numbers(100)) n join t1 on n.number = t1.a;
----

# right semi with empty build side
query II
select * from (select * from numbers(100)) n right semi join t1 on n.number = t1.a;
----

# right anti with empty build side
query II
select * from (select * from numbers(100)) n right anti join t1 on n.number = t1.a;
----

# left semi with empty build side
query II
select * from (select * from numbers(100)) n left semi join t1 on n.number = t1.a;
----

# left anti join with empty build side
query I
select * from (select * from numbers(10)) n left anti join t1 on n.number = t1.a order by number;
----
0
1
2
3
4
5
6
7
8
9


# left join with empty build side
query II
select * from (select * from numbers(10)) n left join t1 on n.number = t1.a order by n.number;
----
0 NULL
1 NULL
2 NULL
3 NULL
4 NULL
5 NULL
6 NULL
7 NULL
8 NULL
9 NULL


# full join with empty build side
query II
select * from (select * from numbers(10)) n full join t1 on n.number = t1.a order by n.number;
----
0 NULL
1 NULL
2 NULL
3 NULL
4 NULL
5 NULL
6 NULL
7 NULL
8 NULL
9 NULL

statement ok
drop table t1;