diff --git a/src/frontend/src/stream_fragmenter/mod.rs b/src/frontend/src/stream_fragmenter/mod.rs index 3c82b8edebc00..301a47f64223d 100644 --- a/src/frontend/src/stream_fragmenter/mod.rs +++ b/src/frontend/src/stream_fragmenter/mod.rs @@ -346,6 +346,10 @@ impl StreamFragmenter { top_n_node.table_id = state.gen_table_id(); } + NodeBody::AppendOnlyTopN(append_only_top_n_node) => { + append_only_top_n_node.table_id = state.gen_table_id(); + } + _ => {} } } @@ -461,7 +465,7 @@ mod tests { } { - // test HashJoin Type + // test TopN Type let mut stream_node = StreamNode { node_body: Some(NodeBody::TopN(TopNNode { ..Default::default() @@ -475,5 +479,23 @@ mod tests { assert_eq!(expect_table_id, top_n_node.table_id); } } + + { + // test AppendOnlyTopN Type + let mut stream_node = StreamNode { + node_body: Some(NodeBody::AppendOnlyTopN(TopNNode { + ..Default::default() + })), + ..Default::default() + }; + StreamFragmenter::assign_local_table_id_to_stream_node(&mut state, &mut stream_node); + + if let NodeBody::AppendOnlyTopN(append_only_top_n_node) = + stream_node.node_body.as_ref().unwrap() + { + expect_table_id += 1; + assert_eq!(expect_table_id, append_only_top_n_node.table_id); + } + } } } diff --git a/src/meta/src/stream/stream_graph.rs b/src/meta/src/stream/stream_graph.rs index 28f2c04b65216..7bcf141041fb2 100644 --- a/src/meta/src/stream/stream_graph.rs +++ b/src/meta/src/stream/stream_graph.rs @@ -541,41 +541,40 @@ impl StreamGraphBuilder { let mut new_stream_node = stream_node.clone(); // Table id rewrite done below. + match new_stream_node.node_body.as_mut().unwrap() { + NodeBody::HashJoin(node) => { + // The operator id must be assigned with table ids. Otherwise it is a logic + // error. + let left_table_id = node.left_table_id + table_id_offset; + let right_table_id = left_table_id + 1; + node.left_table_id = left_table_id; + node.right_table_id = right_table_id; + } - if let NodeBody::HashJoin(node) = new_stream_node.node_body.as_mut().unwrap() { - // The operator id must be assigned with table ids. Otherwise it is a logic - // error. - let left_table_id = node.left_table_id + table_id_offset; - let right_table_id = left_table_id + 1; - node.left_table_id = left_table_id; - node.right_table_id = right_table_id; - } - - if let NodeBody::Lookup(node) = new_stream_node.node_body.as_mut().unwrap() { - if let Some(ArrangementTableId::TableId(table_id)) = - &mut node.arrangement_table_id - { - *table_id += table_id_offset; + NodeBody::Lookup(node) => { + if let Some(ArrangementTableId::TableId(table_id)) = + &mut node.arrangement_table_id + { + *table_id += table_id_offset; + } } - } - if let NodeBody::Arrange(node) = new_stream_node.node_body.as_mut().unwrap() { - node.table_id += table_id_offset; - } + NodeBody::Arrange(node) => { + node.table_id += table_id_offset; + } - if let NodeBody::HashAgg(node) = new_stream_node.node_body.as_mut().unwrap() { - assert_eq!(node.table_ids.len(), node.agg_calls.len()); - // In-place update the table id. Convert from local to global. - for table_id in &mut node.table_ids { - *table_id += table_id_offset; + NodeBody::HashAgg(node) => { + assert_eq!(node.table_ids.len(), node.agg_calls.len()); + // In-place update the table id. Convert from local to global. + for table_id in &mut node.table_ids { + *table_id += table_id_offset; + } } - } - if let NodeBody::TopN(node) = new_stream_node.node_body.as_mut().unwrap() { - node.table_id += table_id_offset; - } + NodeBody::TopN(node) | NodeBody::AppendOnlyTopN(node) => { + node.table_id += table_id_offset; + } - match new_stream_node.node_body.as_mut().unwrap() { NodeBody::GlobalSimpleAgg(node) | NodeBody::LocalSimpleAgg(node) => { assert_eq!(node.table_ids.len(), node.agg_calls.len()); // In-place update the table id. Convert from local to global. diff --git a/src/stream/src/from_proto/top_n_appendonly.rs b/src/stream/src/from_proto/top_n_appendonly.rs index 71b4d65cf17e8..b03b72ee84e4b 100644 --- a/src/stream/src/from_proto/top_n_appendonly.rs +++ b/src/stream/src/from_proto/top_n_appendonly.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_common::catalog::TableId; use risingwave_common::util::sort_util::OrderPair; use super::*; @@ -39,7 +40,8 @@ impl ExecutorBuilder for AppendOnlyTopNExecutorBuilder { }; let cache_size = Some(1024); let total_count = (0, 0); - let keyspace = Keyspace::executor_root(store, params.executor_id); + let table_id = TableId::new(node.table_id); + let keyspace = Keyspace::table_root(store, &table_id); let key_indices = node .get_distribution_keys() .iter()