diff --git a/src/frontend/planner_test/tests/testdata/index_selection.yaml b/src/frontend/planner_test/tests/testdata/index_selection.yaml
index f79e229c88796..14884c04b1cf2 100644
--- a/src/frontend/planner_test/tests/testdata/index_selection.yaml
+++ b/src/frontend/planner_test/tests/testdata/index_selection.yaml
@@ -613,7 +613,7 @@
     create table t1 (a int primary key);
     select * from t1 order by a limit 1;
   stream_plan: |
-    StreamMaterialize { columns: [a], stream_key: [a], pk_columns: [a], pk_conflict: "NoCheck" }
+    StreamMaterialize { columns: [a], stream_key: [], pk_columns: [a], pk_conflict: "NoCheck" }
     └─StreamProject { exprs: [t1.a] }
       └─StreamTopN { order: "[t1.a ASC]", limit: 1, offset: 0 }
         └─StreamExchange { dist: Single }
diff --git a/src/frontend/planner_test/tests/testdata/nexmark.yaml b/src/frontend/planner_test/tests/testdata/nexmark.yaml
index 607fab104325b..936649f7c92c2 100644
--- a/src/frontend/planner_test/tests/testdata/nexmark.yaml
+++ b/src/frontend/planner_test/tests/testdata/nexmark.yaml
@@ -703,7 +703,7 @@
       └─BatchExchange { order: [], dist: HashShard(bid.auction) }
         └─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id], distribution: UpstreamHashShard(bid._row_id) }
   stream_plan: |
-    StreamMaterialize { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, bid._row_id(hidden)], stream_key: [id, bid._row_id], pk_columns: [id, bid._row_id], pk_conflict: "NoCheck" }
+    StreamMaterialize { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, bid._row_id(hidden)], stream_key: [id], pk_columns: [id], pk_conflict: "NoCheck" }
     └─StreamProject { exprs: [auction.id, auction.item_name, auction.description, auction.initial_bid, auction.reserve, auction.date_time, auction.expires, auction.seller, auction.category, bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id] }
       └─StreamGroupTopN { order: "[bid.price DESC, bid.date_time ASC]", limit: 1, offset: 0, group_key: [0] }
         └─StreamFilter { predicate: (bid.date_time >= auction.date_time) AND (bid.date_time <= auction.expires) }
@@ -714,7 +714,7 @@
           └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
   stream_dist_plan: |+
     Fragment 0
-    StreamMaterialize { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, bid._row_id(hidden)], stream_key: [id, bid._row_id], pk_columns: [id, bid._row_id], pk_conflict: "NoCheck" }
+    StreamMaterialize { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, bid._row_id(hidden)], stream_key: [id], pk_columns: [id], pk_conflict: "NoCheck" }
     ├── materialized table: 4294967294
     └── StreamProject { exprs: [auction.id, auction.item_name, auction.description, auction.initial_bid, auction.reserve, auction.date_time, auction.expires, auction.seller, auction.category, bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id] }
         └── StreamGroupTopN { order: "[bid.price DESC, bid.date_time ASC]", limit: 1, offset: 0, group_key: [0] } { state table: 0 }
@@ -772,10 +772,10 @@

     Table 4294967294
     ├── columns: [ id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, bid._row_id ]
-    ├── primary key: [ $0 ASC, $13 ASC ]
+    ├── primary key: [ $0 ASC ]
     ├── value indices: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 ]
     ├── distribution key: [ 0 ]
-    └── read pk prefix len hint: 2
+    └── read pk prefix len hint: 1

 - id: nexmark_q10
   before:
@@ -1116,24 +1116,20 @@
       └─BatchExchange { order: [], dist: HashShard(bid.bidder, bid.auction) }
         └─BatchScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id], distribution: UpstreamHashShard(bid._row_id) }
   stream_plan: |
