Skip to content

Commit

Permalink
feat(executor): Basic in-application-memory DynamicFilterExecutor, …
Browse files Browse the repository at this point in the history
…pass tpch q11, q22 e2e tests (#3681)

* initial

* stash

* do-appply-planner-test

* fmt

* clippy

* license

* minor

* stash

* fix required dist, output indices

* fmt

* stash

* revert

* do-apply-planner-test

* success!

* more e2e

* minor cleanup

* stash

* fix

* fix

* fix

* minor

* minor

* madsim

* clippy

* fmt

* minor

* fix

* fix, apply code review suggestions

* fix

* fix, improve

* simplify

* clippy

* refactor into sub functions

* clippy

* minor

* fix

* fix: rowsort

* fmt

* use literal in expression

* clippy

* fmt

* use madsim btreemap

* minor

* fmt

* respect previous behaviour - return empty message

* bump-ci

Co-authored-by: TennyZhuang <zty0826@gmail.com>
  • Loading branch information
jon-chuang and TennyZhuang authored Jul 7, 2022
1 parent 07f282b commit d767d99
Show file tree
Hide file tree
Showing 17 changed files with 658 additions and 9 deletions.
2 changes: 2 additions & 0 deletions e2e_test/streaming/tpch/create_views.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ include ./views/q7.slt.part
include ./views/q8.slt.part
include ./views/q9.slt.part
include ./views/q10.slt.part
include ./views/q11.slt.part
include ./views/q12.slt.part
include ./views/q13.slt.part
include ./views/q14.slt.part
Expand All @@ -16,3 +17,4 @@ include ./views/q17.slt.part
#include ./views/q18.slt.part
include ./views/q19.slt.part
#include ./views/q20.slt.part
#include ./views/q22.slt.part
9 changes: 9 additions & 0 deletions e2e_test/streaming/tpch/drop_views.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ drop materialized view tpch_q9;
statement ok
drop materialized view tpch_q10;

statement ok
drop materialized view tpch_q11;

statement ok
drop materialized view tpch_q12;

Expand All @@ -39,3 +42,9 @@ drop materialized view tpch_q17;

statement ok
drop materialized view tpch_q19;

statement ok
drop materialized view tpch_q20;

statement ok
drop materialized view tpch_q22;
72 changes: 72 additions & 0 deletions e2e_test/streaming/tpch/q11.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
query IR rowsort
select * from tpch_q11;
----
82 19919213.4
182 12562394.1
175 9253263.4
142 8273960.3
124 8203209.3
116 7520874.2
51 7264184.5
168 7244030.2
73 7115781.1
32 6852925.6
130 6530034.1
199 6339641.5
166 5445353.3
72 4900497.3
75 4808130.4
132 4781984.1
74 4702018.3
100 4642272.6
30 4638608.8
24 4180139.5
44 3997453.7
158 3737919.4
68 3690702.4
108 3469209.3
22 3467053.8
2 3366668.6
137 3269230.1
13 3185424.5
91 2951493.4
155 2924499.5
200 2895688.3
12 2381625.3
99 2353581.3
151 2298313.0
10 2245355.5
19 2217961.9
8 2172003.9
58 2118624.6
92 2091581.4
42 2073731.3
55 1998844.2
144 1963651.4
50 1854529.6
28 1825685.6
172 1812423.0
64 1811894.6
37 1781994.1
128 1706573.0
110 1635451.6
62 1232507.6
102 1213422.0
66 1148101.4
6 1130989.3
119 982887.8
112 948072.8
46 909722.4
191 786330.8
16 667751.1
113 663824.2
106 647708.1
173 583317.5
146 573455.5
122 459864.5
52 358792.4
162 290757.6
152 229781.6
164 179298.7
192 120406.1
150 106928.1
5 changes: 5 additions & 0 deletions e2e_test/streaming/tpch/q22.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
query TT rowsort
select * from tpch_q22;
----
30 1 7638.57
31 2 14318.40
30 changes: 30 additions & 0 deletions e2e_test/streaming/tpch/views/q11.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
statement ok
create materialized view tpch_q11 as
select
ps_partkey,
round(sum(ps_supplycost * ps_availqty), 1) as value
from
partsupp,
supplier,
nation
where
ps_suppkey = s_suppkey
and s_nationkey = n_nationkey
and n_name = 'ARGENTINA'
group by
ps_partkey
having
sum(ps_supplycost * ps_availqty) > (
select
sum(ps_supplycost * ps_availqty) * 0.0001000000
from
partsupp,
supplier,
nation
where
ps_suppkey = s_suppkey
and s_nationkey = n_nationkey
and n_name = 'ARGENTINA'
)
order by
value desc;
39 changes: 39 additions & 0 deletions e2e_test/streaming/tpch/views/q22.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
statement ok
create materialized view tpch_q22 as
select
cntrycode,
count(*) as numcust,
sum(c_acctbal) as totacctbal
from
(
select
substring(c_phone from 1 for 2) as cntrycode,
c_acctbal
from
customer
where
substring(c_phone from 1 for 2) in
('30', '24', '31', '38', '25', '34', '37')
and c_acctbal > (
select
avg(c_acctbal)
from
customer
where
c_acctbal > 0.00
and substring(c_phone from 1 for 2) in
('30', '24', '31', '38', '25', '34', '37')
)
and not exists (
select
*
from
orders
where
o_custkey = c_custkey
)
) as custsale
group by
cntrycode
order by
cntrycode;
12 changes: 10 additions & 2 deletions e2e_test/streaming/tpch_snapshot.slt
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ include ./tpch/views/q7.slt.part
include ./tpch/views/q8.slt.part
include ./tpch/views/q9.slt.part
include ./tpch/views/q10.slt.part
# include ./tpch/views/q11.slt.part
include ./tpch/views/q11.slt.part
include ./tpch/views/q12.slt.part
include ./tpch/views/q13.slt.part
include ./tpch/views/q14.slt.part
include ./tpch/views/q17.slt.part
include ./tpch/views/q18.slt.part
include ./tpch/views/q19.slt.part
include ./tpch/views/q20.slt.part
include ./tpch/views/q22.slt.part

include ./tpch/q1.slt.part
include ./tpch/q2.slt.part
Expand All @@ -38,14 +39,15 @@ include ./tpch/q7.slt.part
include ./tpch/q8.slt.part
include ./tpch/q9.slt.part
include ./tpch/q10.slt.part
# include ./tpch/q11.slt.part
include ./tpch/q11.slt.part
include ./tpch/q12.slt.part
include ./tpch/q13.slt.part
include ./tpch/q14.slt.part
include ./tpch/q17.slt.part
include ./tpch/q18.slt.part
include ./tpch/q19.slt.part
include ./tpch/q20.slt.part
include ./tpch/q22.slt.part

statement ok
drop materialized view tpch_q1;
Expand Down Expand Up @@ -77,6 +79,9 @@ drop materialized view tpch_q9;
statement ok
drop materialized view tpch_q10;

statement ok
drop materialized view tpch_q11;

statement ok
drop materialized view tpch_q12;

Expand All @@ -98,4 +103,7 @@ drop materialized view tpch_q19;
statement ok
drop materialized view tpch_q20;

statement ok
drop materialized view tpch_q22;

include ../tpch/drop_tables.slt.part
12 changes: 10 additions & 2 deletions e2e_test/streaming/tpch_upstream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ include ./tpch/views/q7.slt.part
include ./tpch/views/q8.slt.part
include ./tpch/views/q9.slt.part
include ./tpch/views/q10.slt.part
# include ./tpch/views/q11.slt.part
include ./tpch/views/q11.slt.part
include ./tpch/views/q12.slt.part
include ./tpch/views/q13.slt.part
include ./tpch/views/q14.slt.part
include ./tpch/views/q17.slt.part
include ./tpch/views/q18.slt.part
include ./tpch/views/q19.slt.part
include ./tpch/views/q20.slt.part
include ./tpch/views/q22.slt.part

include ./tpch/q1.slt.part
include ./tpch/q2.slt.part
Expand All @@ -41,14 +42,15 @@ include ./tpch/q7.slt.part
include ./tpch/q8.slt.part
include ./tpch/q9.slt.part
include ./tpch/q10.slt.part
# include ./tpch/q11.slt.part
include ./tpch/q11.slt.part
include ./tpch/q12.slt.part
include ./tpch/q13.slt.part
include ./tpch/q14.slt.part
include ./tpch/q17.slt.part
include ./tpch/q18.slt.part
include ./tpch/q19.slt.part
include ./tpch/q20.slt.part
include ./tpch/q22.slt.part

statement ok
drop materialized view tpch_q1;
Expand Down Expand Up @@ -80,6 +82,9 @@ drop materialized view tpch_q9;
statement ok
drop materialized view tpch_q10;

statement ok
drop materialized view tpch_q11;

statement ok
drop materialized view tpch_q12;

Expand All @@ -101,4 +106,7 @@ drop materialized view tpch_q19;
statement ok
drop materialized view tpch_q20;

statement ok
drop materialized view tpch_q22;

include ../tpch/drop_tables.slt.part
15 changes: 15 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,20 @@ message HashJoinNode {
repeated uint32 output_indices = 11;
}

message DynamicFilterNode {
uint32 left_key = 1;
// Must be one of <, <=, >, >=
expr.ExprNode condition = 3;
// Used for internal table states.
// catalog.Table left_table = 4;
// uint32 dist_key_l = 8;
// It is true when the right side of the inequality predicate is monotonically:
// - decreasing for <, <=, increasing for >, >=
// bool is_monotonic = 10;
// the output indices of current node
// repeated uint32 output_indices = 11;
}

// Delta join with two indexes. This is a pseudo plan node generated on frontend. On meta
// service, it will be rewritten into lookup joins.
message DeltaIndexJoinNode {
Expand Down Expand Up @@ -259,6 +273,7 @@ message StreamNode {
DeltaIndexJoinNode delta_index_join = 119;
SinkNode sink = 120;
ExpandNode expand = 121;
DynamicFilterNode dynamic_filter = 122;
}
// The id for the operator.
uint64 operator_id = 1;
Expand Down
9 changes: 8 additions & 1 deletion src/frontend/src/optimizer/plan_node/logical_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -836,8 +836,15 @@ impl ToStream for LogicalJoin {
Distribution::Single
);

let plan = StreamDynamicFilter::new(predicate.other_cond().clone(), left, right).into();
let plan = StreamDynamicFilter::new(
left_ref_index,
predicate.other_cond().clone(),
left,
right,
)
.into();

// TODO: `DynamicFilterExecutor` should use `output_indices` in `ChunkBuilder`
if self.output_indices != (0..self.internal_column_num()).collect::<Vec<_>>() {
let logical_project = LogicalProject::with_mapping(
plan,
Expand Down
16 changes: 13 additions & 3 deletions src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
use std::fmt;

use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::DynamicFilterNode;

use crate::expr::Expr;
use crate::optimizer::plan_node::{PlanBase, PlanTreeNodeBinary, ToStreamProst};
use crate::optimizer::PlanRef;
use crate::utils::Condition;
Expand All @@ -25,12 +27,13 @@ pub struct StreamDynamicFilter {
/// The predicate (formed with exactly one of < , <=, >, >=)
predicate: Condition,
// dist_key_l: Distribution,
left_index: usize,
left: PlanRef,
right: PlanRef,
}

impl StreamDynamicFilter {
pub fn new(predicate: Condition, left: PlanRef, right: PlanRef) -> Self {
pub fn new(left_index: usize, predicate: Condition, left: PlanRef, right: PlanRef) -> Self {
// TODO: derive from input
let base = PlanBase::new_stream(
left.ctx(),
Expand All @@ -43,6 +46,7 @@ impl StreamDynamicFilter {
Self {
base,
predicate,
left_index,
left,
right,
}
Expand All @@ -65,14 +69,20 @@ impl PlanTreeNodeBinary for StreamDynamicFilter {
}

fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self {
Self::new(self.predicate.clone(), left, right)
Self::new(self.left_index, self.predicate.clone(), left, right)
}
}

impl_plan_tree_node_for_binary! { StreamDynamicFilter }

impl ToStreamProst for StreamDynamicFilter {
fn to_stream_prost_body(&self) -> NodeBody {
unimplemented!()
NodeBody::DynamicFilter(DynamicFilterNode {
left_key: self.left_index as u32,
condition: self
.predicate
.as_expr_unless_true()
.map(|x| x.to_expr_proto()),
})
}
}
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ impl ToStreamProst for StreamExchange {
r#type: match &self.base.dist {
Distribution::HashShard(_) => DispatcherType::Hash,
Distribution::Single => DispatcherType::Simple,
Distribution::Broadcast => DispatcherType::Broadcast,
_ => panic!("Do not allow Any or AnyShard in serialization process"),
} as i32,
column_indices: match &self.base.dist {
Expand Down
Loading

0 comments on commit d767d99

Please sign in to comment.