Skip to content

Commit

Permalink
refractor(optimizer): replace StreamIndexScan by StreamTableScan
Browse files Browse the repository at this point in the history
…on logical index scan (risingwavelabs#8567)

Signed-off-by: Clearlove <yifei.c.wei@gmail.com>
  • Loading branch information
y-wei authored Mar 16, 2023
1 parent 5efe089 commit ad7e21b
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 265 deletions.
6 changes: 3 additions & 3 deletions src/frontend/planner_test/tests/testdata/delta_join.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
StreamMaterialize { columns: [a1, a2, b1, b2, i_a1.a._row_id(hidden), i_b1.b._row_id(hidden)], pk_columns: [i_a1.a._row_id, i_b1.b._row_id, a1], pk_conflict: "no check" }
└─StreamExchange { dist: HashShard(i_a1.a1, i_a1.a._row_id, i_b1.b._row_id) }
└─StreamDeltaJoin { type: Inner, predicate: i_a1.a1 = i_b1.b1, output: [i_a1.a1, i_a1.a2, i_b1.b1, i_b1.b2, i_a1.a._row_id, i_b1.b._row_id] }
├─StreamIndexScan { index: i_a1, columns: [i_a1.a1, i_a1.a2, i_a1.a._row_id], pk: [i_a1.a._row_id], dist: UpstreamHashShard(i_a1.a1) }
└─StreamIndexScan { index: i_b1, columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], pk: [i_b1.b._row_id], dist: UpstreamHashShard(i_b1.b1) }
├─StreamTableScan { table: i_a1, columns: [i_a1.a1, i_a1.a2, i_a1.a._row_id], pk: [i_a1.a._row_id], dist: UpstreamHashShard(i_a1.a1) }
└─StreamTableScan { table: i_b1, columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], pk: [i_b1.b._row_id], dist: UpstreamHashShard(i_b1.b1) }
- sql: |
set rw_streaming_enable_delta_join = true;
create table a (a1 int primary key, a2 int);
Expand All @@ -25,7 +25,7 @@
└─StreamExchange { dist: HashShard(a.a1, i_b1.b._row_id) }
└─StreamDeltaJoin { type: Inner, predicate: a.a1 = i_b1.b1, output: all }
├─StreamTableScan { table: a, columns: [a.a1, a.a2], pk: [a.a1], dist: UpstreamHashShard(a.a1) }
└─StreamIndexScan { index: i_b1, columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], pk: [i_b1.b._row_id], dist: UpstreamHashShard(i_b1.b1) }
└─StreamTableScan { table: i_b1, columns: [i_b1.b1, i_b1.b2, i_b1.b._row_id], pk: [i_b1.b._row_id], dist: UpstreamHashShard(i_b1.b1) }
- sql: |
set rw_streaming_enable_delta_join = true;
create table a (a1 int primary key, a2 int);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1], pk_conflict: "no check" }
└─StreamExchange { dist: HashShard(ak1.a._row_id, ak1.k1, bk1.b._row_id) }
└─StreamDeltaJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id] }
├─StreamIndexScan { index: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
└─StreamIndexScan { index: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
├─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
└─StreamTableScan { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
stream_dist_plan: |
Fragment 0
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1], pk_conflict: "no check" }
Expand All @@ -34,12 +34,12 @@
StreamExchange Hash([2, 4, 3]) from 5
Fragment 2
StreamIndexScan { index: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
Chain { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
Upstream
BatchPlanNode
Fragment 3
StreamIndexScan { index: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
Chain { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
Upstream
BatchPlanNode
Expand Down Expand Up @@ -68,7 +68,7 @@
└─StreamExchange { dist: HashShard(ak1.a._row_id, ak1.k1, bk1.b._row_id) }
└─StreamDeltaJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id] }
├─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
└─StreamIndexScan { index: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
└─StreamTableScan { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
stream_dist_plan: |
Fragment 0
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1], pk_conflict: "no check" }
Expand All @@ -86,7 +86,7 @@
BatchPlanNode
Fragment 3
StreamIndexScan { index: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
Chain { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
Upstream
BatchPlanNode
Expand Down Expand Up @@ -114,7 +114,7 @@
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), bk1.b._row_id(hidden)], pk_columns: [ak1.a._row_id, bk1.b._row_id, ak1.k1], pk_conflict: "no check" }
└─StreamExchange { dist: HashShard(ak1.a._row_id, ak1.k1, bk1.b._row_id) }
└─StreamDeltaJoin { type: Inner, predicate: ak1.k1 = bk1.k1, output: [ak1.v, bk1.v, ak1.a._row_id, ak1.k1, bk1.b._row_id] }
├─StreamIndexScan { index: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
├─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
└─StreamTableScan { table: bk1, columns: [bk1.k1, bk1.v, bk1.b._row_id], pk: [bk1.b._row_id], dist: UpstreamHashShard(bk1.k1) }
stream_dist_plan: |
Fragment 0
Expand All @@ -128,7 +128,7 @@
StreamExchange Hash([2, 4, 3]) from 5
Fragment 2
StreamIndexScan { index: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
Chain { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
Upstream
BatchPlanNode
Expand Down
7 changes: 0 additions & 7 deletions src/frontend/src/optimizer/plan_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,9 +524,6 @@ impl dyn PlanNode {
if let Some(stream_table_scan) = self.as_stream_table_scan() {
return stream_table_scan.adhoc_to_stream_prost();
}
if let Some(stream_index_scan) = self.as_stream_index_scan() {
return stream_index_scan.adhoc_to_stream_prost();
}
if let Some(stream_share) = self.as_stream_share() {
return stream_share.adhoc_to_stream_prost(state);
}
Expand Down Expand Up @@ -660,7 +657,6 @@ mod stream_group_topn;
mod stream_hash_agg;
mod stream_hash_join;
mod stream_hop_window;
mod stream_index_scan;
mod stream_local_simple_agg;
mod stream_materialize;
mod stream_now;
Expand Down Expand Up @@ -736,7 +732,6 @@ pub use stream_group_topn::StreamGroupTopN;
pub use stream_hash_agg::StreamHashAgg;
pub use stream_hash_join::StreamHashJoin;
pub use stream_hop_window::StreamHopWindow;
pub use stream_index_scan::StreamIndexScan;
pub use stream_local_simple_agg::StreamLocalSimpleAgg;
pub use stream_materialize::StreamMaterialize;
pub use stream_now::StreamNow;
Expand Down Expand Up @@ -835,7 +830,6 @@ macro_rules! for_all_plan_nodes {
, { Stream, TopN }
, { Stream, HopWindow }
, { Stream, DeltaJoin }
, { Stream, IndexScan }
, { Stream, Expand }
, { Stream, DynamicFilter }
, { Stream, ProjectSet }
Expand Down Expand Up @@ -936,7 +930,6 @@ macro_rules! for_stream_plan_nodes {
, { Stream, TopN }
, { Stream, HopWindow }
, { Stream, DeltaJoin }
, { Stream, IndexScan }
, { Stream, Expand }
, { Stream, DynamicFilter }
, { Stream, ProjectSet }
Expand Down
8 changes: 2 additions & 6 deletions src/frontend/src/optimizer/plan_node/stream_delta_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,13 @@ impl StreamNode for StreamDeltaJoin {
let left = self.left();
let right = self.right();

let left_table = if let Some(stream_index_scan) = left.as_stream_index_scan() {
stream_index_scan.logical()
} else if let Some(stream_table_scan) = left.as_stream_table_scan() {
let left_table = if let Some(stream_table_scan) = left.as_stream_table_scan() {
stream_table_scan.logical()
} else {
unreachable!();
};
let left_table_desc = left_table.table_desc();
let right_table = if let Some(stream_index_scan) = right.as_stream_index_scan() {
stream_index_scan.logical()
} else if let Some(stream_table_scan) = right.as_stream_table_scan() {
let right_table = if let Some(stream_table_scan) = right.as_stream_table_scan() {
stream_table_scan.logical()
} else {
unreachable!();
Expand Down
233 changes: 0 additions & 233 deletions src/frontend/src/optimizer/plan_node/stream_index_scan.rs

This file was deleted.

Loading

0 comments on commit ad7e21b

Please sign in to comment.