Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(optimizer): use group key as stream key for max-one-row GroupTopN #9082

Merged
merged 6 commits into from
Apr 12, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@
create table t1 (a int primary key);
select * from t1 order by a limit 1;
stream_plan: |
StreamMaterialize { columns: [a], stream_key: [a], pk_columns: [a], pk_conflict: "NoCheck" }
StreamMaterialize { columns: [a], stream_key: [], pk_columns: [a], pk_conflict: "NoCheck" }
└─StreamProject { exprs: [t1.a] }
└─StreamTopN { order: "[t1.a ASC]", limit: 1, offset: 0 }
└─StreamExchange { dist: Single }
Expand Down
36 changes: 16 additions & 20 deletions src/frontend/planner_test/tests/testdata/nexmark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@
└─BatchExchange { order: [], dist: HashShard(bid.auction) }
└─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id], distribution: UpstreamHashShard(bid._row_id) }
stream_plan: |
StreamMaterialize { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, bid._row_id(hidden)], stream_key: [id, bid._row_id], pk_columns: [id, bid._row_id], pk_conflict: "NoCheck" }
StreamMaterialize { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, bid._row_id(hidden)], stream_key: [id], pk_columns: [id], pk_conflict: "NoCheck" }
└─StreamProject { exprs: [auction.id, auction.item_name, auction.description, auction.initial_bid, auction.reserve, auction.date_time, auction.expires, auction.seller, auction.category, bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id] }
└─StreamGroupTopN { order: "[bid.price DESC, bid.date_time ASC]", limit: 1, offset: 0, group_key: [0] }
└─StreamFilter { predicate: (bid.date_time >= auction.date_time) AND (bid.date_time <= auction.expires) }
Expand All @@ -714,7 +714,7 @@
└─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, bid._row_id(hidden)], stream_key: [id, bid._row_id], pk_columns: [id, bid._row_id], pk_conflict: "NoCheck" }
StreamMaterialize { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, bid._row_id(hidden)], stream_key: [id], pk_columns: [id], pk_conflict: "NoCheck" }
├── materialized table: 4294967294
└── StreamProject { exprs: [auction.id, auction.item_name, auction.description, auction.initial_bid, auction.reserve, auction.date_time, auction.expires, auction.seller, auction.category, bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id] }
└── StreamGroupTopN { order: "[bid.price DESC, bid.date_time ASC]", limit: 1, offset: 0, group_key: [0] } { state table: 0 }
Expand Down Expand Up @@ -772,10 +772,10 @@

Table 4294967294
├── columns: [ id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, bid._row_id ]
├── primary key: [ $0 ASC, $13 ASC ]
├── primary key: [ $0 ASC ]
├── value indices: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 2
└── read pk prefix len hint: 1

