Skip to content

Commit

Permalink
feat(frontend): Add planning and option in frontend for Lookup Join (#…
Browse files Browse the repository at this point in the history
…3763)

* feat(frontend): Add plan node for batch lookup join

* feat(frontend): Add config for using lookup join

* feat(frontend): Change batch lookup join to unary node

* feat(frontend): Add sources for lookup join node

* feat(frontend): Fix bugs in Lookup Join

* feat(frontend): Add error handling when data types of two sides of equality predicate are different

* feat(frontend): Fix formatting and remove extraneous code

* feat(frontend): fix bug

* feat(frontend): Add e2e tests

* feat(frontend): Fix e2e tests

* feat(frontend): Better handling for when lookup join's conditions are not met

* feat(frontend): Fix bug with comparing order key and predicate indices

* feat(frontend): Refactored code and renamed config to rw_batch_enable_lookup_join

* feat(frontend): Fix bug with output indices

* feat(frontend): Fix test

* feat(frontend): Fix another test

* feat(frontend): Fix tests

* feat(frontend): Change name and add more e2e tests

* feat(frontend): Fix formatting

* feat(frontend): Fix test bug

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
Graphcalibur and mergify[bot] authored Jul 13, 2022
1 parent 7a1331b commit ff4c0d7
Show file tree
Hide file tree
Showing 11 changed files with 457 additions and 18 deletions.
95 changes: 95 additions & 0 deletions e2e_test/batch/basic/local/lookup_join.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
statement ok
set rw_batch_enable_lookup_join to true;

statement ok
create table t1 (v1 int, v2 int);

statement ok
create table t2 (v1 int, v2 int);

statement ok
insert into t1 values (3, 3);

statement ok
insert into t2 values (1, 3), (1, 5), (4, 10);

statement ok
create materialized view t3 as select v1, count(*) as v2 from t2 group by v1;

query IIII
select * from t1 left join t3 on t1.v1 = t3.v1;
----
3 3 NULL NULL

statement ok
insert into t1 values (1, 2);

query IIII
select * from t1 join t3 on t1.v1 = t3.v1;
----
1 2 1 2

statement error
select * from t1 right join t3 on t1.v1 = t3.v1;

statement ok
drop materialized view t3;

statement ok
drop table t1;

statement ok
drop table t2;

statement ok
create table t1 (v1 varchar, v2 varchar);

statement ok
create table t2 (v1 varchar, v2 varchar);

statement ok
insert into t1 values ('norm', 'forl');

statement ok
insert into t2 values ('do', 'gww'), ('f', 'flow'), ('hi', 'hello');

statement ok
create materialized view t3 as select count(*) as v1, v2 from t2 group by v2;

query IIII
select * from t1 left join t3 on t1.v2 = t3.v2;
----
norm forl NULL NULL

statement ok
insert into t1 values ('fire', 'gww');

query IIII
select * from t1 join t3 on t1.v2 = t3.v2;
----
fire gww 1 gww

query IIII
select t1.v2, t3.v2 from t1 join t3 on t1.v2 = t3.v2;
----
gww gww

query IIII
select t3.v1 from t1 join t3 on t1.v2 = t3.v2;
----
1

statement error
select * from t1 right join t3 on t1.v1 = t3.v2;

statement ok
drop materialized view t3;

statement ok
drop table t1;

statement ok
drop table t2;

statement ok
set rw_batch_enable_lookup_join to false;
1 change: 1 addition & 0 deletions e2e_test/batch/local_mode.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ statement ok
SET QUERY_MODE TO local;