-    StreamMaterialize { columns: [auction, bidder, price, channel, url, date_time, extra, bid._row_id(hidden)], stream_key: [bid._row_id], pk_columns: [bid._row_id], pk_conflict: "NoCheck" }
-    └─StreamExchange { dist: HashShard(bid._row_id) }
-      └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id] }
-        └─StreamAppendOnlyGroupTopN { order: "[bid.date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] }
-          └─StreamExchange { dist: HashShard(bid.bidder, bid.auction) }
-            └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
+    StreamMaterialize { columns: [auction, bidder, price, channel, url, date_time, extra, bid._row_id(hidden)], stream_key: [bidder, auction], pk_columns: [bidder, auction], pk_conflict: "NoCheck" }
+    └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id] }
+      └─StreamAppendOnlyGroupTopN { order: "[bid.date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] }
+        └─StreamExchange { dist: HashShard(bid.bidder, bid.auction) }
+          └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
   stream_dist_plan: |+
     Fragment 0
-    StreamMaterialize { columns: [auction, bidder, price, channel, url, date_time, extra, bid._row_id(hidden)], stream_key: [bid._row_id], pk_columns: [bid._row_id], pk_conflict: "NoCheck" }
+    StreamMaterialize { columns: [auction, bidder, price, channel, url, date_time, extra, bid._row_id(hidden)], stream_key: [bidder, auction], pk_columns: [bidder, auction], pk_conflict: "NoCheck" }
     ├── materialized table: 4294967294
-    └── StreamExchange Hash([7]) from 1
+    └── StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id] }
+        └── StreamAppendOnlyGroupTopN { order: "[bid.date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] } { state table: 0 }
+            └── StreamExchange Hash([1, 0]) from 1

     Fragment 1
-    StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id] }
-    └── StreamAppendOnlyGroupTopN { order: "[bid.date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] } { state table: 0 }
-        └── StreamExchange Hash([1, 0]) from 2
-
-    Fragment 2
     Chain { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.channel, bid.url, bid.date_time, bid.extra, bid._row_id], pk: [bid._row_id], dist: UpstreamHashShard(bid._row_id) }
     ├── Upstream
     └── BatchPlanNode
@@ -1147,10 +1143,10 @@

     Table 4294967294
     ├── columns: [ auction, bidder, price, channel, url, date_time, extra, bid._row_id ]
-    ├── primary key: [ $7 ASC ]
+    ├── primary key: [ $1 ASC, $0 ASC ]
     ├── value indices: [ 0, 1, 2, 3, 4, 5, 6, 7 ]
-    ├── distribution key: [ 7 ]
-    └── read pk prefix len hint: 1
+    ├── distribution key: [ 1, 0 ]
+    └── read pk prefix len hint: 2

 - id: nexmark_q18_rank
   before:
diff --git a/src/frontend/planner_test/tests/testdata/nexmark_source.yaml b/src/frontend/planner_test/tests/testdata/nexmark_source.yaml
index 58f076a3e6d82..121ace0eaaf2f 100644
--- a/src/frontend/planner_test/tests/testdata/nexmark_source.yaml
+++ b/src/frontend/planner_test/tests/testdata/nexmark_source.yaml
@@ -735,7 +735,7 @@
       └─BatchExchange { order: [], dist: HashShard(auction) }
         └─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"], filter: (None, None) }
   stream_plan: |
