Skip to content

Commit

Permalink
feat(optimizer): use group key as stream key for max-one-row `GroupTopN` (#9082)
Browse files Browse the repository at this point in the history
  • Loading branch information
xx01cyx authored Apr 12, 2023
1 parent 8ab2107 commit 83a66ac
Show file tree
Hide file tree
Showing 12 changed files with 114 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@
create table t1 (a int primary key);
select * from t1 order by a limit 1;
stream_plan: |
StreamMaterialize { columns: [a], stream_key: [a], pk_columns: [a], pk_conflict: "NoCheck" }
StreamMaterialize { columns: [a], stream_key: [], pk_columns: [a], pk_conflict: "NoCheck" }
└─StreamProject { exprs: [t1.a] }
└─StreamTopN { order: "[t1.a ASC]", limit: 1, offset: 0 }
└─StreamExchange { dist: Single }
Expand Down
36 changes: 16 additions & 20 deletions src/frontend/planner_test/tests/testdata/nexmark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@
└─BatchExchange { order: [], dist: HashShard(bid.auction) }
└─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id], distribution: UpstreamHashShard(bid._row_id) }
stream_plan: |
StreamMaterialize { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, bid._row_id(hidden)], stream_key: [id, bid._row_id], pk_columns: [id, bid._row_id], pk_conflict: "NoCheck" }
StreamMaterialize { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, bid._row_id(hidden)], stream_key: [id], pk_columns: [id], pk_conflict: "NoCheck" }
└─StreamProject { exprs: [auction.id, auction.item_name, auction.description, auction.initial_bid, auction.reserve, auction.date_time, auction.expires, auction.seller, auction.category, bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id] }
└─StreamGroupTopN { order: "[bid.price DESC, bid.date_time ASC]", limit: 1, offset: 0, group_key: [0] }
└─StreamFilter { predicate: (bid.date_time >= auction.date_time) AND (bid.date_time <= auction.expires) }
Expand All @@ -714,7 +714,7 @@
└─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, bid._row_id(hidden)], stream_key: [id, bid._row_id], pk_columns: [id, bid._row_id], pk_conflict: "NoCheck" }
StreamMaterialize { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, bid._row_id(hidden)], stream_key: [id], pk_columns: [id], pk_conflict: "NoCheck" }
├── materialized table: 4294967294
└── StreamProject { exprs: [auction.id, auction.item_name, auction.description, auction.initial_bid, auction.reserve, auction.date_time, auction.expires, auction.seller, auction.category, bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id] }
└── StreamGroupTopN { order: "[bid.price DESC, bid.date_time ASC]", limit: 1, offset: 0, group_key: [0] } { state table: 0 }
Expand Down Expand Up @@ -772,10 +772,10 @@
Table 4294967294
├── columns: [ id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, bid._row_id ]
├── primary key: [ $0 ASC, $13 ASC ]
├── primary key: [ $0 ASC ]
├── value indices: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 2
└── read pk prefix len hint: 1
- id: nexmark_q10
before:
Expand Down Expand Up @@ -1116,24 +1116,20 @@
└─BatchExchange { order: [], dist: HashShard(bid.bidder, bid.auction) }
└─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id], distribution: UpstreamHashShard(bid._row_id) }
stream_plan: |
StreamMaterialize { columns: [auction, bidder, price, channel, url, date_time, extra, bid._row_id(hidden)], stream_key: [bid._row_id], pk_columns: [bid._row_id], pk_conflict: "NoCheck" }
└─StreamExchange { dist: HashShard(bid._row_id) }
└─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id] }
└─StreamAppendOnlyGroupTopN { order: "[bid.date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] }
└─StreamExchange { dist: HashShard(bid.bidder, bid.auction) }
└─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
StreamMaterialize { columns: [auction, bidder, price, channel, url, date_time, extra, bid._row_id(hidden)], stream_key: [bidder, auction], pk_columns: [bidder, auction], pk_conflict: "NoCheck" }
└─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id] }
└─StreamAppendOnlyGroupTopN { order: "[bid.date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] }
└─StreamExchange { dist: HashShard(bid.bidder, bid.auction) }
└─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [auction, bidder, price, channel, url, date_time, extra, bid._row_id(hidden)], stream_key: [bid._row_id], pk_columns: [bid._row_id], pk_conflict: "NoCheck" }
StreamMaterialize { columns: [auction, bidder, price, channel, url, date_time, extra, bid._row_id(hidden)], stream_key: [bidder, auction], pk_columns: [bidder, auction], pk_conflict: "NoCheck" }
├── materialized table: 4294967294
└── StreamExchange Hash([7]) from 1
└── StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id] }
└── StreamAppendOnlyGroupTopN { order: "[bid.date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] } { state table: 0 }
└── StreamExchange Hash([1, 0]) from 1
Fragment 1
StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id] }
└── StreamAppendOnlyGroupTopN { order: "[bid.date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] } { state table: 0 }
└── StreamExchange Hash([1, 0]) from 2
Fragment 2
Chain { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
├── Upstream
└── BatchPlanNode
Expand All @@ -1147,10 +1143,10 @@
Table 4294967294
├── columns: [ auction, bidder, price, channel, url, date_time, extra, bid._row_id ]
├── primary key: [ $7 ASC ]
├── primary key: [ $1 ASC, $0 ASC ]
├── value indices: [ 0, 1, 2, 3, 4, 5, 6, 7 ]
├── distribution key: [ 7 ]
└── read pk prefix len hint: 1
├── distribution key: [ 1, 0 ]
└── read pk prefix len hint: 2
- id: nexmark_q18_rank
before:
Expand Down
45 changes: 23 additions & 22 deletions src/frontend/planner_test/tests/testdata/nexmark_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@
└─BatchExchange { order: [], dist: HashShard(auction) }
└─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"], filter: (None, None) }
stream_plan: |
StreamMaterialize { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, _row_id(hidden), _row_id#1(hidden)], stream_key: [_row_id, _row_id#1, id], pk_columns: [_row_id, _row_id#1, id], pk_conflict: "NoCheck" }
StreamMaterialize { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, _row_id(hidden), _row_id#1(hidden)], stream_key: [id], pk_columns: [id], pk_conflict: "NoCheck" }
└─StreamProject { exprs: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, date_time, _row_id, _row_id] }
└─StreamAppendOnlyGroupTopN { order: "[price DESC, date_time ASC]", limit: 1, offset: 0, group_key: [0] }
└─StreamFilter { predicate: (date_time >= date_time) AND (date_time <= expires) }
Expand All @@ -748,7 +748,7 @@
└─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, _row_id(hidden), _row_id#1(hidden)], stream_key: [_row_id, _row_id#1, id], pk_columns: [_row_id, _row_id#1, id], pk_conflict: "NoCheck" }
StreamMaterialize { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, _row_id(hidden), _row_id#1(hidden)], stream_key: [id], pk_columns: [id], pk_conflict: "NoCheck" }
├── materialized table: 4294967294
└── StreamProject { exprs: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, date_time, _row_id, _row_id] }
└── StreamAppendOnlyGroupTopN { order: "[price DESC, date_time ASC]", limit: 1, offset: 0, group_key: [0] } { state table: 0 }
Expand All @@ -772,7 +772,12 @@
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1
Table 1 { columns: [ id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id ], primary key: [ $0 ASC, $10 ASC ], value indices: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 1
├── columns: [ id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id ]
├── primary key: [ $0 ASC, $10 ASC ]
├── value indices: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1
Table 2 { columns: [ id, _row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Expand All @@ -786,10 +791,10 @@
Table 4294967294
├── columns: [ id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, _row_id, _row_id#1 ]
├── primary key: [ $13 ASC, $14 ASC, $0 ASC ]
├── primary key: [ $0 ASC ]
├── value indices: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 3
└── read pk prefix len hint: 1
- id: nexmark_q10
before:
Expand Down Expand Up @@ -1132,25 +1137,21 @@
└─BatchExchange { order: [], dist: HashShard(bidder, auction) }
└─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"], filter: (None, None) }
stream_plan: |
StreamMaterialize { columns: [auction, bidder, price, channel, url, date_time, extra, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: "NoCheck" }
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamProject { exprs: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
└─StreamAppendOnlyGroupTopN { order: "[date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] }
└─StreamExchange { dist: HashShard(bidder, auction) }
└─StreamRowIdGen { row_id_index: 7 }
└─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
StreamMaterialize { columns: [auction, bidder, price, channel, url, date_time, extra, _row_id(hidden)], stream_key: [bidder, auction], pk_columns: [bidder, auction], pk_conflict: "NoCheck" }
└─StreamProject { exprs: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
└─StreamAppendOnlyGroupTopN { order: "[date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] }
└─StreamExchange { dist: HashShard(bidder, auction) }
└─StreamRowIdGen { row_id_index: 7 }
└─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [auction, bidder, price, channel, url, date_time, extra, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: "NoCheck" }
StreamMaterialize { columns: [auction, bidder, price, channel, url, date_time, extra, _row_id(hidden)], stream_key: [bidder, auction], pk_columns: [bidder, auction], pk_conflict: "NoCheck" }
├── materialized table: 4294967294
└── StreamExchange Hash([7]) from 1
└── StreamProject { exprs: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
└── StreamAppendOnlyGroupTopN { order: "[date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] } { state table: 0 }
└── StreamExchange Hash([1, 0]) from 1
Fragment 1
StreamProject { exprs: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
└── StreamAppendOnlyGroupTopN { order: "[date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] } { state table: 0 }
└── StreamExchange Hash([1, 0]) from 2
Fragment 2
StreamRowIdGen { row_id_index: 7 }
└── StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } { source state table: 1 }
Expand All @@ -1165,10 +1166,10 @@
Table 4294967294
├── columns: [ auction, bidder, price, channel, url, date_time, extra, _row_id ]
├── primary key: [ $7 ASC ]
├── primary key: [ $1 ASC, $0 ASC ]
├── value indices: [ 0, 1, 2, 3, 4, 5, 6, 7 ]
├── distribution key: [ 7 ]
└── read pk prefix len hint: 1
├── distribution key: [ 1, 0 ]
└── read pk prefix len hint: 2
- id: nexmark_q18_rank
before:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,9 +331,8 @@
└─LogicalTopN { order: "[t.y ASC]", limit: 1, offset: 0, group_key: [0] }
└─LogicalScan { table: t, columns: [t.x, t.y, t._row_id] }
stream_plan: |
StreamMaterialize { columns: [x, y, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: "NoCheck" }
└─StreamExchange { dist: HashShard(t._row_id) }
└─StreamProject { exprs: [t.x, t.y, t._row_id] }
└─StreamGroupTopN { order: "[t.y ASC]", limit: 1, offset: 0, group_key: [0] }
└─StreamExchange { dist: HashShard(t.x) }
└─StreamTableScan { table: t, columns: [t.x, t.y, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
StreamMaterialize { columns: [x, y, t._row_id(hidden)], stream_key: [x], pk_columns: [x], pk_conflict: "NoCheck" }
└─StreamProject { exprs: [t.x, t.y, t._row_id] }
└─StreamGroupTopN { order: "[t.y ASC]", limit: 1, offset: 0, group_key: [0] }
└─StreamExchange { dist: HashShard(t.x) }
└─StreamTableScan { table: t, columns: [t.x, t.y, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
2 changes: 1 addition & 1 deletion src/frontend/planner_test/tests/testdata/project_set.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
└─BatchProjectSet { select_list: [Unnest($0)] }
└─BatchScan { table: t, columns: [t.x, t._row_id], distribution: UpstreamHashShard(t._row_id) }
stream_plan: |
StreamMaterialize { columns: [projected_row_id(hidden), unnest, t._row_id(hidden)], stream_key: [t._row_id, projected_row_id], pk_columns: [projected_row_id, t._row_id], pk_conflict: "NoCheck" }
StreamMaterialize { columns: [projected_row_id(hidden), unnest, t._row_id(hidden)], stream_key: [], pk_columns: [projected_row_id], pk_conflict: "NoCheck" }
└─StreamProject { exprs: [projected_row_id, Unnest($0), t._row_id] }
└─StreamTopN { order: "[projected_row_id ASC]", limit: 1, offset: 0 }
└─StreamExchange { dist: Single }
Expand Down
10 changes: 5 additions & 5 deletions src/frontend/src/optimizer/plan_node/generic/top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ impl<PlanRef: stream::StreamPlanRef> TopN<PlanRef> {
/// Infers the state table catalog for [`StreamTopN`] and [`StreamGroupTopN`].
pub fn infer_internal_table_catalog(
&self,
me: &impl stream::StreamPlanRef,
schema: &Schema,
ctx: OptimizerContextRef,
stream_key: &[usize],
vnode_col_idx: Option<usize>,
) -> TableCatalog {
let schema = me.schema();
let pk_indices = me.logical_pk();
let columns_fields = schema.fields().to_vec();
let column_orders = &self.order.column_orders;
let mut internal_table_catalog_builder =
TableCatalogBuilder::new(me.ctx().with_options().internal_table_subset());
TableCatalogBuilder::new(ctx.with_options().internal_table_subset());

columns_fields.iter().for_each(|field| {
internal_table_catalog_builder.add_column(field);
Expand All @@ -71,7 +71,7 @@ impl<PlanRef: stream::StreamPlanRef> TopN<PlanRef> {
}
});

pk_indices.iter().for_each(|idx| {
stream_key.iter().for_each(|idx| {
if !order_cols.contains(idx) {
internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending());
order_cols.insert(*idx);
Expand Down
7 changes: 0 additions & 7 deletions src/frontend/src/optimizer/plan_node/logical_topn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use crate::optimizer::plan_node::{
use crate::optimizer::property::{Distribution, Order, RequiredDist};
use crate::planner::LIMIT_ALL_COUNT;
use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, Condition};
use crate::TableCatalog;

/// `LogicalTopN` sorts the input data and fetches up to `limit` rows from `offset`
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -113,12 +112,6 @@ impl LogicalTopN {
&self.core.group_key
}

/// Infers the state table catalog for [`StreamTopN`] and [`StreamGroupTopN`].
///
/// This is a thin forwarding wrapper: it calls `infer_internal_table_catalog` on
/// `self.core`, handing it a reference to `self.base` together with the
/// caller-supplied `vnode_col_idx` (passed through untouched), and returns the
/// resulting [`TableCatalog`] as-is.
pub fn infer_internal_table_catalog(&self, vnode_col_idx: Option<usize>) -> TableCatalog {
self.core
.infer_internal_table_catalog(&self.base, vnode_col_idx)
}

fn gen_dist_stream_top_n_plan(&self, stream_input: PlanRef) -> Result<PlanRef> {
let input_dist = stream_input.distribution().clone();

Expand Down
Loading

0 comments on commit 83a66ac

Please sign in to comment.