forked from risingwavelabs/risingwave
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(streaming): support temporal join part 3 (risingwavelabs#8480)
Co-authored-by: Bugen Zhao <i@bugenzhao.com> Co-authored-by: st1page <1245835950@qq.com>
- Loading branch information
1 parent
f36bf0b
commit 2db01f9
Showing
27 changed files
with
456 additions
and
21 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
# End-to-end check of a streaming temporal join (`FOR SYSTEM_TIME AS OF NOW()`): rows of the append-only | ||
# `stream` table are joined against `version`, and the expected results below show that a row already in the | ||
# view keeps the values it was joined with, even after `version` changes. | ||
# NOTE(review): RW_IMPLICIT_FLUSH presumably makes each DML visible before the next statement runs - confirm. | ||
statement ok | ||
SET RW_IMPLICIT_FLUSH TO true; | ||
|
||
statement ok | ||
create table stream(id1 int, a1 int, b1 int) APPEND ONLY; | ||
|
||
statement ok | ||
create table version(id2 int, a2 int, b2 int, primary key (id2)); | ||
|
||
statement ok | ||
create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 | ||
|
||
# `version` is still empty here, so this stream row has nothing to match (expected below as `1 11 NULL NULL`). | ||
statement ok | ||
insert into stream values(1, 11, 111); | ||
|
||
statement ok | ||
insert into version values(1, 11, 111); | ||
|
||
# Same stream values again, now that `version` holds id2 = 1 (expected below as `1 11 1 11`). | ||
statement ok | ||
insert into stream values(1, 11, 111); | ||
|
||
# Empty the lookup side on purpose; the next query still expects the previously joined row `1 11 1 11`. | ||
statement ok | ||
delete from version; | ||
|
||
query IIII rowsort | ||
select * from v; | ||
---- | ||
1 11 1 11 | ||
1 11 NULL NULL | ||
|
||
# A new key inserted into `version` before the matching stream row arrives is picked up by the join. | ||
statement ok | ||
insert into version values(2, 22, 222); | ||
|
||
statement ok | ||
insert into stream values(2, 22, 222); | ||
|
||
query IIII rowsort | ||
select * from v; | ||
---- | ||
1 11 1 11 | ||
1 11 NULL NULL | ||
2 22 2 22 | ||
|
||
statement ok | ||
drop materialized view v; | ||
|
||
# Re-create the same view over the data that already exists in both tables. | ||
statement ok | ||
create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 | ||
|
||
# Both id1 = 1 rows are now expected with NULLs (`version` no longer holds id2 = 1); id1 = 2 still matches. | ||
query IIII rowsort | ||
select * from v; | ||
---- | ||
1 11 NULL NULL | ||
1 11 NULL NULL | ||
2 22 2 22 | ||
|
||
statement ok | ||
drop materialized view v; | ||
|
||
statement ok | ||
drop table stream; | ||
|
||
statement ok | ||
drop table version; |
154 changes: 154 additions & 0 deletions
154
src/frontend/planner_test/tests/testdata/temporal_join.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information. | ||
# LEFT JOIN form: the expected stream plan keeps `type: LeftOuter`, hash-exchanges the stream side on id1 and | ||
# reads `version` through a StreamNoShuffleExchange; planning the same query as a batch query must fail. | ||
- name: Left join type for temporal join | ||
sql: | | ||
create table stream(id1 int, a1 int, b1 int) APPEND ONLY; | ||
create table version(id2 int, a2 int, b2 int, primary key (id2)); | ||
select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1= id2 | ||
stream_plan: | | ||
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], pk_columns: [stream._row_id, id2, id1], pk_conflict: "no check" } | ||
└─StreamTemporalJoin { type: LeftOuter, predicate: stream.id1 = version.id2, output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } | ||
├─StreamExchange { dist: HashShard(stream.id1) } | ||
| └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) } | ||
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version.id2) } | ||
└─StreamTableScan { table: version, columns: [version.id2, version.a2], pk: [version.id2], dist: UpstreamHashShard(version.id2) } | ||
batch_error: |- | ||
Not supported: do not support temporal join for batch queries | ||
HINT: please use temporal join in streaming queries | ||
# Inner join with a WHERE filter on a `version` column: the expected plan folds `a2 < 10` into the | ||
# StreamTemporalJoin predicate instead of emitting a separate filter node. | ||
- name: Inner join type for temporal join | ||
sql: | | ||
create table stream(id1 int, a1 int, b1 int) APPEND ONLY; | ||
create table version(id2 int, a2 int, b2 int, primary key (id2)); | ||
select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 where a2 < 10; | ||
stream_plan: | | ||
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], pk_columns: [stream._row_id, id2, id1], pk_conflict: "no check" } | ||
└─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } | ||
├─StreamExchange { dist: HashShard(stream.id1) } | ||
| └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) } | ||
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version.id2) } | ||
└─StreamTableScan { table: version, columns: [version.id2, version.a2], pk: [version.id2], dist: UpstreamHashShard(version.id2) } | ||
# Comma-join form: `id1 = id2` appears only in WHERE, yet the expected plan is an Inner StreamTemporalJoin | ||
# with that equality as the join key and `a2 < 10` as an extra join predicate. | ||
- name: implicit join with temporal tables | ||
sql: | | ||
create table stream(id1 int, a1 int, b1 int) APPEND ONLY; | ||
create table version(id2 int, a2 int, b2 int, primary key (id2)); | ||
select id1, a1, id2, a2 from stream, version FOR SYSTEM_TIME AS OF NOW() where id1 = id2 AND a2 < 10; | ||
stream_plan: | | ||
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], pk_columns: [stream._row_id, id2, id1], pk_conflict: "no check" } | ||
└─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } | ||
├─StreamExchange { dist: HashShard(stream.id1) } | ||
| └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) } | ||
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version.id2) } | ||
└─StreamTableScan { table: version, columns: [version.id2, version.a2], pk: [version.id2], dist: UpstreamHashShard(version.id2) } | ||
# Composite primary key (id2, a2) with both columns in the ON clause; the stream side is hash-sharded on | ||
# (id1, a1). The SQL says LEFT JOIN but the expected plan shows `type: Inner`. | ||
# NOTE(review): presumably the WHERE filter on `version` columns is what turns the outer join into an inner one - confirm. | ||
- name: Multi join key for temporal join | ||
sql: | | ||
create table stream(id1 int, a1 int, b1 int) APPEND ONLY; | ||
create table version(id2 int, a2 int, b2 int, primary key (id2, a2)); | ||
select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on a1 = a2 and id1 = id2 where b2 != a2; | ||
stream_plan: | | ||
StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], pk_columns: [stream._row_id, id2, a2, id1, a1], pk_conflict: "no check" } | ||
└─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version.id2 AND stream.a1 = version.a2 AND (version.b2 <> version.a2), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } | ||
├─StreamExchange { dist: HashShard(stream.id1, stream.a1) } | ||
| └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) } | ||
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version.id2, version.a2) } | ||
└─StreamTableScan { table: version, columns: [version.id2, version.a2, version.b2], pk: [version.id2, version.a2], dist: UpstreamHashShard(version.id2, version.a2) } | ||
# count(*) on top of a temporal join: the expected plan counts with a stateless local agg followed by an | ||
# append-only global agg. The SQL says LEFT JOIN but the expected join node shows `type: Inner`. | ||
- name: Temporal join with Aggregation | ||
sql: | | ||
create table stream(id1 int, a1 int, b1 int) APPEND ONLY; | ||
create table version(id2 int, a2 int, b2 int, primary key (id2)); | ||
select count(*) from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 where a2 < 10; | ||
stream_plan: | | ||
StreamMaterialize { columns: [count], pk_columns: [], pk_conflict: "no check" } | ||
└─StreamProject { exprs: [sum0(count)] } | ||
└─StreamAppendOnlyGlobalSimpleAgg { aggs: [sum0(count), count] } | ||
└─StreamExchange { dist: Single } | ||
└─StreamStatelessLocalSimpleAgg { aggs: [count] } | ||
└─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream._row_id, stream.id1, version.id2] } | ||
├─StreamExchange { dist: HashShard(stream.id1) } | ||
| └─StreamTableScan { table: stream, columns: [stream.id1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) } | ||
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version.id2) } | ||
└─StreamTableScan { table: version, columns: [version.id2, version.a2], pk: [version.id2], dist: UpstreamHashShard(version.id2) } | ||
# Negative case: `version` has primary key (id2, a2) but the join condition only equates id1 = id2, so | ||
# stream planning must fail with the primary-key requirement error below. | ||
- name: Temporal join join keys requirement test | ||
sql: | | ||
create table stream(id1 int, a1 int, b1 int) APPEND ONLY; | ||
create table version(id2 int, a2 int, b2 int, primary key (id2, a2)); | ||
select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 where a2 < 10; | ||
stream_error: |- | ||
Not supported: Temporal join requires the lookup table's primary key contained exactly in the equivalence condition | ||
HINT: Please add the primary key of the lookup table to the join condition and remove any other conditions | ||
# Negative case: `stream` is created without APPEND ONLY, so stream planning must fail with the | ||
# append-only requirement error below. | ||
- name: Temporal join append only test | ||
sql: | | ||
create table stream(id1 int, a1 int, b1 int); | ||
create table version(id2 int, a2 int, b2 int, primary key (id2)); | ||
select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 where a2 < 10; | ||
stream_error: |- | ||
Not supported: Temporal join requires a append-only left input | ||
HINT: Please ensure your left input is append-only | ||
# Negative case: RIGHT JOIN against a temporal table must fail with the dangling-temporal-scan error below. | ||
# Note that `stream` is also not declared APPEND ONLY here; the expected error is the dangling-scan one. | ||
- name: Temporal join type test | ||
sql: | | ||
create table stream(id1 int, a1 int, b1 int); | ||
create table version(id2 int, a2 int, b2 int, primary key (id2)); | ||
select id1, a1, id2, a2 from stream right join version FOR SYSTEM_TIME AS OF NOW() on id1 = id2 where a2 < 10; | ||
stream_error: |- | ||
Not supported: exist dangling temporal scan | ||
HINT: please check your temporal join syntax e.g. consider removing the right outer join if it is being used. | ||
# Two temporal joins chained on the same stream column `k`: the expected plan has a single StreamExchange on | ||
# stream.k below the first join and none between the two joins; the `a1 < 10` filter sits directly above the | ||
# stream table scan. | ||
- name: multi-way temporal join with the same key | ||
sql: | | ||
create table stream(k int, a1 int, b1 int) APPEND ONLY; | ||
create table version1(k int, x1 int, y2 int, primary key (k)); | ||
create table version2(k int, x2 int, y2 int, primary key (k)); | ||
select stream.k, x1, x2, a1, b1 | ||
from stream | ||
join version1 FOR SYSTEM_TIME AS OF NOW() on stream.k = version1.k | ||
join version2 FOR SYSTEM_TIME AS OF NOW() on stream.k = version2.k where a1 < 10; | ||
stream_plan: | | ||
StreamMaterialize { columns: [k, x1, x2, a1, b1, stream._row_id(hidden), version1.k(hidden), version2.k(hidden)], pk_columns: [stream._row_id, version1.k, k, version2.k], pk_conflict: "no check" } | ||
└─StreamTemporalJoin { type: Inner, predicate: stream.k = version2.k, output: [stream.k, version1.x1, version2.x2, stream.a1, stream.b1, stream._row_id, version1.k, version2.k] } | ||
├─StreamTemporalJoin { type: Inner, predicate: stream.k = version1.k, output: [stream.k, stream.a1, stream.b1, version1.x1, stream._row_id, version1.k] } | ||
| ├─StreamExchange { dist: HashShard(stream.k) } | ||
| | └─StreamFilter { predicate: (stream.a1 < 10:Int32) } | ||
| | └─StreamTableScan { table: stream, columns: [stream.k, stream.a1, stream.b1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) } | ||
| └─StreamNoShuffleExchange { dist: UpstreamHashShard(version1.k) } | ||
| └─StreamTableScan { table: version1, columns: [version1.k, version1.x1], pk: [version1.k], dist: UpstreamHashShard(version1.k) } | ||
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version2.k) } | ||
└─StreamTableScan { table: version2, columns: [version2.k, version2.x2], pk: [version2.k], dist: UpstreamHashShard(version2.k) } | ||
# Two temporal joins chained on different stream columns (id1, then id2): the expected plan re-shards the | ||
# output of the first join with StreamExchange { HashShard(stream.id2) } before the second join. | ||
# This case appeared twice with identical name, SQL and expected plan; only one copy is kept. | ||
- name: multi-way temporal join with different keys | ||
sql: | | ||
create table stream(id1 int, id2 int, a1 int, b1 int) APPEND ONLY; | ||
create table version1(id1 int, x1 int, y2 int, primary key (id1)); | ||
create table version2(id2 int, x2 int, y2 int, primary key (id2)); | ||
select stream.id1, x1, stream.id2, x2, a1, b1 | ||
from stream | ||
join version1 FOR SYSTEM_TIME AS OF NOW() on stream.id1 = version1.id1 | ||
join version2 FOR SYSTEM_TIME AS OF NOW() on stream.id2 = version2.id2 where a1 < 10; | ||
stream_plan: | | ||
StreamMaterialize { columns: [id1, x1, id2, x2, a1, b1, stream._row_id(hidden), version1.id1(hidden), version2.id2(hidden)], pk_columns: [stream._row_id, version1.id1, id1, version2.id2, id2], pk_conflict: "no check" } | ||
└─StreamTemporalJoin { type: Inner, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version1.id1, version2.id2] } | ||
├─StreamExchange { dist: HashShard(stream.id2) } | ||
| └─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] } | ||
| ├─StreamExchange { dist: HashShard(stream.id1) } | ||
| | └─StreamFilter { predicate: (stream.a1 < 10:Int32) } | ||
| | └─StreamTableScan { table: stream, columns: [stream.id1, stream.id2, stream.a1, stream.b1, stream._row_id], pk: [stream._row_id], dist: UpstreamHashShard(stream._row_id) } | ||
| └─StreamNoShuffleExchange { dist: UpstreamHashShard(version1.id1) } | ||
| └─StreamTableScan { table: version1, columns: [version1.id1, version1.x1], pk: [version1.id1], dist: UpstreamHashShard(version1.id1) } | ||
└─StreamNoShuffleExchange { dist: UpstreamHashShard(version2.id2) } | ||
└─StreamTableScan { table: version2, columns: [version2.id2, version2.x2], pk: [version2.id2], dist: UpstreamHashShard(version2.id2) } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.