include ./basic/*.slt.part
include ./basic/local/*.slt.part
include ./aggregate/*.slt.part
include ./types/*.slt.part
include ./catalog/*.slt.part
Expand Down
5 changes: 3 additions & 2 deletions proto/batch_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,9 @@ message LookupJoinNode {
expr.ExprNode condition = 2;
repeated int32 build_side_key = 3;
plan_common.CellBasedTableDesc probe_side_table_desc = 4;
repeated uint32 output_indices = 5;
repeated ExchangeSource sources = 6;
repeated int32 probe_side_column_ids = 5;
repeated uint32 output_indices = 6;
repeated ExchangeSource sources = 7;
}

message PlanNode {
Expand Down
61 changes: 49 additions & 12 deletions src/batch/src/executor/join/lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::types::{DataType, ToOwnedDatum};
use risingwave_common::util::chunk_coalesce::{DataChunkBuilder, SlicedDataChunk};
use risingwave_expr::expr::{build_from_prost, BoxedExpression};
use risingwave_pb::batch_plan::exchange_info::DistributionMode;
use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::Plan;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::{
ExchangeNode, ExchangeSource as ProstExchangeSource, LocalExecutePlan, PlanFragment, PlanNode,
RowSeqScanNode, ScanRange,
ExchangeInfo, ExchangeNode, ExchangeSource as ProstExchangeSource, LocalExecutePlan,
PlanFragment, PlanNode, RowSeqScanNode, ScanRange,
};
use risingwave_pb::plan_common::CellBasedTableDesc;
use uuid::Uuid;
Expand All @@ -43,9 +44,11 @@ use crate::task::{BatchTaskContext, TaskId};

/// Probe side source for the `LookupJoinExecutor`
pub struct ProbeSideSource<C> {
build_side_eq_types: Vec<DataType>,
build_side_idxs: Vec<usize>,
table_desc: CellBasedTableDesc,
probe_side_schema: Schema,
build_side_idxs: Vec<usize>,
probe_side_column_ids: Vec<i32>,
source_templates: Vec<ProstExchangeSource>,
context: C,
task_id: TaskId,
Expand All @@ -62,6 +65,29 @@ impl<C: BatchTaskContext> ProbeSideSource<C> {
/// Creates the `RowSeqScanNode` that will be used for scanning the probe side table
/// based on the passed `RowRef`.
fn create_row_seq_scan_node(&self, cur_row: &RowRef) -> Result<NodeBody> {
// Check that the data types of both sides of the equality predicate are the same
// TODO: Handle the cases where the data types of both sides are different but castable
// (e.g. int32 and int64)
let probe_side_eq_types = self
.table_desc
.order_key
.iter()
.map(|order| {
self.probe_side_schema.fields[order.index as usize]
.data_type
.clone()
})
.collect_vec();

if !(0..self.build_side_eq_types.len()).all(|i| {
i < probe_side_eq_types.len() && self.build_side_eq_types[i] == probe_side_eq_types[i]
}) {
return Err(ErrorCode::NotImplemented(
"Lookup Joins where the two sides of an equality predicate have different data types".to_string(),
None.into()
).into());
}

let eq_conds = self
.build_side_idxs
.iter()
Expand All @@ -82,10 +108,7 @@ impl<C: BatchTaskContext> ProbeSideSource<C> {

Ok(NodeBody::RowSeqScan(RowSeqScanNode {
table_desc: Some(self.table_desc.clone()),
column_ids: (0..self.table_desc.columns.len())
.into_iter()
.map(|x| x as i32)
.collect(), // Scan all the columns
column_ids: self.probe_side_column_ids.clone(),
scan_range,
vnode_bitmap: None,
}))
Expand All @@ -103,7 +126,10 @@ impl<C: BatchTaskContext> ProbeSideSource<C> {
identity: Uuid::new_v4().to_string(),
node_body: Some(self.create_row_seq_scan_node(cur_row)?),
}),
exchange_info: None,
exchange_info: Some(ExchangeInfo {
mode: DistributionMode::Single as i32,
..Default::default()
}),
}),
epoch: inner_template_plan.epoch,
};
Expand Down Expand Up @@ -306,7 +332,7 @@ impl<P: 'static + ProbeSideSourceBuilder> LookupJoinExecutor<P> {
}

let one_row_chunk =
convert_datum_refs_to_chunk(&build_datum_refs, 1, &self.schema.data_types())?;
convert_datum_refs_to_chunk(&build_datum_refs, 1, &self.chunk_builder.data_types())?;

Ok(Some(one_row_chunk))
}
Expand Down Expand Up @@ -359,11 +385,15 @@ impl BoxedExecutorBuilder for LookupJoinExecutorBuilder {
.map(|column_desc| Field::from(&ColumnDesc::from(column_desc)))
.collect_vec(),
};
let probe_side_len = probe_side_schema.len();
let probe_side_column_ids = lookup_join_node.get_probe_side_column_ids().to_vec();
let probe_side_len = probe_side_column_ids.len();

let fields = [
build_child.schema().fields.clone(),
probe_side_schema.fields.clone(),
probe_side_column_ids
.iter()
.map(|&i| probe_side_schema.fields[i as usize].clone())
.collect(),
]
.concat();
let original_schema = Schema { fields };
Expand All @@ -378,11 +408,18 @@ impl BoxedExecutorBuilder for LookupJoinExecutorBuilder {
build_side_idxs.push(*build_side_key as usize)
}

let build_side_eq_types = build_side_idxs
.iter()
.map(|&idx| build_side_data_types[idx].clone())
.collect_vec();

ensure!(!lookup_join_node.get_sources().is_empty());
let probe_side_source = ProbeSideSource {
build_side_eq_types,
build_side_idxs,
table_desc: probe_side_table_desc.clone(),
probe_side_schema,
build_side_idxs,
probe_side_column_ids,
source_templates: lookup_join_node.get_sources().to_vec(),
context: source.context().clone(),
task_id: source.task_id.clone(),
Expand Down
21 changes: 20 additions & 1 deletion src/common/src/session_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,22 @@ use crate::error::{ErrorCode, RwError};

// This is a hack, &'static str is not allowed as a const generics argument.
// TODO: refine this using the adt_const_params feature.
const CONFIG_KEYS: [&str; 6] = [
const CONFIG_KEYS: [&str; 7] = [
"RW_IMPLICIT_FLUSH",
"QUERY_MODE",
"RW_FORCE_DELTA_JOIN",
"EXTRA_FLOAT_DIGITS",
"APPLICATION_NAME",
"DATE_STYLE",
"RW_BATCH_ENABLE_LOOKUP_JOIN",
];
const IMPLICIT_FLUSH: usize = 0;
const QUERY_MODE: usize = 1;
const DELTA_JOIN: usize = 2;
const EXTRA_FLOAT_DIGITS: usize = 3;
const APPLICATION_NAME: usize = 4;
const DATE_STYLE: usize = 5;
const BATCH_ENABLE_LOOKUP_JOIN: usize = 6;

trait ConfigEntry: Default + FromStr<Err = RwError> {
fn entry_name() -> &'static str;
Expand Down Expand Up @@ -154,6 +156,7 @@ type ApplicationName = ConfigString<APPLICATION_NAME>;
type ExtraFloatDigit = ConfigI32<EXTRA_FLOAT_DIGITS, 1>;
// TODO: We should use more specified type here.
type DateStyle = ConfigString<DATE_STYLE>;
type BatchEnableLookupJoin = ConfigBool<BATCH_ENABLE_LOOKUP_JOIN, false>;

#[derive(Default)]
pub struct ConfigMap {
Expand All @@ -177,6 +180,9 @@ pub struct ConfigMap {

/// see https://www.postgresql.org/docs/current/runtime-config-client.html#GUC-DATESTYLE
date_style: DateStyle,

/// To force the usage of lookup join instead of hash join in batch execution
batch_enable_lookup_join: BatchEnableLookupJoin,
}

impl ConfigMap {
Expand All @@ -193,6 +199,8 @@ impl ConfigMap {
self.application_name = val.parse()?;
} else if key.eq_ignore_ascii_case(DateStyle::entry_name()) {
self.date_style = val.parse()?;
} else if key.eq_ignore_ascii_case(BatchEnableLookupJoin::entry_name()) {
self.batch_enable_lookup_join = val.parse()?;
} else {
return Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into());
}
Expand All @@ -213,6 +221,8 @@ impl ConfigMap {
Ok(self.application_name.to_string())
} else if key.eq_ignore_ascii_case(DateStyle::entry_name()) {
Ok(self.date_style.to_string())
} else if key.eq_ignore_ascii_case(BatchEnableLookupJoin::entry_name()) {
Ok(self.batch_enable_lookup_join.to_string())
} else {
Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into())
}
Expand Down Expand Up @@ -250,6 +260,11 @@ impl ConfigMap {
setting : self.date_style.to_string(),
description : String::from("It is typically set by an application upon connection to the server.")
},
VariableInfo{
name : BatchEnableLookupJoin::entry_name().to_lowercase(),
setting : self.batch_enable_lookup_join.to_string(),
description : String::from("To enable the usage of lookup join instead of hash join when possible for local batch execution")
},
]
}

Expand All @@ -276,4 +291,8 @@ impl ConfigMap {
pub fn get_date_style(&self) -> &str {
&self.date_style
}

pub fn get_batch_enable_lookup_join(&self) -> bool {
*self.batch_enable_lookup_join
}
}
4 changes: 4 additions & 0 deletions src/common/src/util/chunk_coalesce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ impl DataChunkBuilder {
self.buffered_count
}

pub fn data_types(&self) -> Vec<DataType> {
self.data_types.clone()
}

#[try_stream(boxed, ok = DataChunk, error = RwError)]
pub async fn trunc_data_chunk(&mut self, data_chunk: DataChunk) {
let mut sliced_data_chunk = SlicedDataChunk::new_checked(data_chunk)?;
Expand Down
Loading

0 comments on commit ff4c0d7

Please sign in to comment.