Skip to content

Commit

Permalink
Update stream_plan in watermark.yaml for tests. Delete StreamExchange…
Browse files Browse the repository at this point in the history
…, modify Stream nodes, add StreamAppendOnlyHashAgg.
  • Loading branch information
shanicky committed Mar 15, 2023
1 parent 223a121 commit 200b454
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 47 deletions.
30 changes: 15 additions & 15 deletions src/frontend/planner_test/tests/testdata/union.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,35 +10,35 @@
└─BatchExchange { order: [], dist: Single }
└─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [a, b, c, t1._row_id(hidden), null:Int64(hidden), 0:Int32(hidden)], pk_columns: [t1._row_id, null:Int64, 0:Int32], pk_conflict: "no check" }
StreamMaterialize { columns: [a, b, c, t1._row_id(hidden), null:Serial(hidden), 0:Int32(hidden)], pk_columns: [t1._row_id, null:Serial, 0:Int32], pk_conflict: "no check" }
└─StreamUnion { all: true }
├─StreamExchange { dist: HashShard(t1._row_id, null:Int64, 0:Int32) }
| └─StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, null:Int64, 0:Int32] }
├─StreamExchange { dist: HashShard(t1._row_id, null:Serial, 0:Int32) }
| └─StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, null:Serial, 0:Int32] }
| └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1.c, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(null:Int64, t2._row_id, 1:Int32) }
└─StreamProject { exprs: [t2.a, t2.b, t2.c, null:Int64, t2._row_id, 1:Int32] }
└─StreamExchange { dist: HashShard(null:Serial, t2._row_id, 1:Int32) }
└─StreamProject { exprs: [t2.a, t2.b, t2.c, null:Serial, t2._row_id, 1:Int32] }
└─StreamTableScan { table: t2, columns: [t2.a, t2.b, t2.c, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
stream_dist_plan: |
Fragment 0
StreamMaterialize { columns: [a, b, c, t1._row_id(hidden), null:Int64(hidden), 0:Int32(hidden)], pk_columns: [t1._row_id, null:Int64, 0:Int32], pk_conflict: "no check" }
StreamMaterialize { columns: [a, b, c, t1._row_id(hidden), null:Serial(hidden), 0:Int32(hidden)], pk_columns: [t1._row_id, null:Serial, 0:Int32], pk_conflict: "no check" }
materialized table: 4294967294
StreamUnion { all: true }
StreamExchange Hash([3, 4, 5]) from 1
StreamExchange Hash([3, 4, 5]) from 2
Fragment 1
StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, null:Int64, 0:Int32] }
StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, null:Serial, 0:Int32] }
Chain { table: t1, columns: [t1.a, t1.b, t1.c, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
Upstream
BatchPlanNode
Fragment 2
StreamProject { exprs: [t2.a, t2.b, t2.c, null:Int64, t2._row_id, 1:Int32] }
StreamProject { exprs: [t2.a, t2.b, t2.c, null:Serial, t2._row_id, 1:Int32] }
Chain { table: t2, columns: [t2.a, t2.b, t2.c, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
Upstream
BatchPlanNode
Table 4294967294 { columns: [a, b, c, t1._row_id, null:Int64, 0:Int32], primary key: [$3 ASC, $4 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [3, 4, 5], read pk prefix len hint: 3 }
Table 4294967294 { columns: [a, b, c, t1._row_id, null:Serial, 0:Int32], primary key: [$3 ASC, $4 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [3, 4, 5], read pk prefix len hint: 3 }
- sql: |
create table t1 (a int, b numeric, c bigint);
create table t2 (a int, b numeric, c bigint);
Expand All @@ -63,11 +63,11 @@
└─StreamHashAgg { group_key: [t1.a, t1.b, t1.c], aggs: [count] }
└─StreamExchange { dist: HashShard(t1.a, t1.b, t1.c) }
└─StreamUnion { all: true }
├─StreamExchange { dist: HashShard(t1._row_id, null:Int64, 0:Int32) }
| └─StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, null:Int64, 0:Int32] }
├─StreamExchange { dist: HashShard(t1._row_id, null:Serial, 0:Int32) }
| └─StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, null:Serial, 0:Int32] }
| └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1.c, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(null:Int64, t2._row_id, 1:Int32) }
└─StreamProject { exprs: [t2.a, t2.b, t2.c, null:Int64, t2._row_id, 1:Int32] }
└─StreamExchange { dist: HashShard(null:Serial, t2._row_id, 1:Int32) }
└─StreamProject { exprs: [t2.a, t2.b, t2.c, null:Serial, t2._row_id, 1:Int32] }
└─StreamTableScan { table: t2, columns: [t2.a, t2.b, t2.c, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
stream_dist_plan: |
Fragment 0
Expand All @@ -84,13 +84,13 @@
StreamExchange Hash([3, 4, 5]) from 3
Fragment 2
StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, null:Int64, 0:Int32] }
StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, null:Serial, 0:Int32] }
Chain { table: t1, columns: [t1.a, t1.b, t1.c, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
Upstream
BatchPlanNode
Fragment 3
StreamProject { exprs: [t2.a, t2.b, t2.c, null:Int64, t2._row_id, 1:Int32] }
StreamProject { exprs: [t2.a, t2.b, t2.c, null:Serial, t2._row_id, 1:Int32] }
Chain { table: t2, columns: [t2.a, t2.b, t2.c, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
Upstream
BatchPlanNode
Expand Down
58 changes: 26 additions & 32 deletions src/frontend/planner_test/tests/testdata/watermark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,19 @@
└─LogicalSource { source: t, columns: [v1, _row_id], time_range: [(Unbounded, Unbounded)] }
stream_plan: |
StreamMaterialize { columns: [v1, _row_id(hidden)], pk_columns: [_row_id], pk_conflict: "no check", watermark_columns: [v1] }
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamProject { exprs: [(AtTimeZone((AtTimeZone(v1, 'UTC':Varchar) - '00:00:00':Interval), 'UTC':Varchar) - '00:00:02':Interval) as $expr1, _row_id], output_watermarks: [$expr1] }
└─StreamRowIdGen { row_id_index: 1 }
└─StreamWatermarkFilter { watermark_descs: [idx: 0, expr: (v1 - '00:00:01':Interval)] }
└─StreamSource { source: "t", columns: ["v1", "_row_id"] }
└─StreamProject { exprs: [(AtTimeZone((AtTimeZone(v1, 'UTC':Varchar) - '00:00:00':Interval), 'UTC':Varchar) - '00:00:02':Interval) as $expr1, _row_id], output_watermarks: [$expr1] }
└─StreamRowIdGen { row_id_index: 1 }
└─StreamWatermarkFilter { watermark_descs: [idx: 0, expr: (v1 - '00:00:01':Interval)] }
└─StreamSource { source: "t", columns: ["v1", "_row_id"] }
stream_dist_plan: |
Fragment 0
StreamMaterialize { columns: [v1, _row_id(hidden)], pk_columns: [_row_id], pk_conflict: "no check", watermark_columns: [v1] }
materialized table: 4294967294
StreamExchange Hash([1]) from 1
Fragment 1
StreamProject { exprs: [(AtTimeZone((AtTimeZone(v1, 'UTC':Varchar) - '00:00:00':Interval), 'UTC':Varchar) - '00:00:02':Interval) as $expr1, _row_id], output_watermarks: [$expr1] }
StreamRowIdGen { row_id_index: 1 }
StreamWatermarkFilter { watermark_descs: [idx: 0, expr: (v1 - '00:00:01':Interval)] }
StreamSource { source: "t", columns: ["v1", "_row_id"] }
source state table: 1
StreamProject { exprs: [(AtTimeZone((AtTimeZone(v1, 'UTC':Varchar) - '00:00:00':Interval), 'UTC':Varchar) - '00:00:02':Interval) as $expr1, _row_id], output_watermarks: [$expr1] }
StreamRowIdGen { row_id_index: 1 }
StreamWatermarkFilter { watermark_descs: [idx: 0, expr: (v1 - '00:00:01':Interval)] }
StreamSource { source: "t", columns: ["v1", "_row_id"] }
source state table: 1
Table 1 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [], read pk prefix len hint: 0 }
Table 4294967294 { columns: [v1, _row_id], primary key: [$1 ASC], value indices: [0, 1], distribution key: [1], read pk prefix len hint: 1 }
Expand All @@ -33,21 +29,19 @@
explain create table t (v1 timestamp with time zone, watermark for v1 as v1 - INTERVAL '1' SECOND) append only with (connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest') ROW FORMAT JSON;
explain_output: |
StreamMaterialize { columns: [v1, _row_id(hidden)], pk_columns: [_row_id], pk_conflict: "no check", watermark_columns: [v1] }
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamRowIdGen { row_id_index: 1 }
└─StreamWatermarkFilter { watermark_descs: [idx: 0, expr: (v1 - '00:00:01':Interval)] }
└─StreamDml { columns: [v1, _row_id] }
└─StreamSource { source: "t", columns: ["v1", "_row_id"] }
└─StreamRowIdGen { row_id_index: 1 }
└─StreamWatermarkFilter { watermark_descs: [idx: 0, expr: (v1 - '00:00:01':Interval)] }
└─StreamDml { columns: [v1, _row_id] }
└─StreamSource { source: "t", columns: ["v1", "_row_id"] }
- name: watermark on append only table without source
sql: |
explain create table t (v1 timestamp with time zone, watermark for v1 as v1 - INTERVAL '1' SECOND) append only;
explain_output: |
StreamMaterialize { columns: [v1, _row_id(hidden)], pk_columns: [_row_id], pk_conflict: "no check", watermark_columns: [v1] }
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamRowIdGen { row_id_index: 1 }
└─StreamWatermarkFilter { watermark_descs: [idx: 0, expr: (v1 - '00:00:01':Interval)] }
└─StreamDml { columns: [v1, _row_id] }
└─StreamSource
└─StreamRowIdGen { row_id_index: 1 }
└─StreamWatermarkFilter { watermark_descs: [idx: 0, expr: (v1 - '00:00:01':Interval)] }
└─StreamDml { columns: [v1, _row_id] }
└─StreamSource
- name: hash agg
sql: |
create table t (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only;
Expand Down Expand Up @@ -76,13 +70,13 @@
create table t2 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only;
select * from t1 Union all select * from t2;
stream_plan: |
StreamMaterialize { columns: [ts, v1, v2, t1._row_id(hidden), null:Int64(hidden), 0:Int32(hidden)], pk_columns: [t1._row_id, null:Int64, 0:Int32], pk_conflict: "no check", watermark_columns: [ts] }
StreamMaterialize { columns: [ts, v1, v2, t1._row_id(hidden), null:Serial(hidden), 0:Int32(hidden)], pk_columns: [t1._row_id, null:Serial, 0:Int32], pk_conflict: "no check", watermark_columns: [ts] }
└─StreamUnion { all: true, output_watermarks: [t1.ts] }
├─StreamExchange { dist: HashShard(t1._row_id, null:Int64, 0:Int32) }
| └─StreamProject { exprs: [t1.ts, t1.v1, t1.v2, t1._row_id, null:Int64, 0:Int32], output_watermarks: [t1.ts] }
├─StreamExchange { dist: HashShard(t1._row_id, null:Serial, 0:Int32) }
| └─StreamProject { exprs: [t1.ts, t1.v1, t1.v2, t1._row_id, null:Serial, 0:Int32], output_watermarks: [t1.ts] }
| └─StreamTableScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(null:Int64, t2._row_id, 1:Int32) }
└─StreamProject { exprs: [t2.ts, t2.v1, t2.v2, null:Int64, t2._row_id, 1:Int32], output_watermarks: [t2.ts] }
└─StreamExchange { dist: HashShard(null:Serial, t2._row_id, 1:Int32) }
└─StreamProject { exprs: [t2.ts, t2.v1, t2.v2, null:Serial, t2._row_id, 1:Int32], output_watermarks: [t2.ts] }
└─StreamTableScan { table: t2, columns: [t2.ts, t2.v1, t2.v2, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
- name: union
sql: |
Expand All @@ -95,11 +89,11 @@
└─StreamAppendOnlyHashAgg { group_key: [t1.ts, t1.v1, t1.v2], aggs: [count], output_watermarks: [t1.ts] }
└─StreamExchange { dist: HashShard(t1.ts, t1.v1, t1.v2) }
└─StreamUnion { all: true, output_watermarks: [t1.ts] }
├─StreamExchange { dist: HashShard(t1._row_id, null:Int64, 0:Int32) }
| └─StreamProject { exprs: [t1.ts, t1.v1, t1.v2, t1._row_id, null:Int64, 0:Int32], output_watermarks: [t1.ts] }
├─StreamExchange { dist: HashShard(t1._row_id, null:Serial, 0:Int32) }
| └─StreamProject { exprs: [t1.ts, t1.v1, t1.v2, t1._row_id, null:Serial, 0:Int32], output_watermarks: [t1.ts] }
| └─StreamTableScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(null:Int64, t2._row_id, 1:Int32) }
└─StreamProject { exprs: [t2.ts, t2.v1, t2.v2, null:Int64, t2._row_id, 1:Int32], output_watermarks: [t2.ts] }
└─StreamExchange { dist: HashShard(null:Serial, t2._row_id, 1:Int32) }
└─StreamProject { exprs: [t2.ts, t2.v1, t2.v2, null:Serial, t2._row_id, 1:Int32], output_watermarks: [t2.ts] }
└─StreamTableScan { table: t2, columns: [t2.ts, t2.v1, t2.v2, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
- name: tumble
sql: |
Expand Down

0 comments on commit 200b454

Please sign in to comment.