Skip to content

Commit

Permalink
feat(explain): add conflict behavior in explain materialize operator (#…
Browse files Browse the repository at this point in the history
…8138)

as title

Approved-By: BugenZhao
  • Loading branch information
st1page authored Feb 23, 2023
1 parent 52a39fd commit f2199fe
Show file tree
Hide file tree
Showing 31 changed files with 367 additions and 360 deletions.
54 changes: 27 additions & 27 deletions src/frontend/planner_test/tests/testdata/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
└─BatchExchange { order: [], dist: Single }
└─BatchScan { table: t, columns: [t.v1, t.v2, t.v3], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [v1, agg], pk_columns: [v1] }
StreamMaterialize { columns: [v1, agg], pk_columns: [v1], pk_conflict: "no check" }
└─StreamProject { exprs: [t.v1, (min(t.v2) + (max(t.v3) * count(t.v1))) as $expr1] }
└─StreamHashAgg { group_key: [t.v1], aggs: [count, min(t.v2), max(t.v3), count(t.v1)] }
└─StreamExchange { dist: HashShard(t.v1) }
Expand All @@ -50,7 +50,7 @@
└─BatchExchange { order: [], dist: Single }
└─BatchScan { table: t, columns: [t.v1, t.v2, t.v3], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [agg], pk_columns: [] }
StreamMaterialize { columns: [agg], pk_columns: [], pk_conflict: "no check" }
└─StreamProject { exprs: [(min(min(t.v1)) + (max(max(t.v2)) * sum0(count(t.v3)))) as $expr2] }
└─StreamGlobalSimpleAgg { aggs: [count, min(min(t.v1)), max(max(t.v2)), sum0(count(t.v3))] }
└─StreamExchange { dist: Single }
Expand Down Expand Up @@ -82,7 +82,7 @@
└─BatchProject { exprs: [t.v3, t.v1, (t.v1 + t.v2) as $expr1] }
└─BatchScan { table: t, columns: [t.v1, t.v2, t.v3], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [v3, agg], pk_columns: [v3] }
StreamMaterialize { columns: [v3, agg], pk_columns: [v3], pk_conflict: "no check" }
└─StreamProject { exprs: [t.v3, (min(t.v1) * (sum($expr1)::Decimal / count($expr1))) as $expr2] }
└─StreamHashAgg { group_key: [t.v3], aggs: [count, min(t.v1), sum($expr1), count($expr1)] }
└─StreamExchange { dist: HashShard(t.v3) }
Expand Down Expand Up @@ -150,7 +150,7 @@
└─BatchProject { exprs: [(t.v1 + t.v2) as $expr1] }
└─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [cnt, sum], pk_columns: [] }
StreamMaterialize { columns: [cnt, sum], pk_columns: [], pk_conflict: "no check" }
└─StreamProject { exprs: [sum0(count($expr1)), sum(sum($expr1))] }
└─StreamGlobalSimpleAgg { aggs: [count, sum0(count($expr1)), sum(sum($expr1))] }
└─StreamExchange { dist: Single }
Expand All @@ -168,7 +168,7 @@
└─BatchProject { exprs: [t.v1, (t.v2 + t.v3) as $expr1] }
└─BatchScan { table: t, columns: [t.v1, t.v2, t.v3], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [v1, agg], pk_columns: [v1] }
StreamMaterialize { columns: [v1, agg], pk_columns: [v1], pk_conflict: "no check" }
└─StreamProject { exprs: [t.v1, ((sum($expr1) / count($expr1)) + max(t.v1)) as $expr2] }
└─StreamHashAgg { group_key: [t.v1], aggs: [count, sum($expr1), count($expr1), max(t.v1)] }
└─StreamExchange { dist: HashShard(t.v1) }
Expand Down Expand Up @@ -420,7 +420,7 @@
└─BatchSimpleAgg { aggs: [min(t.v1), max(t.v3), count(t.v2)] }
└─BatchScan { table: t, columns: [t.v1, t.v2, t.v3], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [agg], pk_columns: [] }
StreamMaterialize { columns: [agg], pk_columns: [], pk_conflict: "no check" }
└─StreamProject { exprs: [(min(min(t.v1)) + (max(max(t.v3)) * sum0(count(t.v2)))) as $expr2] }
└─StreamGlobalSimpleAgg { aggs: [count, min(min(t.v1)), max(max(t.v3)), sum0(count(t.v2))] }
└─StreamExchange { dist: Single }
Expand All @@ -442,7 +442,7 @@
└─LogicalAgg { group_key: [t.v1], aggs: [] }
└─LogicalScan { table: t, columns: [t.v1] }
stream_plan: |
StreamMaterialize { columns: [v1, t.v1(hidden)], pk_columns: [t.v1] }
StreamMaterialize { columns: [v1, t.v1(hidden)], pk_columns: [t.v1], pk_conflict: "no check" }
└─StreamProject { exprs: [t.v1, t.v1] }
└─StreamHashAgg { group_key: [t.v1], aggs: [count] }
└─StreamExchange { dist: HashShard(t.v1) }
Expand All @@ -462,7 +462,7 @@
└─LogicalAgg { group_key: [t.v3, t.v2], aggs: [min(t.v1), max(t.v1)] }
└─LogicalScan { table: t, columns: [t.v1, t.v2, t.v3] }
stream_plan: |
StreamMaterialize { columns: [v2, min_v1, v3, max_v1, t.v2(hidden)], pk_columns: [v3, t.v2] }
StreamMaterialize { columns: [v2, min_v1, v3, max_v1, t.v2(hidden)], pk_columns: [v3, t.v2], pk_conflict: "no check" }
└─StreamProject { exprs: [t.v2, min(t.v1), t.v3, max(t.v1), t.v2] }
└─StreamHashAgg { group_key: [t.v3, t.v2], aggs: [count, min(t.v1), max(t.v1)] }
└─StreamExchange { dist: HashShard(t.v2, t.v3) }
Expand All @@ -480,7 +480,7 @@
LogicalAgg { aggs: [sum(t.v1)] }
└─LogicalScan { table: t, columns: [t.v1] }
stream_plan: |
StreamMaterialize { columns: [s1], pk_columns: [] }
StreamMaterialize { columns: [s1], pk_columns: [], pk_conflict: "no check" }
└─StreamProject { exprs: [sum(sum(t.v1))] }
└─StreamGlobalSimpleAgg { aggs: [count, sum(sum(t.v1))] }
└─StreamExchange { dist: Single }
Expand All @@ -499,7 +499,7 @@
LogicalAgg { aggs: [sum(t.v1)] }
└─LogicalScan { table: t, columns: [t.v1] }
stream_plan: |
StreamMaterialize { columns: [s1], pk_columns: [] }
StreamMaterialize { columns: [s1], pk_columns: [], pk_conflict: "no check" }
└─StreamProject { exprs: [sum(sum(t.v1))] }
└─StreamGlobalSimpleAgg { aggs: [count, sum(sum(t.v1))] }
└─StreamExchange { dist: Single }
Expand All @@ -518,7 +518,7 @@
LogicalAgg { aggs: [sum(t.v1)] }
└─LogicalScan { table: t, columns: [t.v1] }
stream_plan: |
StreamMaterialize { columns: [s1], pk_columns: [] }
StreamMaterialize { columns: [s1], pk_columns: [], pk_conflict: "no check" }
└─StreamProject { exprs: [sum(sum(t.v1))] }
└─StreamGlobalSimpleAgg { aggs: [count, sum(sum(t.v1))] }
└─StreamExchange { dist: Single }
Expand All @@ -537,7 +537,7 @@
LogicalAgg { aggs: [sum(t.v1)] }
└─LogicalScan { table: t, columns: [t.v1] }
stream_plan: |
StreamMaterialize { columns: [s1], pk_columns: [] }
StreamMaterialize { columns: [s1], pk_columns: [], pk_conflict: "no check" }
└─StreamProject { exprs: [sum(sum(t.v1))] }
└─StreamGlobalSimpleAgg { aggs: [count, sum(sum(t.v1))] }
└─StreamExchange { dist: Single }
Expand All @@ -556,7 +556,7 @@
LogicalAgg { aggs: [sum(t.v1)] }
└─LogicalScan { table: t, columns: [t.v1] }
stream_plan: |
StreamMaterialize { columns: [s1], pk_columns: [] }
StreamMaterialize { columns: [s1], pk_columns: [], pk_conflict: "no check" }
└─StreamProject { exprs: [sum(sum(t.v1))] }
└─StreamGlobalSimpleAgg { aggs: [count, sum(sum(t.v1))] }
└─StreamExchange { dist: Single }
Expand All @@ -575,7 +575,7 @@
LogicalAgg { aggs: [sum(t.v1) filter((t.v1 > 0:Int32))] }
└─LogicalScan { table: t, columns: [t.v1] }
stream_plan: |
StreamMaterialize { columns: [sa], pk_columns: [] }
StreamMaterialize { columns: [sa], pk_columns: [], pk_conflict: "no check" }
└─StreamProject { exprs: [sum(sum(t.v1) filter((t.v1 > 0:Int32)))] }
└─StreamGlobalSimpleAgg { aggs: [count, sum(sum(t.v1) filter((t.v1 > 0:Int32)))] }
└─StreamExchange { dist: Single }
Expand Down Expand Up @@ -610,7 +610,7 @@
└─LogicalProject { exprs: [t.a, t.b, (t.a * t.b) as $expr1] }
└─LogicalScan { table: t, columns: [t.a, t.b] }
stream_plan: |
StreamMaterialize { columns: [sab], pk_columns: [] }
StreamMaterialize { columns: [sab], pk_columns: [], pk_conflict: "no check" }
└─StreamProject { exprs: [max(max($expr1) filter((t.a < t.b) AND ((t.a + t.b) < 100:Int32) AND ((t.a * t.b) <> ((t.a + t.b) - 1:Int32))))] }
└─StreamGlobalSimpleAgg { aggs: [count, max(max($expr1) filter((t.a < t.b) AND ((t.a + t.b) < 100:Int32) AND ((t.a * t.b) <> ((t.a + t.b) - 1:Int32))))] }
└─StreamExchange { dist: Single }
Expand All @@ -632,7 +632,7 @@
└─LogicalAgg { group_key: [t.b], aggs: [sum(t.a) filter((t.a > t.b)), count(t.a) filter((t.a > t.b))] }
└─LogicalScan { table: t, columns: [t.a, t.b] }
stream_plan: |
StreamMaterialize { columns: [avga, t.b(hidden)], pk_columns: [t.b] }
StreamMaterialize { columns: [avga, t.b(hidden)], pk_columns: [t.b], pk_conflict: "no check" }
└─StreamProject { exprs: [(sum(t.a) filter((t.a > t.b))::Decimal / count(t.a) filter((t.a > t.b))) as $expr1, t.b] }
└─StreamHashAgg { group_key: [t.b], aggs: [count, sum(t.a) filter((t.a > t.b)), count(t.a) filter((t.a > t.b))] }
└─StreamExchange { dist: HashShard(t.b) }
Expand All @@ -650,7 +650,7 @@
LogicalAgg { aggs: [count filter((t.a > t.b))] }
└─LogicalScan { table: t, columns: [t.a, t.b] }
stream_plan: |
StreamMaterialize { columns: [cnt_agb], pk_columns: [] }
StreamMaterialize { columns: [cnt_agb], pk_columns: [], pk_conflict: "no check" }
└─StreamProject { exprs: [sum0(count filter((t.a > t.b)))] }
└─StreamGlobalSimpleAgg { aggs: [count, sum0(count filter((t.a > t.b)))] }
└─StreamExchange { dist: Single }
Expand Down Expand Up @@ -690,7 +690,7 @@
└─BatchSimpleAgg { aggs: [sum(t.v2) filter((t.v2 < 5:Int32))] }
└─BatchScan { table: t, columns: [t.v2], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [b], pk_columns: [] }
StreamMaterialize { columns: [b], pk_columns: [], pk_conflict: "no check" }
└─StreamProject { exprs: [sum(sum(t.v2) filter((t.v2 < 5:Int32)))] }
└─StreamGlobalSimpleAgg { aggs: [count, sum(sum(t.v2) filter((t.v2 < 5:Int32)))] }
└─StreamExchange { dist: Single }
Expand All @@ -711,7 +711,7 @@
└─BatchExchange { order: [], dist: HashShard(t.v1, t.v2, t.v3) }
└─BatchScan { table: t, columns: [t.v1, t.v2, t.v3], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [min, sum, t.v1(hidden), t.v3(hidden), t.v2(hidden)], pk_columns: [t.v1, t.v3, t.v2] }
StreamMaterialize { columns: [min, sum, t.v1(hidden), t.v3(hidden), t.v2(hidden)], pk_columns: [t.v1, t.v3, t.v2], pk_conflict: "no check" }
└─StreamProject { exprs: [min(min(t.v3)), sum(sum(t.v1)), t.v1, t.v3, t.v2] }
└─StreamHashAgg { group_key: [t.v1, t.v3, t.v2], aggs: [count, min(min(t.v3)), sum(sum(t.v1))] }
└─StreamExchange { dist: HashShard(t.v1, t.v3, t.v2) }
Expand All @@ -730,7 +730,7 @@
└─BatchSimpleAgg { aggs: [min(t.v1), sum(t.v2)] }
└─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [min, sum], pk_columns: [] }
StreamMaterialize { columns: [min, sum], pk_columns: [], pk_conflict: "no check" }
└─StreamProject { exprs: [min(min(t.v1)), sum(sum(t.v2))] }
└─StreamGlobalSimpleAgg { aggs: [count, min(min(t.v1)), sum(sum(t.v2))] }
└─StreamExchange { dist: Single }
Expand All @@ -748,7 +748,7 @@
└─BatchExchange { order: [], dist: Single }
└─BatchScan { table: t, columns: [t.v1, t.v2], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [min, sum], pk_columns: [] }
StreamMaterialize { columns: [min, sum], pk_columns: [], pk_conflict: "no check" }
└─StreamProject { exprs: [min(t.v1), sum(t.v2)] }
└─StreamGlobalSimpleAgg { aggs: [count, min(t.v1), sum(t.v2)] }
└─StreamExchange { dist: Single }
Expand Down Expand Up @@ -778,7 +778,7 @@
└─BatchExchange { order: [], dist: HashShard(t.a, t.b) }
└─BatchScan { table: t, columns: [t.a, t.b, t.c], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [a, distinct_b_num, sum_c], pk_columns: [a] }
StreamMaterialize { columns: [a, distinct_b_num, sum_c], pk_columns: [a], pk_conflict: "no check" }
└─StreamProject { exprs: [t.a, count(t.b), sum(sum(t.c))] }
└─StreamHashAgg { group_key: [t.a], aggs: [count, count(t.b), sum(sum(t.c))] }
└─StreamExchange { dist: HashShard(t.a) }
Expand All @@ -804,7 +804,7 @@
└─BatchExpand { column_subsets: [[t.a, t.c], [t.a, t.b]] }
└─BatchScan { table: t, columns: [t.a, t.b, t.c], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [a, distinct_b_num, distinct_c_sum, sum_c], pk_columns: [a] }
StreamMaterialize { columns: [a, distinct_b_num, distinct_c_sum, sum_c], pk_columns: [a], pk_conflict: "no check" }
└─StreamProject { exprs: [t.a, count(t.b) filter((flag = 1:Int64)), count(t.c) filter((flag = 0:Int64)), sum(sum(t.c)) filter((flag = 0:Int64))] }
└─StreamHashAgg { group_key: [t.a], aggs: [count, count(t.b) filter((flag = 1:Int64)), count(t.c) filter((flag = 0:Int64)), sum(sum(t.c)) filter((flag = 0:Int64))] }
└─StreamExchange { dist: HashShard(t.a) }
Expand All @@ -829,7 +829,7 @@
└─BatchExchange { order: [], dist: HashShard(t.a, t.b) }
└─BatchScan { table: t, columns: [t.a, t.b, t.c], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [a, count, sum], pk_columns: [a] }
StreamMaterialize { columns: [a, count, sum], pk_columns: [a], pk_conflict: "no check" }
└─StreamProject { exprs: [t.a, count(t.b) filter((count filter((t.b < 100:Int32)) > 0:Int64)), sum(sum(t.c))] }
└─StreamHashAgg { group_key: [t.a], aggs: [count, count(t.b) filter((count filter((t.b < 100:Int32)) > 0:Int64)), sum(sum(t.c))] }
└─StreamExchange { dist: HashShard(t.a) }
Expand Down Expand Up @@ -859,7 +859,7 @@
└─LogicalProject { exprs: [t.b, (Length(t.a) * t.b) as $expr1] }
└─LogicalScan { table: t, columns: [t.a, t.b] }
stream_plan: |
StreamMaterialize { columns: [s1], pk_columns: [] }
StreamMaterialize { columns: [s1], pk_columns: [], pk_conflict: "no check" }
└─StreamProject { exprs: [sum(sum($expr1) filter((t.b < 100:Int32) AND ((t.b * 2:Int32) > 10:Int32)))] }
└─StreamGlobalSimpleAgg { aggs: [count, sum(sum($expr1) filter((t.b < 100:Int32) AND ((t.b * 2:Int32) > 10:Int32)))] }
└─StreamExchange { dist: Single }
Expand Down Expand Up @@ -888,7 +888,7 @@
└─BatchSortAgg { group_key: [i.x], aggs: [count] }
└─BatchScan { table: i, columns: [i.x], distribution: UpstreamHashShard(i.x) }
stream_plan: |
StreamMaterialize { columns: [cnt, i.x(hidden)], pk_columns: [i.x] }
StreamMaterialize { columns: [cnt, i.x(hidden)], pk_columns: [i.x], pk_conflict: "no check" }
└─StreamProject { exprs: [count, i.x] }
└─StreamHashAgg { group_key: [i.x], aggs: [count, count] }
└─StreamTableScan { table: i, columns: [i.x, i.t._row_id], pk: [i.t._row_id], dist: UpstreamHashShard(i.x) }
Expand Down Expand Up @@ -1096,7 +1096,7 @@
└─BatchProject { exprs: [t.v1, (t.v1 * t.v1) as $expr1] }
└─BatchScan { table: t, columns: [t.v1], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [stddev_samp, stddev_pop], pk_columns: [] }
StreamMaterialize { columns: [stddev_samp, stddev_pop], pk_columns: [], pk_conflict: "no check" }
└─StreamProject { exprs: [Case((sum0(count(t.v1)) <= 1:Int64), null:Float64, Pow(((sum(sum($expr1))::Decimal - ((sum(sum(t.v1))::Decimal * sum(sum(t.v1))::Decimal) / sum0(count(t.v1)))) / (sum0(count(t.v1)) - 1:Int64))::Float64, 0.5:Float64)) as $expr2, Pow(((sum(sum($expr1))::Decimal - ((sum(sum(t.v1))::Decimal * sum(sum(t.v1))::Decimal) / sum0(count(t.v1)))) / sum0(count(t.v1)))::Float64, 0.5:Float64) as $expr3] }
└─StreamGlobalSimpleAgg { aggs: [count, sum(sum($expr1)), sum(sum(t.v1)), sum0(count(t.v1)), sum(sum($expr1)), sum(sum(t.v1)), sum0(count(t.v1))] }
└─StreamExchange { dist: Single }
Expand Down
8 changes: 4 additions & 4 deletions src/frontend/planner_test/tests/testdata/append_only.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
create table t1 (v1 int, v2 int) with (appendonly = true);
select v1, max(v2) as mx2 from t1 group by v1;
stream_plan: |
StreamMaterialize { columns: [v1, mx2], pk_columns: [v1] }
StreamMaterialize { columns: [v1, mx2], pk_columns: [v1], pk_conflict: "no check" }
└─StreamProject { exprs: [t1.v1, max(t1.v2)] }
└─StreamAppendOnlyHashAgg { group_key: [t1.v1], aggs: [count, max(t1.v2)] }
└─StreamExchange { dist: HashShard(t1.v1) }
Expand All @@ -13,7 +13,7 @@
create table t2 (v1 int, v3 int) with (appendonly = true);
select t1.v1 as id, v2, v3 from t1 join t2 on t1.v1=t2.v1;
stream_plan: |
StreamMaterialize { columns: [id, v2, v3, t1._row_id(hidden), t2._row_id(hidden), t2.v1(hidden)], pk_columns: [t1._row_id, t2._row_id, id, t2.v1] }
StreamMaterialize { columns: [id, v2, v3, t1._row_id(hidden), t2._row_id(hidden), t2.v1(hidden)], pk_columns: [t1._row_id, t2._row_id, id, t2.v1], pk_conflict: "no check" }
└─StreamAppendOnlyHashJoin { type: Inner, predicate: t1.v1 = t2.v1, output: [t1.v1, t1.v2, t2.v3, t1._row_id, t2._row_id, t2.v1] }
├─StreamExchange { dist: HashShard(t1.v1) }
| └─StreamTableScan { table: t1, columns: [t1.v1, t1.v2, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
Expand All @@ -23,15 +23,15 @@
create table t1 (v1 int, v2 int) with (appendonly = true);
select v1 from t1 order by v1 limit 3 offset 3;
stream_plan: |
StreamMaterialize { columns: [v1, t1._row_id(hidden)], pk_columns: [t1._row_id], order_descs: [v1, t1._row_id] }
StreamMaterialize { columns: [v1, t1._row_id(hidden)], pk_columns: [t1._row_id], order_descs: [v1, t1._row_id], pk_conflict: "no check" }
└─StreamAppendOnlyTopN { order: "[t1.v1 ASC]", limit: 3, offset: 3 }
└─StreamExchange { dist: Single }
└─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
- sql: |
create table t1 (v1 int, v2 int) with (appendonly = true);
select max(v1) as max_v1 from t1;
stream_plan: |
StreamMaterialize { columns: [max_v1], pk_columns: [] }
StreamMaterialize { columns: [max_v1], pk_columns: [], pk_conflict: "no check" }
└─StreamProject { exprs: [max(max(t1.v1))] }
└─StreamAppendOnlyGlobalSimpleAgg { aggs: [count, max(max(t1.v1))] }
└─StreamExchange { dist: Single }
Expand Down
Loading

0 comments on commit f2199fe

Please sign in to comment.