feat(optimizer): use group key as stream key for max-one-row GroupTopN
#9082
/// Infers the state table catalog for [`StreamTopN`] and [`StreamGroupTopN`].
pub fn infer_internal_table_catalog(&self, vnode_col_idx: Option<usize>) -> TableCatalog {
    self.core
        .infer_internal_table_catalog(&self.base, vnode_col_idx)
}
Removed because LogicalTopN shouldn't infer the table catalog for stream, and this method is not used anywhere.
- StreamMaterialize { columns: [auction, bidder, price, channel, url, date_time, extra, bid._row_id(hidden)], stream_key: [bid._row_id], pk_columns: [bid._row_id], pk_conflict: "NoCheck" }
-   └─StreamExchange { dist: HashShard(bid._row_id) }
-     └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id] }
-       └─StreamAppendOnlyGroupTopN { order: "[bid.date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] }
-         └─StreamExchange { dist: HashShard(bid.bidder, bid.auction) }
-           └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
+ StreamMaterialize { columns: [auction, bidder, price, channel, url, date_time, extra, bid._row_id(hidden)], stream_key: [bidder, auction], pk_columns: [bidder, auction], pk_conflict: "NoCheck" }
+   └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id] }
+     └─StreamAppendOnlyGroupTopN { order: "[bid.date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] }
+       └─StreamExchange { dist: HashShard(bid.bidder, bid.auction) }
+         └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
This plan becomes worse... Maybe we need to return multiple unique keys in the planRef so that the parent PlanNode can choose a small one as the state table's pk.
Oh, in fact, it removes one unnecessary exchange.
Codecov Report
@@ Coverage Diff @@
## main #9082 +/- ##
==========================================
- Coverage 70.88% 70.87% -0.02%
==========================================
Files 1197 1197
Lines 199054 199089 +35
==========================================
+ Hits 141109 141113 +4
- Misses 57945 57976 +31
... and 5 files with indirect coverage changes
LGTM. This PR causes a small regression in some plans, but in most cases it is an optimization.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
We can use the group key as the stream key for GroupTopN when LIMIT is 1 without WITH TIES, because there will be at most one record for each value of the group key.

This is an optimization for the optimizer. At the same time, #9016 depends on this PR for a correct plan.
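
The rule above can be sketched roughly as follows. This is an illustration only, not the code in this PR: the function name `group_top_n_stream_key` and its parameters are hypothetical, and the fallback to the input's stream key is an assumption inferred from the plan diff (where the old plan used `bid._row_id`).

```rust
/// Hypothetical helper (not RisingWave's actual API): chooses the stream key
/// of a GroupTopN-like operator, given column indices of the candidate keys.
pub fn group_top_n_stream_key(
    limit: u64,
    with_ties: bool,
    group_key: &[usize],
    input_stream_key: &[usize],
) -> Vec<usize> {
    if limit == 1 && !with_ties {
        // At most one row survives per group, so the group key alone
        // uniquely identifies every output row.
        group_key.to_vec()
    } else {
        // Assumption: otherwise several rows may share a group key
        // (limit > 1, or ties on the order column), so fall back to the
        // stream key inherited from the input.
        input_stream_key.to_vec()
    }
}
```

With `group_key = [1, 0]` (bidder, auction) and a single-column input stream key, a call with `limit = 1` and `with_ties = false` returns `[1, 0]`, which corresponds to the `stream_key: [bidder, auction]` seen in the new plan. Any other combination keeps the input key in this sketch.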
Checklist For Contributors
./risedev check (or alias, ./risedev c)
Checklist For Reviewers