- id: nexmark_q10
before:
Expand Down Expand Up @@ -1116,24 +1116,20 @@
└─BatchExchange { order: [], dist: HashShard(bid.bidder, bid.auction) }
└─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id], distribution: UpstreamHashShard(bid._row_id) }
stream_plan: |
StreamMaterialize { columns: [auction, bidder, price, channel, url, date_time, extra, bid._row_id(hidden)], stream_key: [bid._row_id], pk_columns: [bid._row_id], pk_conflict: "NoCheck" }
└─StreamExchange { dist: HashShard(bid._row_id) }
└─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id] }
└─StreamAppendOnlyGroupTopN { order: "[bid.date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] }
└─StreamExchange { dist: HashShard(bid.bidder, bid.auction) }
└─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
StreamMaterialize { columns: [auction, bidder, price, channel, url, date_time, extra, bid._row_id(hidden)], stream_key: [bidder, auction], pk_columns: [bidder, auction], pk_conflict: "NoCheck" }
└─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id] }
└─StreamAppendOnlyGroupTopN { order: "[bid.date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] }
└─StreamExchange { dist: HashShard(bid.bidder, bid.auction) }
└─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
Comment on lines -1119 to +1123
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this plan becomes worse... maybe we need to return multiple unique keys in the PlanRef so the parent PlanNode can choose a smaller one as the state table's pk.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, in fact, it reduces one unnecessary exchange

stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [auction, bidder, price, channel, url, date_time, extra, bid._row_id(hidden)], stream_key: [bid._row_id], pk_columns: [bid._row_id], pk_conflict: "NoCheck" }
StreamMaterialize { columns: [auction, bidder, price, channel, url, date_time, extra, bid._row_id(hidden)], stream_key: [bidder, auction], pk_columns: [bidder, auction], pk_conflict: "NoCheck" }
├── materialized table: 4294967294
└── StreamExchange Hash([7]) from 1
└── StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id] }
└── StreamAppendOnlyGroupTopN { order: "[bid.date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] } { state table: 0 }
└── StreamExchange Hash([1, 0]) from 1

Fragment 1
StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id] }
└── StreamAppendOnlyGroupTopN { order: "[bid.date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] } { state table: 0 }
└── StreamExchange Hash([1, 0]) from 2

Fragment 2
Chain { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
├── Upstream
└── BatchPlanNode
Expand All @@ -1147,10 +1143,10 @@

Table 4294967294
├── columns: [ auction, bidder, price, channel, url, date_time, extra, bid._row_id ]
├── primary key: [ $7 ASC ]
├── primary key: [ $1 ASC, $0 ASC ]
├── value indices: [ 0, 1, 2, 3, 4, 5, 6, 7 ]
├── distribution key: [ 7 ]
└── read pk prefix len hint: 1
├── distribution key: [ 1, 0 ]
└── read pk prefix len hint: 2

- id: nexmark_q19
before:
Expand Down
45 changes: 23 additions & 22 deletions src/frontend/planner_test/tests/testdata/nexmark_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@
└─BatchExchange { order: [], dist: HashShard(auction) }
└─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"], filter: (None, None) }
stream_plan: |
StreamMaterialize { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, _row_id(hidden), _row_id#1(hidden)], stream_key: [_row_id, _row_id#1, id], pk_columns: [_row_id, _row_id#1, id], pk_conflict: "NoCheck" }
StreamMaterialize { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, _row_id(hidden), _row_id#1(hidden)], stream_key: [id], pk_columns: [id], pk_conflict: "NoCheck" }
└─StreamProject { exprs: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, date_time, _row_id, _row_id] }
└─StreamAppendOnlyGroupTopN { order: "[price DESC, date_time ASC]", limit: 1, offset: 0, group_key: [0] }
└─StreamFilter { predicate: (date_time >= date_time) AND (date_time <= expires) }
Expand All @@ -748,7 +748,7 @@
└─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, _row_id(hidden), _row_id#1(hidden)], stream_key: [_row_id, _row_id#1, id], pk_columns: [_row_id, _row_id#1, id], pk_conflict: "NoCheck" }
StreamMaterialize { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, _row_id(hidden), _row_id#1(hidden)], stream_key: [id], pk_columns: [id], pk_conflict: "NoCheck" }
├── materialized table: 4294967294
└── StreamProject { exprs: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, date_time, _row_id, _row_id] }
└── StreamAppendOnlyGroupTopN { order: "[price DESC, date_time ASC]", limit: 1, offset: 0, group_key: [0] } { state table: 0 }
Expand All @@ -772,7 +772,12 @@
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1

Table 1 { columns: [ id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id ], primary key: [ $0 ASC, $10 ASC ], value indices: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 1
├── columns: [ id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id ]
├── primary key: [ $0 ASC, $10 ASC ]
├── value indices: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1

Table 2 { columns: [ id, _row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }

Expand All @@ -786,10 +791,10 @@

Table 4294967294
├── columns: [ id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, _row_id, _row_id#1 ]
├── primary key: [ $13 ASC, $14 ASC, $0 ASC ]
├── primary key: [ $0 ASC ]
├── value indices: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 3
└── read pk prefix len hint: 1

- id: nexmark_q10
before:
Expand Down Expand Up @@ -1132,25 +1137,21 @@
└─BatchExchange { order: [], dist: HashShard(bidder, auction) }
└─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"], filter: (None, None) }
stream_plan: |
StreamMaterialize { columns: [auction, bidder, price, channel, url, date_time, extra, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: "NoCheck" }
└─StreamExchange { dist: HashShard(_row_id) }
└─StreamProject { exprs: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
└─StreamAppendOnlyGroupTopN { order: "[date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] }
└─StreamExchange { dist: HashShard(bidder, auction) }
└─StreamRowIdGen { row_id_index: 7 }
└─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
StreamMaterialize { columns: [auction, bidder, price, channel, url, date_time, extra, _row_id(hidden)], stream_key: [bidder, auction], pk_columns: [bidder, auction], pk_conflict: "NoCheck" }
└─StreamProject { exprs: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
└─StreamAppendOnlyGroupTopN { order: "[date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] }
└─StreamExchange { dist: HashShard(bidder, auction) }
└─StreamRowIdGen { row_id_index: 7 }
└─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [auction, bidder, price, channel, url, date_time, extra, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: "NoCheck" }
StreamMaterialize { columns: [auction, bidder, price, channel, url, date_time, extra, _row_id(hidden)], stream_key: [bidder, auction], pk_columns: [bidder, auction], pk_conflict: "NoCheck" }
├── materialized table: 4294967294
└── StreamExchange Hash([7]) from 1
└── StreamProject { exprs: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
└── StreamAppendOnlyGroupTopN { order: "[date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] } { state table: 0 }
└── StreamExchange Hash([1, 0]) from 1

Fragment 1
StreamProject { exprs: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
└── StreamAppendOnlyGroupTopN { order: "[date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] } { state table: 0 }
└── StreamExchange Hash([1, 0]) from 2

Fragment 2
StreamRowIdGen { row_id_index: 7 }
└── StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } { source state table: 1 }

Expand All @@ -1165,10 +1166,10 @@

Table 4294967294
├── columns: [ auction, bidder, price, channel, url, date_time, extra, _row_id ]
├── primary key: [ $7 ASC ]
├── primary key: [ $1 ASC, $0 ASC ]
├── value indices: [ 0, 1, 2, 3, 4, 5, 6, 7 ]
├── distribution key: [ 7 ]
└── read pk prefix len hint: 1
├── distribution key: [ 1, 0 ]
└── read pk prefix len hint: 2

- id: nexmark_q19
before:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,9 +331,8 @@
└─LogicalTopN { order: "[t.y ASC]", limit: 1, offset: 0, group_key: [0] }
└─LogicalScan { table: t, columns: [t.x, t.y, t._row_id] }
stream_plan: |
StreamMaterialize { columns: [x, y, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: "NoCheck" }
└─StreamExchange { dist: HashShard(t._row_id) }
└─StreamProject { exprs: [t.x, t.y, t._row_id] }
└─StreamGroupTopN { order: "[t.y ASC]", limit: 1, offset: 0, group_key: [0] }
└─StreamExchange { dist: HashShard(t.x) }
└─StreamTableScan { table: t, columns: [t.x, t.y, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
StreamMaterialize { columns: [x, y, t._row_id(hidden)], stream_key: [x], pk_columns: [x], pk_conflict: "NoCheck" }
└─StreamProject { exprs: [t.x, t.y, t._row_id] }
└─StreamGroupTopN { order: "[t.y ASC]", limit: 1, offset: 0, group_key: [0] }
└─StreamExchange { dist: HashShard(t.x) }
└─StreamTableScan { table: t, columns: [t.x, t.y, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
2 changes: 1 addition & 1 deletion src/frontend/planner_test/tests/testdata/project_set.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
└─BatchProjectSet { select_list: [Unnest($0)] }
└─BatchScan { table: t, columns: [t.x, t._row_id], distribution: UpstreamHashShard(t._row_id) }
stream_plan: |
StreamMaterialize { columns: [projected_row_id(hidden), unnest, t._row_id(hidden)], stream_key: [t._row_id, projected_row_id], pk_columns: [projected_row_id, t._row_id], pk_conflict: "NoCheck" }
StreamMaterialize { columns: [projected_row_id(hidden), unnest, t._row_id(hidden)], stream_key: [], pk_columns: [projected_row_id], pk_conflict: "NoCheck" }
└─StreamProject { exprs: [projected_row_id, Unnest($0), t._row_id] }
└─StreamTopN { order: "[projected_row_id ASC]", limit: 1, offset: 0 }
└─StreamExchange { dist: Single }
Expand Down
10 changes: 5 additions & 5 deletions src/frontend/src/optimizer/plan_node/generic/top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ impl<PlanRef: stream::StreamPlanRef> TopN<PlanRef> {
/// Infers the state table catalog for [`StreamTopN`] and [`StreamGroupTopN`].
pub fn infer_internal_table_catalog(
&self,
me: &impl stream::StreamPlanRef,
schema: &Schema,
ctx: OptimizerContextRef,
stream_key: &[usize],
vnode_col_idx: Option<usize>,
) -> TableCatalog {
let schema = me.schema();
let pk_indices = me.logical_pk();
let columns_fields = schema.fields().to_vec();
let column_orders = &self.order.column_orders;
let mut internal_table_catalog_builder =
TableCatalogBuilder::new(me.ctx().with_options().internal_table_subset());
TableCatalogBuilder::new(ctx.with_options().internal_table_subset());

columns_fields.iter().for_each(|field| {
internal_table_catalog_builder.add_column(field);
Expand All @@ -72,7 +72,7 @@ impl<PlanRef: stream::StreamPlanRef> TopN<PlanRef> {
}
});

pk_indices.iter().for_each(|idx| {
stream_key.iter().for_each(|idx| {
if !order_cols.contains(idx) {
internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending());
order_cols.insert(*idx);
Expand Down
7 changes: 0 additions & 7 deletions src/frontend/src/optimizer/plan_node/logical_topn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use crate::optimizer::plan_node::{
use crate::optimizer::property::{Distribution, Order, RequiredDist};
use crate::planner::LIMIT_ALL_COUNT;
use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, Condition};
use crate::TableCatalog;

/// `LogicalTopN` sorts the input data and fetches up to `limit` rows from `offset`
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -116,12 +115,6 @@ impl LogicalTopN {
&self.core.group_key
}

/// Infers the state table catalog for [`StreamTopN`] and [`StreamGroupTopN`].
pub fn infer_internal_table_catalog(&self, vnode_col_idx: Option<usize>) -> TableCatalog {
self.core
.infer_internal_table_catalog(&self.base, vnode_col_idx)
}

Comment on lines -116 to -121
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed because LogicalTopN shouldn't infer table catalog for stream, and this method is not used anywhere.

fn gen_dist_stream_top_n_plan(&self, stream_input: PlanRef) -> Result<PlanRef> {
let input_dist = stream_input.distribution().clone();

Expand Down
20 changes: 16 additions & 4 deletions src/frontend/src/optimizer/plan_node/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,9 +583,15 @@ pub fn to_stream_prost_body(
})
}
Node::GroupTopN(me) => {
let input = &me.core.input.0;
let table = me
.core
.infer_internal_table_catalog(base, me.vnode_col_idx)
.infer_internal_table_catalog(
input.schema(),
input.ctx(),
input.logical_pk(),
me.vnode_col_idx,
)
.with_id(state.gen_table_id_wrapped());
let group_topn_node = GroupTopNNode {
limit: me.core.limit,
Expand Down Expand Up @@ -723,15 +729,21 @@ pub fn to_stream_prost_body(
PbNodeBody::Source(SourceNode { source_inner })
}
Node::TopN(me) => {
let input = &me.core.input.0;
let me = &me.core;
let topn_node = TopNNode {
limit: me.limit,
offset: me.offset,
with_ties: me.with_ties,
table: Some(
me.infer_internal_table_catalog(base, None)
.with_id(state.gen_table_id_wrapped())
.to_internal_table_prost(),
me.infer_internal_table_catalog(
input.schema(),
input.ctx(),
input.logical_pk(),
None,
)
.with_id(state.gen_table_id_wrapped())
.to_internal_table_prost(),
),
order_by: me.order.to_protobuf(),
};
Expand Down
Loading