-    StreamMaterialize { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, _row_id(hidden), _row_id#1(hidden)], stream_key: [_row_id, _row_id#1, id], pk_columns: [_row_id, _row_id#1, id], pk_conflict: "NoCheck" }
+    StreamMaterialize { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, _row_id(hidden), _row_id#1(hidden)], stream_key: [id], pk_columns: [id], pk_conflict: "NoCheck" }
     └─StreamProject { exprs: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, date_time, _row_id, _row_id] }
       └─StreamAppendOnlyGroupTopN { order: "[price DESC, date_time ASC]", limit: 1, offset: 0, group_key: [0] }
         └─StreamFilter { predicate: (date_time >= date_time) AND (date_time <= expires) }
@@ -748,7 +748,7 @@
               └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
   stream_dist_plan: |+
     Fragment 0
-    StreamMaterialize { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, _row_id(hidden), _row_id#1(hidden)], stream_key: [_row_id, _row_id#1, id], pk_columns: [_row_id, _row_id#1, id], pk_conflict: "NoCheck" }
+    StreamMaterialize { columns: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, _row_id(hidden), _row_id#1(hidden)], stream_key: [id], pk_columns: [id], pk_conflict: "NoCheck" }
     ├── materialized table: 4294967294
     └── StreamProject { exprs: [id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, date_time, _row_id, _row_id] }
         └── StreamAppendOnlyGroupTopN { order: "[price DESC, date_time ASC]", limit: 1, offset: 0, group_key: [0] } { state table: 0 }
@@ -772,7 +772,12 @@
     ├── distribution key: [ 0 ]
     └── read pk prefix len hint: 1

-    Table 1 { columns: [ id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id ], primary key: [ $0 ASC, $10 ASC ], value indices: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
+    Table 1
+    ├── columns: [ id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, extra, _row_id ]
+    ├── primary key: [ $0 ASC, $10 ASC ]
+    ├── value indices: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ]
+    ├── distribution key: [ 0 ]
+    └── read pk prefix len hint: 1

     Table 2 { columns: [ id, _row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }

@@ -786,10 +791,10 @@

     Table 4294967294
     ├── columns: [ id, item_name, description, initial_bid, reserve, date_time, expires, seller, category, auction, bidder, price, bid_date_time, _row_id, _row_id#1 ]
-    ├── primary key: [ $13 ASC, $14 ASC, $0 ASC ]
+    ├── primary key: [ $0 ASC ]
     ├── value indices: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14 ]
     ├── distribution key: [ 0 ]
-    └── read pk prefix len hint: 3
+    └── read pk prefix len hint: 1

 - id: nexmark_q10
   before:
@@ -1132,25 +1137,21 @@
       └─BatchExchange { order: [], dist: HashShard(bidder, auction) }
         └─BatchSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"], filter: (None, None) }
   stream_plan: |
-    StreamMaterialize { columns: [auction, bidder, price, channel, url, date_time, extra, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: "NoCheck" }
-    └─StreamExchange { dist: HashShard(_row_id) }
-      └─StreamProject { exprs: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
-        └─StreamAppendOnlyGroupTopN { order: "[date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] }
-          └─StreamExchange { dist: HashShard(bidder, auction) }
-            └─StreamRowIdGen { row_id_index: 7 }
-              └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
+    StreamMaterialize { columns: [auction, bidder, price, channel, url, date_time, extra, _row_id(hidden)], stream_key: [bidder, auction], pk_columns: [bidder, auction], pk_conflict: "NoCheck" }
+    └─StreamProject { exprs: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
+      └─StreamAppendOnlyGroupTopN { order: "[date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] }
+        └─StreamExchange { dist: HashShard(bidder, auction) }
+          └─StreamRowIdGen { row_id_index: 7 }
+            └─StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] }
   stream_dist_plan: |+
     Fragment 0
-    StreamMaterialize { columns: [auction, bidder, price, channel, url, date_time, extra, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: "NoCheck" }
+    StreamMaterialize { columns: [auction, bidder, price, channel, url, date_time, extra, _row_id(hidden)], stream_key: [bidder, auction], pk_columns: [bidder, auction], pk_conflict: "NoCheck" }
     ├── materialized table: 4294967294
-    └── StreamExchange Hash([7]) from 1
+    └── StreamProject { exprs: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
+        └── StreamAppendOnlyGroupTopN { order: "[date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] } { state table: 0 }
+            └── StreamExchange Hash([1, 0]) from 1

     Fragment 1
-    StreamProject { exprs: [auction, bidder, price, channel, url, date_time, extra, _row_id] }
-    └── StreamAppendOnlyGroupTopN { order: "[date_time DESC]", limit: 1, offset: 0, group_key: [1, 0] } { state table: 0 }
-        └── StreamExchange Hash([1, 0]) from 2
-
-    Fragment 2
     StreamRowIdGen { row_id_index: 7 }
     └── StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } { source state table: 1 }

@@ -1165,10 +1166,10 @@

     Table 4294967294
     ├── columns: [ auction, bidder, price, channel, url, date_time, extra, _row_id ]
-    ├── primary key: [ $7 ASC ]
+    ├── primary key: [ $1 ASC, $0 ASC ]
     ├── value indices: [ 0, 1, 2, 3, 4, 5, 6, 7 ]
-    ├── distribution key: [ 7 ]
-    └── read pk prefix len hint: 1
+    ├── distribution key: [ 1, 0 ]
+    └── read pk prefix len hint: 2

 - id: nexmark_q18_rank
   before:
diff --git a/src/frontend/planner_test/tests/testdata/over_window_function.yaml b/src/frontend/planner_test/tests/testdata/over_window_function.yaml
index 48638a90a01e6..f1566a093773b 100644
--- a/src/frontend/planner_test/tests/testdata/over_window_function.yaml
+++ b/src/frontend/planner_test/tests/testdata/over_window_function.yaml
@@ -331,9 +331,8 @@
     └─LogicalTopN { order: "[t.y ASC]", limit: 1, offset: 0, group_key: [0] }
       └─LogicalScan { table: t, columns: [t.x, t.y, t._row_id] }
   stream_plan: |
-    StreamMaterialize { columns: [x, y, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: "NoCheck" }
-    └─StreamExchange { dist: HashShard(t._row_id) }
-      └─StreamProject { exprs: [t.x, t.y, t._row_id] }
-        └─StreamGroupTopN { order: "[t.y ASC]", limit: 1, offset: 0, group_key: [0] }
-          └─StreamExchange { dist: HashShard(t.x) }
-            └─StreamTableScan { table: t, columns: [t.x, t.y, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
+    StreamMaterialize { columns: [x, y, t._row_id(hidden)], stream_key: [x], pk_columns: [x], pk_conflict: "NoCheck" }
+    └─StreamProject { exprs: [t.x, t.y, t._row_id] }
+      └─StreamGroupTopN { order: "[t.y ASC]", limit: 1, offset: 0, group_key: [0] }
+        └─StreamExchange { dist: HashShard(t.x) }
+          └─StreamTableScan { table: t, columns: [t.x, t.y, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
diff --git a/src/frontend/planner_test/tests/testdata/project_set.yaml b/src/frontend/planner_test/tests/testdata/project_set.yaml
index a900e298c7109..22f0add339084 100644
--- a/src/frontend/planner_test/tests/testdata/project_set.yaml
+++ b/src/frontend/planner_test/tests/testdata/project_set.yaml
@@ -94,7 +94,7 @@
     └─BatchProjectSet { select_list: [Unnest($0)] }
       └─BatchScan { table: t, columns: [t.x, t._row_id], distribution: UpstreamHashShard(t._row_id) }
   stream_plan: |
-    StreamMaterialize { columns: [projected_row_id(hidden), unnest, t._row_id(hidden)], stream_key: [t._row_id, projected_row_id], pk_columns: [projected_row_id, t._row_id], pk_conflict: "NoCheck" }
+    StreamMaterialize { columns: [projected_row_id(hidden), unnest, t._row_id(hidden)], stream_key: [], pk_columns: [projected_row_id], pk_conflict: "NoCheck" }
     └─StreamProject { exprs: [projected_row_id, Unnest($0), t._row_id] }
       └─StreamTopN { order: "[projected_row_id ASC]", limit: 1, offset: 0 }
         └─StreamExchange { dist: Single }
diff --git a/src/frontend/src/optimizer/plan_node/generic/top_n.rs b/src/frontend/src/optimizer/plan_node/generic/top_n.rs
index 395cdf4c69da9..56fd55f786f24 100644
--- a/src/frontend/src/optimizer/plan_node/generic/top_n.rs
+++ b/src/frontend/src/optimizer/plan_node/generic/top_n.rs
@@ -37,15 +37,15 @@ impl TopN {
     /// Infers the state table catalog for [`StreamTopN`] and [`StreamGroupTopN`].
     pub fn infer_internal_table_catalog(
         &self,
-        me: &impl stream::StreamPlanRef,
+        schema: &Schema,
+        ctx: OptimizerContextRef,
+        stream_key: &[usize],
         vnode_col_idx: Option<usize>,
     ) -> TableCatalog {
-        let schema = me.schema();
-        let pk_indices = me.logical_pk();
         let columns_fields = schema.fields().to_vec();
         let column_orders = &self.order.column_orders;
         let mut internal_table_catalog_builder =
-            TableCatalogBuilder::new(me.ctx().with_options().internal_table_subset());
+            TableCatalogBuilder::new(ctx.with_options().internal_table_subset());

         columns_fields.iter().for_each(|field| {
             internal_table_catalog_builder.add_column(field);
@@ -71,7 +71,7 @@ impl TopN {
             }
         });

-        pk_indices.iter().for_each(|idx| {
+        stream_key.iter().for_each(|idx| {
             if !order_cols.contains(idx) {
                 internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending());
                 order_cols.insert(*idx);
diff --git a/src/frontend/src/optimizer/plan_node/logical_topn.rs b/src/frontend/src/optimizer/plan_node/logical_topn.rs
index 239ea770aa1b8..3164d8cc64966 100644
--- a/src/frontend/src/optimizer/plan_node/logical_topn.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_topn.rs
@@ -33,7 +33,6 @@ use crate::optimizer::plan_node::{
 use crate::optimizer::property::{Distribution, Order, RequiredDist};
 use crate::planner::LIMIT_ALL_COUNT;
 use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, Condition};
-use crate::TableCatalog;

 /// `LogicalTopN` sorts the input data and fetches up to `limit` rows from `offset`
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -113,12 +112,6 @@ impl LogicalTopN {
         &self.core.group_key
     }

-    /// Infers the state table catalog for [`StreamTopN`] and [`StreamGroupTopN`].
-    pub fn infer_internal_table_catalog(&self, vnode_col_idx: Option<usize>) -> TableCatalog {
-        self.core
-            .infer_internal_table_catalog(&self.base, vnode_col_idx)
-    }
-
     fn gen_dist_stream_top_n_plan(&self, stream_input: PlanRef) -> Result<PlanRef> {
         let input_dist = stream_input.distribution().clone();

diff --git a/src/frontend/src/optimizer/plan_node/stream.rs b/src/frontend/src/optimizer/plan_node/stream.rs
index 04f8736304a59..22e3909d1205d 100644
--- a/src/frontend/src/optimizer/plan_node/stream.rs
+++ b/src/frontend/src/optimizer/plan_node/stream.rs
@@ -583,9 +583,15 @@ pub fn to_stream_prost_body(
             })
         }
         Node::GroupTopN(me) => {
+            let input = &me.core.input.0;
             let table = me
                 .core
-                .infer_internal_table_catalog(base, me.vnode_col_idx)
+                .infer_internal_table_catalog(
+                    input.schema(),
+                    input.ctx(),
+                    input.logical_pk(),
+                    me.vnode_col_idx,
+                )
                 .with_id(state.gen_table_id_wrapped());
             let group_topn_node = GroupTopNNode {
                 limit: me.core.limit_attr.limit(),
@@ -723,15 +729,21 @@
             PbNodeBody::Source(SourceNode { source_inner })
         }
         Node::TopN(me) => {
+            let input = &me.core.input.0;
             let me = &me.core;
             let topn_node = TopNNode {
                 limit: me.limit_attr.limit(),
                 offset: me.offset,
                 with_ties: me.limit_attr.with_ties(),
                 table: Some(
-                    me.infer_internal_table_catalog(base, None)
-                        .with_id(state.gen_table_id_wrapped())
-                        .to_internal_table_prost(),
+                    me.infer_internal_table_catalog(
+                        input.schema(),
+                        input.ctx(),
+                        input.logical_pk(),
+                        None,
+                    )
+                    .with_id(state.gen_table_id_wrapped())
+                    .to_internal_table_prost(),
                 ),
                 order_by: me.order.to_protobuf(),
             };
diff --git a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs
index 62986d438e1dc..464a5d3281557 100644
--- a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs
@@ -53,10 +53,18 @@ impl StreamGroupTopN {
             watermark_columns
         };

+        // We can use the group key as the stream key when there is at most one record for each
+        // value of the group key.
+        let logical_pk = if logical.limit_attr.max_one_row() {
+            logical.group_key.clone()
+        } else {
+            input.logical_pk().to_vec()
+        };
+
         let base = PlanBase::new_stream(
             input.ctx(),
             schema,
-            input.logical_pk().to_vec(),
+            logical_pk,
             input.functional_dependency().clone(),
             input.distribution().clone(),
             false,
@@ -89,9 +97,16 @@ impl StreamGroupTopN {
 impl StreamNode for StreamGroupTopN {
     fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
         use risingwave_pb::stream_plan::*;
+
+        let input = self.input();
         let table = self
             .logical
-            .infer_internal_table_catalog(&self.base, self.vnode_col_idx)
+            .infer_internal_table_catalog(
+                input.schema(),
+                input.ctx(),
+                input.logical_pk(),
+                self.vnode_col_idx,
+            )
             .with_id(state.gen_table_id_wrapped());
         assert!(!self.group_key().is_empty());
         let group_topn_node = GroupTopNNode {
diff --git a/src/frontend/src/optimizer/plan_node/stream_topn.rs b/src/frontend/src/optimizer/plan_node/stream_topn.rs
index 03458fbdaf537..0e41d429714ae 100644
--- a/src/frontend/src/optimizer/plan_node/stream_topn.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_topn.rs
@@ -95,13 +95,20 @@ impl_plan_tree_node_for_unary! { StreamTopN }
 impl StreamNode for StreamTopN {
     fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> PbNodeBody {
         use risingwave_pb::stream_plan::*;
+
+        let input = self.input();
         let topn_node = TopNNode {
             limit: self.limit_attr().limit(),
             offset: self.offset(),
             with_ties: self.limit_attr().with_ties(),
             table: Some(
                 self.logical
-                    .infer_internal_table_catalog(&self.base, None)
+                    .infer_internal_table_catalog(
+                        input.schema(),
+                        input.ctx(),
+                        input.logical_pk(),
+                        None,
+                    )
                     .with_id(state.gen_table_id_wrapped())
                     .to_internal_table_prost(),
             ),
diff --git a/src/stream/src/from_proto/group_top_n.rs b/src/stream/src/from_proto/group_top_n.rs
index 9ca78c9a0ba7e..64f5f930e98e1 100644
--- a/src/stream/src/from_proto/group_top_n.rs
+++ b/src/stream/src/from_proto/group_top_n.rs
@@ -60,7 +60,17 @@ impl ExecutorBuilder for GroupTopNExecutorBuilder {
             .map(ColumnOrder::from_protobuf)
             .collect();

-        assert_eq!(&params.pk_indices, input.pk_indices());
+        if node.limit == 1 && !node.with_ties {
+            // When there is at most one record for each value of the group key, `params.pk_indices`
+            // is the group key instead of the input's stream key.
+            assert_eq!(
+                &params.pk_indices,
+                &node.group_key.iter().map(|idx| *idx as usize).collect_vec()
+            );
+        } else {
+            assert_eq!(&params.pk_indices, input.pk_indices());
+        }
+
         let args = GroupTopNExecutorDispatcherArgs {
             input,
             ctx: params.actor_context,
diff --git a/src/stream/src/from_proto/group_top_n_appendonly.rs b/src/stream/src/from_proto/group_top_n_appendonly.rs
index 4312a4484ba0c..b3876b973dc79 100644
--- a/src/stream/src/from_proto/group_top_n_appendonly.rs
+++ b/src/stream/src/from_proto/group_top_n_appendonly.rs
@@ -74,7 +74,17 @@ impl ExecutorBuilder for AppendOnlyGroupTopNExecutorBuilder {
             .map(ColumnOrder::from_protobuf)
             .collect();

-        assert_eq!(&params.pk_indices, input.pk_indices());
+        if node.limit == 1 && !node.with_ties {
+            // When there is at most one record for each value of the group key, `params.pk_indices`
+            // is the group key instead of the input's stream key.
+            assert_eq!(
+                &params.pk_indices,
+                &node.group_key.iter().map(|idx| *idx as usize).collect_vec()
+            );
+        } else {
+            assert_eq!(&params.pk_indices, input.pk_indices());
+        }
+
         let args = AppendOnlyGroupTopNExecutorDispatcherArgs {
             input,
             ctx: params.actor_context,