Skip to content

Commit

Permalink
feat(optimizer): support intersect (#9573)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored May 6, 2023
1 parent 445ca8e commit ee7bf4a
Show file tree
Hide file tree
Showing 14 changed files with 679 additions and 13 deletions.
44 changes: 44 additions & 0 deletions e2e_test/batch/basic/intersect.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t1 (v1 int, v2 int);

statement ok
create table t2 (v1 int, v3 int);

statement ok
insert into t1 values(1, 2),(1, 2);

statement ok
insert into t2 values(1, 2),(1, 2);

query II
select * from t1 intersect select * from t2
----
1 2

query I
select 1 intersect select 1
----
1

query I
select 1 intersect select 2
----

query I
select null intersect select null
----
NULL

query I
select 1 as a intersect select 1 intersect select 1
----
1

statement ok
drop table t1;

statement ok
drop table t2;
49 changes: 49 additions & 0 deletions e2e_test/streaming/intersect.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t1 (v1 int, v2 int);

statement ok
create table t2 (v1 int, v3 int);

statement ok
create materialized view v as select * from t1 intersect select * from t2;

query II
select * from v;
----

statement ok
insert into t1 values(1, 2),(1, 2);

query II
select * from v;
----


statement ok
insert into t2 values(1, 2),(1, 2);


query II
select * from v;
----
1 2


statement ok
delete from t1 where v1 = 1;

query II
select * from v;
----

statement ok
drop materialized view v;

statement ok
drop table t1;

statement ok
drop table t2;
2 changes: 1 addition & 1 deletion src/frontend/planner_test/tests/testdata/explain.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
"stages": {
"0": {
"root": {
"plan_node_id": 10028,
"plan_node_id": 10029,
"plan_node_type": "BatchValues",
"schema": [
{
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/planner_test/tests/testdata/expr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@
└─LogicalProject { exprs: [Array(1:Int32) as $expr1] }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
batch_plan: |
BatchProject { exprs: [Some((1:Int32 < ArrayCat($expr10037, ARRAY[2]:List(Int32)))) as $expr1] }
BatchProject { exprs: [Some((1:Int32 < ArrayCat($expr10039, ARRAY[2]:List(Int32)))) as $expr1] }
└─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all }
├─BatchValues { rows: [[]] }
└─BatchValues { rows: [[ARRAY[1]:List(Int32)]] }
Expand All @@ -496,7 +496,7 @@
└─LogicalProject { exprs: [Array(1:Int32) as $expr1] }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
batch_plan: |
BatchProject { exprs: [All((1:Int32 < ArrayCat($expr10037, ARRAY[2]:List(Int32)))) as $expr1] }
BatchProject { exprs: [All((1:Int32 < ArrayCat($expr10039, ARRAY[2]:List(Int32)))) as $expr1] }
└─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all }
├─BatchValues { rows: [[]] }
└─BatchValues { rows: [[ARRAY[1]:List(Int32)]] }
Expand Down
253 changes: 253 additions & 0 deletions src/frontend/planner_test/tests/testdata/intersect.yaml

Large diffs are not rendered by default.

39 changes: 32 additions & 7 deletions src/frontend/src/binder/set_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ pub enum BoundSetOperation {
Intersect,
}

/// Converts a [`SetOperator`] into the [`BoundSetOperation`] variant of the same name.
impl From<SetOperator> for BoundSetOperation {
    fn from(op: SetOperator) -> Self {
        // No wildcard arm: the match stays exhaustive, so a new `SetOperator`
        // variant has to be handled here before the code compiles again.
        match op {
            SetOperator::Union => Self::Union,
            SetOperator::Intersect => Self::Intersect,
            SetOperator::Except => Self::Except,
        }
    }
}

impl BoundSetExpr {
/// The schema returned by this [`BoundSetExpr`].
Expand Down Expand Up @@ -123,17 +133,18 @@ impl Binder {
right,
} => {
match op {
SetOperator::Union => {
SetOperator::Union | SetOperator::Intersect => {
let left = Box::new(self.bind_set_expr(*left)?);
// Reset context for right side, but keep `cte_to_relation`.
let new_context = std::mem::take(&mut self.context);
self.context.cte_to_relation = new_context.cte_to_relation.clone();
let right = Box::new(self.bind_set_expr(*right)?);

if left.schema().fields.len() != right.schema().fields.len() {
return Err(ErrorCode::InvalidInputSyntax(
"each UNION query must have the same number of columns".to_string(),
)
return Err(ErrorCode::InvalidInputSyntax(format!(
"each {} query must have the same number of columns",
op
))
.into());
}

Expand All @@ -145,7 +156,8 @@ impl Binder {
{
if a.data_type != b.data_type {
return Err(ErrorCode::InvalidInputSyntax(format!(
"UNION types {} of column {} is different from types {} of column {}",
"{} types {} of column {} is different from types {} of column {}",
op,
a.data_type.prost_type_name().as_str_name(),
a.name,
b.data_type.prost_type_name().as_str_name(),
Expand All @@ -155,20 +167,33 @@ impl Binder {
}
}

if all {
match op {
SetOperator::Union => {}
SetOperator::Intersect | SetOperator::Except => {
return Err(ErrorCode::NotImplemented(
format!("{} all", op),
None.into(),
)
.into())
}
}
}

// Reset context for the set operation.
// Consider this case:
// select a from t2 union all select b from t2 order by a+1; should throw an
// error.
self.context = BindContext::default();
self.context.cte_to_relation = new_context.cte_to_relation;
Ok(BoundSetExpr::SetOperation {
op: BoundSetOperation::Union,
op: op.into(),
all,
left,
right,
})
}
SetOperator::Intersect | SetOperator::Except => Err(ErrorCode::NotImplemented(
SetOperator::Except => Err(ErrorCode::NotImplemented(
format!("set expr: {:?}", op),
None.into(),
)
Expand Down
11 changes: 10 additions & 1 deletion src/frontend/src/optimizer/logical_optimization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,16 @@ lazy_static! {
);

static ref PULL_UP_HOP: OptimizationStage = OptimizationStage::new(
"Pull up hop",
"Pull Up Hop",
vec![PullUpHopRule::create()],
ApplyOrder::BottomUp,
);

static ref SET_OPERATION_TO_JOIN: OptimizationStage = OptimizationStage::new(
"Set Operation To Join",
vec![IntersectToSemiJoinRule::create()],
ApplyOrder::BottomUp,
);
}

impl LogicalOptimizer {
Expand Down Expand Up @@ -372,6 +378,8 @@ impl LogicalOptimizer {
}
}

plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN);

plan = plan.optimize_by_rules(&UNION_MERGE);

plan = Self::subquery_unnesting(plan, enable_share_plan, explain_trace, &ctx)?;
Expand Down Expand Up @@ -442,6 +450,7 @@ impl LogicalOptimizer {
plan = plan.optimize_by_rules(&DAG_TO_TREE);

plan = plan.optimize_by_rules(&REWRITE_LIKE_EXPR);
plan = plan.optimize_by_rules(&SET_OPERATION_TO_JOIN);
plan = plan.optimize_by_rules(&UNION_MERGE);
plan = plan.optimize_by_rules(&ALWAYS_FALSE_FILTER);

Expand Down
59 changes: 59 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/intersect.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;

use risingwave_common::catalog::Schema;

use super::{GenericPlanNode, GenericPlanRef};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::FunctionalDependencySet;

/// `Intersect` returns the intersection of the rows of its inputs.
///
/// If `all` is false, duplicate rows need to be eliminated from the result.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Intersect<PlanRef> {
    /// When `false`, the result has to be de-duplicated.
    pub all: bool,
    /// The plans being intersected. The `GenericPlanNode` impl indexes
    /// `inputs[0]` for schema, key and context, so this must not be empty.
    pub inputs: Vec<PlanRef>,
}

/// Plan-node properties of [`Intersect`]; every one of them is read from the
/// first input, so each method panics if `inputs` is empty.
impl<PlanRef: GenericPlanRef> GenericPlanNode for Intersect<PlanRef> {
    /// Output schema: a clone of the first input's schema.
    fn schema(&self) -> Schema {
        let first = &self.inputs[0];
        first.schema().clone()
    }

    /// Logical primary key: the first input's key columns as an owned vector.
    fn logical_pk(&self) -> Option<Vec<usize>> {
        let first = &self.inputs[0];
        let key = first.logical_pk().to_vec();
        Some(key)
    }

    /// Optimizer context, taken from the first input.
    fn ctx(&self) -> OptimizerContextRef {
        let first = &self.inputs[0];
        first.ctx()
    }

    /// A fresh dependency set built for the first input's column count;
    /// nothing is carried over from the inputs' own dependencies.
    fn functional_dependency(&self) -> FunctionalDependencySet {
        let column_count = self.inputs[0].schema().len();
        FunctionalDependencySet::new(column_count)
    }
}

impl<PlanRef: GenericPlanRef> Intersect<PlanRef> {
    /// Writes this node to `f` as a debug struct labelled `name`, with the
    /// fields supplied by [`Self::fmt_fields_with_builder`].
    pub fn fmt_with_name(&self, f: &mut fmt::Formatter<'_>, name: &str) -> fmt::Result {
        let mut debug = f.debug_struct(name);
        self.fmt_fields_with_builder(&mut debug);
        debug.finish()
    }

    /// Adds this node's displayed fields to `builder`: only `all`; the
    /// inputs are not printed here.
    pub fn fmt_fields_with_builder(&self, builder: &mut fmt::DebugStruct<'_, '_>) {
        let Self { all, .. } = self;
        builder.field("all", all);
    }
}
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ mod share;
pub use share::*;
mod dedup;
pub use dedup::*;
mod intersect;
mod over_window;
pub use intersect::*;
pub use over_window::*;

pub trait GenericPlanRef {
Expand Down
Loading

0 comments on commit ee7bf4a

Please sign in to comment.