Skip to content

Commit

Permalink
feat(frontend): apply SessionTimezone and ConstEvalRewriter expr …
Browse files Browse the repository at this point in the history
…rewriters to during `gen_{batch,stream}_plan` (#7761)

apply `SessionTimezone` and `ConstEvalRewriter` expr rewriters to during `gen_{batch,stream}_plan`


Notes:
- wait for #7757 to be merged
- wait for #7777 to be merged
- wait for #7786 to be merged

Approved-By: ice1000
Approved-By: st1page

Co-Authored-By: jon-chuang <jon-chuang@users.noreply.github.com>
Co-Authored-By: jon-chuang <9093549+jon-chuang@users.noreply.github.com>
  • Loading branch information
jon-chuang and jon-chuang authored Feb 23, 2023
1 parent 7cadc39 commit 88dc35e
Show file tree
Hide file tree
Showing 50 changed files with 515 additions and 553 deletions.
9 changes: 4 additions & 5 deletions e2e_test/batch/functions/now.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@ statement ok
insert into t values(now());

# constant eval of now in batch plan
# query T
# explain select now() + interval '1 hour' = now() + interval '30 minutes' + interval '30 minutes' true;
# ----
# BatchProject { exprs: [true:Boolean] }
# └─BatchValues { rows: [[]] }
query T
explain select now() + interval '1 hour' = now() + interval '30 minutes' + interval '30 minutes' true;
----
BatchValues { rows: [[true:Boolean]] }

statement ok
drop table tz
Expand Down
37 changes: 37 additions & 0 deletions e2e_test/streaming/join.slt
Original file line number Diff line number Diff line change
Expand Up @@ -167,3 +167,40 @@ drop table t9

statement ok
drop table t10

statement ok
flush;

# Regression test for: https://github.com/risingwavelabs/risingwave/issues/8114
statement ok
create table t1 (uid int, item_id int, event string, name string);

statement ok
create table t2 (uid int, name string);

statement ok
create materialized view v as SELECT event, t1.name FROM t1 INNER JOIN t2 WHERE t1.name=t2.name AND t1.event=concat('event_', array_join(array[t2.uid, t1.item_id], '_'));

statement ok
insert into t1 values (0, 0, 'event_0_0', 'a'), (1, NULL, 'event_1', 'b'), (2, 3, 'event_2_1', 'c');

statement ok
insert into t2 values (0, 'a'), (1, 'b'), (2, 'c');

statement ok
flush;

query I rowsort
select * from v;
----
event_0_0 a
event_1 b

statement ok
drop materialized view v;

statement ok
drop table t1

statement ok
drop table t2
8 changes: 6 additions & 2 deletions src/expr/src/expr/expr_array_to_string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,12 @@ impl Expression for ArrayToStringExpression {
let array = self.array.eval_row(input)?;
let delimiter = self.delimiter.eval_row(input)?;

let result = if let Some(array) = array && let Some(delimiter) = delimiter && let Some(e) = &self.null_string {
let null_string = e.eval_row(input)?;
let result = if let Some(array) = array && let Some(delimiter) = delimiter {
let null_string = if let Some(e) = &self.null_string {
e.eval_row(input)?
} else {
None
};
let mut writer = String::new();
if let Some(null_string) = null_string {
self.evaluate_with_nulls(array.as_scalar_ref_impl().into_list(), delimiter.as_utf8(), null_string.as_utf8(), &mut writer);
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/planner_test/tests/testdata/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1083,21 +1083,21 @@
└─LogicalProject { exprs: [t.v1, (t.v1 * t.v1) as $expr1] }
└─LogicalScan { table: t, columns: [t.v1, t._row_id] }
batch_plan: |
BatchProject { exprs: [Case((sum0(count(t.v1)) <= 1:Int64), null:Decimal::Float64, Pow(((sum(sum($expr1))::Decimal - ((sum(sum(t.v1))::Decimal * sum(sum(t.v1))::Decimal) / sum0(count(t.v1)))) / (sum0(count(t.v1)) - 1:Int64))::Float64, 0.5:Float64)) as $expr2, Pow(((sum(sum($expr1))::Decimal - ((sum(sum(t.v1))::Decimal * sum(sum(t.v1))::Decimal) / sum0(count(t.v1)))) / sum0(count(t.v1)))::Float64, 0.5:Float64) as $expr3] }
BatchProject { exprs: [Case((sum0(count(t.v1)) <= 1:Int64), null:Float64, Pow(((sum(sum($expr1))::Decimal - ((sum(sum(t.v1))::Decimal * sum(sum(t.v1))::Decimal) / sum0(count(t.v1)))) / (sum0(count(t.v1)) - 1:Int64))::Float64, 0.5:Float64)) as $expr2, Pow(((sum(sum($expr1))::Decimal - ((sum(sum(t.v1))::Decimal * sum(sum(t.v1))::Decimal) / sum0(count(t.v1)))) / sum0(count(t.v1)))::Float64, 0.5:Float64) as $expr3] }
└─BatchSimpleAgg { aggs: [sum(sum($expr1)), sum(sum(t.v1)), sum0(count(t.v1)), sum(sum($expr1)), sum(sum(t.v1)), sum0(count(t.v1))] }
└─BatchExchange { order: [], dist: Single }
└─BatchSimpleAgg { aggs: [sum($expr1), sum(t.v1), count(t.v1), sum($expr1), sum(t.v1), count(t.v1)] }
└─BatchProject { exprs: [t.v1, (t.v1 * t.v1) as $expr1] }
└─BatchScan { table: t, columns: [t.v1], distribution: SomeShard }
batch_local_plan: |
BatchProject { exprs: [Case((count(t.v1) <= 1:Int64), null:Decimal::Float64, Pow(((sum($expr1)::Decimal - ((sum(t.v1)::Decimal * sum(t.v1)::Decimal) / count(t.v1))) / (count(t.v1) - 1:Int64))::Float64, 0.5:Float64)) as $expr2, Pow(((sum($expr1)::Decimal - ((sum(t.v1)::Decimal * sum(t.v1)::Decimal) / count(t.v1))) / count(t.v1))::Float64, 0.5:Float64) as $expr3] }
BatchProject { exprs: [Case((count(t.v1) <= 1:Int64), null:Float64, Pow(((sum($expr1)::Decimal - ((sum(t.v1)::Decimal * sum(t.v1)::Decimal) / count(t.v1))) / (count(t.v1) - 1:Int64))::Float64, 0.5:Float64)) as $expr2, Pow(((sum($expr1)::Decimal - ((sum(t.v1)::Decimal * sum(t.v1)::Decimal) / count(t.v1))) / count(t.v1))::Float64, 0.5:Float64) as $expr3] }
└─BatchSimpleAgg { aggs: [sum($expr1), sum(t.v1), count(t.v1), sum($expr1), sum(t.v1), count(t.v1)] }
└─BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [t.v1, (t.v1 * t.v1) as $expr1] }
└─BatchScan { table: t, columns: [t.v1], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [stddev_samp, stddev_pop], pk_columns: [] }
└─StreamProject { exprs: [Case((sum0(count(t.v1)) <= 1:Int64), null:Decimal::Float64, Pow(((sum(sum($expr1))::Decimal - ((sum(sum(t.v1))::Decimal * sum(sum(t.v1))::Decimal) / sum0(count(t.v1)))) / (sum0(count(t.v1)) - 1:Int64))::Float64, 0.5:Float64)) as $expr2, Pow(((sum(sum($expr1))::Decimal - ((sum(sum(t.v1))::Decimal * sum(sum(t.v1))::Decimal) / sum0(count(t.v1)))) / sum0(count(t.v1)))::Float64, 0.5:Float64) as $expr3] }
└─StreamProject { exprs: [Case((sum0(count(t.v1)) <= 1:Int64), null:Float64, Pow(((sum(sum($expr1))::Decimal - ((sum(sum(t.v1))::Decimal * sum(sum(t.v1))::Decimal) / sum0(count(t.v1)))) / (sum0(count(t.v1)) - 1:Int64))::Float64, 0.5:Float64)) as $expr2, Pow(((sum(sum($expr1))::Decimal - ((sum(sum(t.v1))::Decimal * sum(sum(t.v1))::Decimal) / sum0(count(t.v1)))) / sum0(count(t.v1)))::Float64, 0.5:Float64) as $expr3] }
└─StreamGlobalSimpleAgg { aggs: [count, sum(sum($expr1)), sum(sum(t.v1)), sum0(count(t.v1)), sum(sum($expr1)), sum(sum(t.v1)), sum0(count(t.v1))] }
└─StreamExchange { dist: Single }
└─StreamStatelessLocalSimpleAgg { aggs: [count, sum($expr1), sum(t.v1), count(t.v1), sum($expr1), sum(t.v1), count(t.v1)] }
Expand Down
14 changes: 7 additions & 7 deletions src/frontend/planner_test/tests/testdata/array.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
logical_plan: |
LogicalValues { rows: [[Array('foo':Varchar, 'bar':Varchar)]], schema: Schema { fields: [*VALUES*_0.column_0:List { datatype: Varchar }] } }
batch_plan: |
BatchValues { rows: [[Array('foo':Varchar, 'bar':Varchar)]] }
BatchValues { rows: [[ARRAY[foo, bar]:List { datatype: Varchar }]] }
- sql: |
values (ARRAY[1, 2+3, 4*5+1]);
logical_plan: |
LogicalValues { rows: [[Array(1:Int32, (2:Int32 + 3:Int32), ((4:Int32 * 5:Int32) + 1:Int32))]], schema: Schema { fields: [*VALUES*_0.column_0:List { datatype: Int32 }] } }
batch_plan: |
BatchValues { rows: [[Array(1:Int32, (2:Int32 + 3:Int32), ((4:Int32 * 5:Int32) + 1:Int32))]] }
BatchValues { rows: [[ARRAY[1, 5, 21]:List { datatype: Int32 }]] }
- sql: |
create table t (v1 int);
select (ARRAY[1, v1]) from t;
Expand Down Expand Up @@ -52,21 +52,21 @@
LogicalProject { exprs: [ArrayCat(Array(66:Int32), Array(123:Int32)) as $expr1] }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
batch_plan: |
BatchValues { rows: [[ArrayCat(Array(66:Int32), Array(123:Int32))]] }
BatchValues { rows: [[ARRAY[66, 123]:List { datatype: Int32 }]] }
- sql: |
select array_cat(array[array[66]], array[233]);
logical_plan: |
LogicalProject { exprs: [ArrayCat(Array(Array(66:Int32)), Array(233:Int32)) as $expr1] }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
batch_plan: |
BatchValues { rows: [[ArrayCat(Array(Array(66:Int32)), Array(233:Int32))]] }
BatchValues { rows: [[ARRAY[{66}, {233}]:List { datatype: List { datatype: Int32 } }]] }
- sql: |
select array_cat(array[233], array[array[66]]);
logical_plan: |
LogicalProject { exprs: [ArrayCat(Array(233:Int32), Array(Array(66:Int32))) as $expr1] }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
batch_plan: |
BatchValues { rows: [[ArrayCat(Array(233:Int32), Array(Array(66:Int32)))]] }
BatchValues { rows: [[ARRAY[{233}, {66}]:List { datatype: List { datatype: Int32 } }]] }
- sql: |
select array_cat(array[233], array[array[array[66]]]);
binder_error: 'Bind error: unable to find least restrictive type between integer[] and integer[][][]'
Expand All @@ -82,7 +82,7 @@
LogicalProject { exprs: [ArrayAppend(Array(66:Int32), 123:Int32) as $expr1] }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
batch_plan: |
BatchValues { rows: [[ArrayAppend(Array(66:Int32), 123:Int32)]] }
BatchValues { rows: [[ARRAY[66, 123]:List { datatype: Int32 }]] }
- sql: |
select array_append(123, 234);
binder_error: 'Bind error: Cannot append integer to integer'
Expand All @@ -98,7 +98,7 @@
LogicalProject { exprs: [ArrayPrepend(123:Int32, Array(66:Int32)) as $expr1] }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
batch_plan: |
BatchValues { rows: [[ArrayPrepend(123:Int32, Array(66:Int32))]] }
BatchValues { rows: [[ARRAY[123, 66]:List { datatype: Int32 }]] }
- sql: |
select array_prepend(123, 234);
binder_error: 'Bind error: Cannot prepend integer to integer'
Expand Down
10 changes: 5 additions & 5 deletions src/frontend/planner_test/tests/testdata/basic_query.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.
- sql: values (11, 22), (33+(1+2), 44);
batch_plan: |
BatchValues { rows: [[11:Int32, 22:Int32], [(33:Int32 + (1:Int32 + 2:Int32)), 44:Int32]] }
BatchValues { rows: [[11:Int32, 22:Int32], [36:Int32, 44:Int32]] }
- sql: select * from t
binder_error: 'Catalog error: table or source not found: t'
- sql: |
Expand All @@ -22,11 +22,11 @@
select * from t where 1>2 and 1=1 and 3<1 and 4<>1 or 1=1 and 2>=1 and 1<=2;
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchFilter { predicate: (1:Int32 = 1:Int32) AND ((((1:Int32 > 2:Int32) AND (3:Int32 < 1:Int32)) AND (4:Int32 <> 1:Int32)) OR ((2:Int32 >= 1:Int32) AND (1:Int32 <= 2:Int32))) }
└─BatchFilter { predicate: true:Boolean AND true:Boolean }
└─BatchScan { table: t, columns: [], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [t._row_id(hidden)], pk_columns: [t._row_id] }
└─StreamFilter { predicate: (1:Int32 = 1:Int32) AND ((((1:Int32 > 2:Int32) AND (3:Int32 < 1:Int32)) AND (4:Int32 <> 1:Int32)) OR ((2:Int32 >= 1:Int32) AND (1:Int32 <= 2:Int32))) }
└─StreamFilter { predicate: true:Boolean AND true:Boolean }
└─StreamTableScan { table: t, columns: [t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
- sql: |
create table t (v1 int);
Expand Down Expand Up @@ -129,11 +129,11 @@
- sql: |
select * from unnest(Array[1,2,3]);
batch_plan: |
BatchTableFunction { Unnest(Array(1:Int32, 2:Int32, 3:Int32)) }
BatchTableFunction { Unnest(ARRAY[1, 2, 3]:List { datatype: Int32 }) }
- sql: |
select * from unnest(Array[Array[1,2,3], Array[4,5,6]]);
batch_plan: |
BatchTableFunction { Unnest(Array(Array(1:Int32, 2:Int32, 3:Int32), Array(4:Int32, 5:Int32, 6:Int32))) }
BatchTableFunction { Unnest(ARRAY[{1,2,3}, {4,5,6}]:List { datatype: List { datatype: Int32 } }) }
- sql: |
create table t1 (x int);
select * from t1 where EXISTS(select * where t1.x=1);
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/planner_test/tests/testdata/cast.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
sql: |
select case when NULL then 1 end;
batch_plan: |
BatchValues { rows: [[Case(null:Boolean, 1:Int32)]] }
BatchValues { rows: [[null:Int32]] }
- name: implicit cast boolean (JOIN ON NULL)
sql: |
create table t1(v1 int);
Expand Down Expand Up @@ -60,7 +60,7 @@
sql: |
select case when 'y' then 1 end;
batch_plan: |
BatchValues { rows: [[Case(true:Boolean, 1:Int32)]] }
BatchValues { rows: [[1:Int32]] }
- name: implicit cast boolean (JOIN ON with literal 'y' of unknown type)
sql: |
create table t1(v1 int);
Expand Down
10 changes: 9 additions & 1 deletion src/frontend/planner_test/tests/testdata/explain.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@
LogicalValues { rows: [[1:Int32]], schema: Schema { fields: [1:Int32:Int32] } }
Inline Session Timezone:
BatchValues { rows: [[1:Int32]] }
Const eval exprs:
BatchValues { rows: [[1:Int32]] }
To Batch Physical Plan:
BatchValues { rows: [[1:Int32]] }
Expand All @@ -55,7 +63,7 @@
"stages": {
"0": {
"root": {
"plan_node_id": 30,
"plan_node_id": 34,
"plan_node_type": "BatchValues",
"schema": [
{
Expand Down
Loading

0 comments on commit 88dc35e

Please sign in to comment.