From 1acc8f540d8b95b7c69d142636aa2f6263bcde8b Mon Sep 17 00:00:00 2001
From: sundy-li <543950155@qq.com>
Date: Wed, 20 Mar 2024 18:07:06 +0800
Subject: [PATCH 01/11] chore: improve group

---
 .../transforms/aggregator/aggregate_meta.rs     | 14 ++++++++++++--
 .../aggregator/transform_aggregate_final.rs     | 15 ++-------------
 .../aggregator/transform_group_by_final.rs      | 15 ++-------------
 3 files changed, 16 insertions(+), 28 deletions(-)

diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs
index a18ef93232d9..d4d46fab2bfe 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs
@@ -53,13 +53,13 @@ impl SerializedPayload {
         entry.value.as_column().unwrap()
     }
 
-    pub fn convert_to_partitioned_payload(
+    pub fn convert_to_aggregate_table(
         &self,
         group_types: Vec<DataType>,
         aggrs: Vec<Arc<dyn AggregateFunction>>,
         radix_bits: u64,
         arena: Arc<Bump>,
-    ) -> Result<PartitionedPayload> {
+    ) -> Result<AggregateHashTable> {
         let rows_num = self.data_block.num_rows();
         let config = HashTableConfig::default().with_initial_radix_bits(radix_bits);
         let mut state = ProbeState::default();
@@ -93,7 +93,17 @@ impl SerializedPayload {
         hashtable.add_groups(&mut state, &group_columns, &[vec![]], &agg_states, rows_num)?;
 
         hashtable.payload.mark_min_cardinality();
+        Ok(hashtable)
+    }
 
+    pub fn convert_to_partitioned_payload(
+        &self,
+        group_types: Vec<DataType>,
+        aggrs: Vec<Arc<dyn AggregateFunction>>,
+        radix_bits: u64,
+        arena: Arc<Bump>,
+    ) -> Result<PartitionedPayload> {
+        let hashtable = self.convert_to_aggregate_table(group_types, aggrs, radix_bits, arena)?;
         Ok(hashtable.payload)
     }
 }
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs
index f20aa83d6083..edc2f27e90d4 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs
@@ -105,23 +105,12 @@ impl<Method: HashMethodBounds> TransformFinalAggregate<Method> {
                 None => {
                     debug_assert!(bucket == payload.bucket);
                     let arena = Arc::new(Bump::new());
-                    let payload = payload.convert_to_partitioned_payload(
+                    agg_hashtable = Some(payload.convert_to_aggregate_table(
                         self.params.group_data_types.clone(),
                         self.params.aggregate_functions.clone(),
                         0,
                         arena,
-                    )?;
-                    let capacity =
-                        AggregateHashTable::get_capacity_for_count(payload.len());
-                    let mut hashtable = AggregateHashTable::new_with_capacity(
-                        self.params.group_data_types.clone(),
-                        self.params.aggregate_functions.clone(),
-                        HashTableConfig::default().with_initial_radix_bits(0),
-                        capacity,
-                        Arc::new(Bump::new()),
-                    );
-                    hashtable.combine_payloads(&payload, &mut self.flush_state)?;
-                    agg_hashtable = Some(hashtable);
+                    )?);
                 }
             },
             _ => unreachable!(),
diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs
index 5ae69b6f85ee..c60478c2cf3e 100644
--- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs
+++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs
@@ -98,23 +98,12 @@ impl<Method: HashMethodBounds> TransformFinalGroupBy<Method> {
                 None => {
                     debug_assert!(bucket == payload.bucket);
                     let arena = Arc::new(Bump::new());
-                    let payload =
payload.convert_to_partitioned_payload( + agg_hashtable = Some(payload.convert_to_aggregate_table( self.params.group_data_types.clone(), self.params.aggregate_functions.clone(), 0, arena, - )?; - let capacity = - AggregateHashTable::get_capacity_for_count(payload.len()); - let mut hashtable = AggregateHashTable::new_with_capacity( - self.params.group_data_types.clone(), - self.params.aggregate_functions.clone(), - HashTableConfig::default().with_initial_radix_bits(0), - capacity, - Arc::new(Bump::new()), - ); - hashtable.combine_payloads(&payload, &mut self.flush_state)?; - agg_hashtable = Some(hashtable); + )?); } }, _ => unreachable!(), From 14cc0a9795ee9edf938954257ed55f3d21bbf5eb Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Wed, 20 Mar 2024 19:58:34 +0800 Subject: [PATCH 02/11] reduce serializer overhead --- .../expression/src/aggregate/payload_flush.rs | 22 +++++++++----- .../serde/transform_aggregate_serializer.rs | 26 +++++++++++++---- .../serde/transform_group_by_serializer.rs | 29 ++++++++++++++----- 3 files changed, 57 insertions(+), 20 deletions(-) diff --git a/src/query/expression/src/aggregate/payload_flush.rs b/src/query/expression/src/aggregate/payload_flush.rs index 8a7c4099dca9..97c156a1cd15 100644 --- a/src/query/expression/src/aggregate/payload_flush.rs +++ b/src/query/expression/src/aggregate/payload_flush.rs @@ -117,6 +117,19 @@ impl Payload { let mut blocks = vec![]; while self.flush(&mut state) { + if let Some(block) = self.aggregate_flush(&mut state)? { + blocks.push(block); + } + } + + if blocks.is_empty() { + return Ok(self.empty_block()); + } + DataBlock::concat(&blocks) + } + + pub fn aggregate_flush(&self, state: &mut PayloadFlushState) -> Result> { + if self.flush(state) { let row_count = state.row_count; let mut state_builders: Vec = self @@ -146,15 +159,10 @@ impl Payload { } cols.extend_from_slice(&state.take_group_columns()); - - blocks.push(DataBlock::new_from_columns(cols)); - } - - if blocks.is_empty() { - return Ok(self.empty_block()); + return Ok(Some(DataBlock::new_from_columns(cols))); } - DataBlock::concat(&blocks) + Ok(None) } pub fn group_by_flush_all(&self) -> Result { diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs index eb9a50b092cf..731873266fef 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs @@ -22,6 +22,7 @@ use databend_common_expression::types::binary::BinaryColumnBuilder; use databend_common_expression::BlockMetaInfoDowncast; use databend_common_expression::Column; use databend_common_expression::DataBlock; +use databend_common_expression::PayloadFlushState; use databend_common_functions::aggregates::StateAddr; use databend_common_hashtable::HashtableEntryRefLike; use databend_common_hashtable::HashtableLike; @@ -209,6 +210,7 @@ pub struct SerializeAggregateStream { pub payload: Pin>>, // old hashtable' iter iter: Option< as HashtableLike>::Iterator<'static>>, + flush_state: Option, end_iter: bool, } @@ -231,10 +233,18 @@ impl SerializeAggregateStream { None }; + let flush_state = + if let SerializePayload::AggregatePayload(_) = payload.as_ref().get_ref() { + Some(PayloadFlushState::default()) + } else { + None + }; + SerializeAggregateStream:: { iter, payload, 
end_iter: false, + flush_state, method: method.clone(), params: params.clone(), } @@ -299,13 +309,19 @@ impl SerializeAggregateStream { self.finish(state_builders, group_key_builder) } SerializePayload::AggregatePayload(p) => { - let data_block = p.payload.aggregate_flush_all()?; + let state = self.flush_state.as_mut().unwrap(); + let block = p.payload.aggregate_flush(state)?; - self.end_iter = true; + if state.flush_page >= p.payload.pages.len() { + self.end_iter = true; + } - Ok(Some(data_block.add_meta(Some( - AggregateSerdeMeta::create_agg_payload(p.bucket, p.max_partition_count), - ))?)) + match block { + Some(block) => Ok(Some(block.add_meta(Some( + AggregateSerdeMeta::create_agg_payload(p.bucket, p.max_partition_count), + ))?)), + None => Ok(None), + } } } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs index 7d952d7cf1d8..bcfdc34722f7 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use databend_common_exception::Result; use databend_common_expression::BlockMetaInfoDowncast; use databend_common_expression::DataBlock; +use databend_common_expression::PayloadFlushState; use databend_common_hashtable::HashtableEntryRefLike; use databend_common_hashtable::HashtableLike; use databend_common_pipeline_core::processors::Event; @@ -178,6 +179,7 @@ pub struct SerializeGroupByStream { pub payload: Pin>>, // old hashtable' iter iter: Option< as HashtableLike>::Iterator<'static>>, + flush_state: Option, end_iter: bool, } @@ -196,9 +198,17 @@ impl SerializeGroupByStream { None }; + let flush_state = + if let SerializePayload::AggregatePayload(_) = payload.as_ref().get_ref() { + Some(PayloadFlushState::default()) + } else { + None + }; + SerializeGroupByStream:: { iter, payload, + flush_state, method: method.clone(), end_iter: false, } @@ -244,16 +254,19 @@ impl Iterator for SerializeGroupByStream { Some(data_block.add_meta(Some(AggregateSerdeMeta::create(bucket)))) } SerializePayload::AggregatePayload(p) => { - let data_block = p.payload.group_by_flush_all().ok()?; + let state = self.flush_state.as_mut().unwrap(); + let block = p.payload.aggregate_flush(state).unwrap(); - self.end_iter = true; + if state.flush_page >= p.payload.pages.len() { + self.end_iter = true; + } - Some( - data_block.add_meta(Some(AggregateSerdeMeta::create_agg_payload( - p.bucket, - p.max_partition_count, - ))), - ) + match block { + Some(block) => Some(Ok(block.add_meta(Some( + AggregateSerdeMeta::create_agg_payload(p.bucket, p.max_partition_count), + ))?)), + None => None, + } } } } From 8b4047afad3512cceb67d26e7733c5a4a66094d4 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Wed, 20 Mar 2024 20:00:02 +0800 Subject: [PATCH 03/11] reduce serializer overhead --- .../aggregator/serde/transform_group_by_serializer.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs index bcfdc34722f7..5d7f40e1d76e 100644 --- 
a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs @@ -262,9 +262,9 @@ impl Iterator for SerializeGroupByStream { } match block { - Some(block) => Some(Ok(block.add_meta(Some( + Some(block) => Some(block.add_meta(Some( AggregateSerdeMeta::create_agg_payload(p.bucket, p.max_partition_count), - ))?)), + ))), None => None, } } From 41de91074dbe5b12aacae4b5330436364a98f489 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Wed, 20 Mar 2024 20:58:49 +0800 Subject: [PATCH 04/11] add limit optimize --- .../serde/transform_group_by_serializer.rs | 12 ++++----- .../aggregator/transform_aggregate_final.rs | 25 ++++++++++++++++--- .../aggregator/transform_group_by_final.rs | 25 ++++++++++++++++--- 3 files changed, 49 insertions(+), 13 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs index 5d7f40e1d76e..25d2b11403da 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs @@ -261,12 +261,12 @@ impl Iterator for SerializeGroupByStream { self.end_iter = true; } - match block { - Some(block) => Some(block.add_meta(Some( - AggregateSerdeMeta::create_agg_payload(p.bucket, p.max_partition_count), - ))), - None => None, - } + block.map(|block| { + block.add_meta(Some(AggregateSerdeMeta::create_agg_payload( + p.bucket, + p.max_partition_count, + ))) + }) } } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs index edc2f27e90d4..8eb7cc21b983 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs @@ -46,6 +46,7 @@ pub struct TransformFinalAggregate { method: Method, params: Arc, flush_state: PayloadFlushState, + reach_limit: bool, } impl TransformFinalAggregate { @@ -62,6 +63,7 @@ impl TransformFinalAggregate { method, params, flush_state: PayloadFlushState::default(), + reach_limit: false, }, ))) } @@ -121,12 +123,24 @@ impl TransformFinalAggregate { if let Some(mut ht) = agg_hashtable { let mut blocks = vec![]; self.flush_state.clear(); + + let mut rows = 0; loop { if ht.merge_result(&mut self.flush_state)? 
{ let mut cols = self.flush_state.take_aggregate_results(); cols.extend_from_slice(&self.flush_state.take_group_columns()); - + rows += cols[0].len(); blocks.push(DataBlock::new_from_columns(cols)); + + if rows >= self.params.limit.unwrap_or(usize::MAX) { + log::info!( + "reach limit optimization in flush agg hashtable, current {}, total {}", + rows, + ht.len(), + ); + self.reach_limit = true; + break; + } } else { break; } @@ -148,12 +162,15 @@ where Method: HashMethodBounds const NAME: &'static str = "TransformFinalAggregate"; fn transform(&mut self, meta: AggregateMeta) -> Result { + if self.reach_limit { + return Ok(self.params.empty_result_block()); + } + if self.params.enable_experimental_aggregate_hashtable { return self.transform_agg_hashtable(meta); } if let AggregateMeta::Partitioned { bucket, data } = meta { - let mut reach_limit = false; let arena = Arc::new(Bump::new()); let hashtable = self.method.create_hash_table::(arena)?; let _dropper = AggregateHashTableDropper::create(self.params.clone()); @@ -182,7 +199,7 @@ where Method: HashMethodBounds let mut current_len = hash_cell.hashtable.len(); unsafe { for key in keys_iter { - if reach_limit { + if self.reach_limit { let entry = hash_cell.hashtable.entry(key); if let Some(entry) = entry { let place = Into::::into(*entry.get()); @@ -202,7 +219,7 @@ where Method: HashMethodBounds if let Some(limit) = self.params.limit { current_len += 1; if current_len >= limit { - reach_limit = true; + self.reach_limit = true; } } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs index c60478c2cf3e..19223173c71f 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs @@ -40,6 +40,7 @@ pub struct TransformFinalGroupBy { method: Method, params: Arc, flush_state: PayloadFlushState, + reach_limit: bool, } impl TransformFinalGroupBy { @@ -56,6 +57,7 @@ impl TransformFinalGroupBy { method, params, flush_state: PayloadFlushState::default(), + reach_limit: false, }, ))) } @@ -114,11 +116,23 @@ impl TransformFinalGroupBy { if let Some(mut ht) = agg_hashtable { let mut blocks = vec![]; self.flush_state.clear(); + + let mut rows = 0; loop { if ht.merge_result(&mut self.flush_state)? 
{ - blocks.push(DataBlock::new_from_columns( - self.flush_state.take_group_columns(), - )); + let cols = self.flush_state.take_group_columns(); + rows += cols[0].len(); + + if rows >= self.params.limit.unwrap_or(usize::MAX) { + log::info!( + "reach limit optimization in flush agg hashtable, current {}, total {}", + rows, + ht.len(), + ); + self.reach_limit = true; + break; + } + blocks.push(DataBlock::new_from_columns(cols)); } else { break; } @@ -140,6 +154,10 @@ where Method: HashMethodBounds const NAME: &'static str = "TransformFinalGroupBy"; fn transform(&mut self, meta: AggregateMeta) -> Result { + if self.reach_limit { + return Ok(self.params.empty_result_block()); + } + if self.params.enable_experimental_aggregate_hashtable { return self.transform_agg_hashtable(meta); } @@ -166,6 +184,7 @@ where Method: HashMethodBounds if let Some(limit) = self.params.limit { if hashtable.len() >= limit { + self.reach_limit = true; break 'merge_hashtable; } } From 8193c07e3cb36a34df1cc953520d4d5a5491251d Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Thu, 21 Mar 2024 00:22:38 +0800 Subject: [PATCH 05/11] fix exchange bug --- ...transform_exchange_aggregate_serializer.rs | 45 ++++++++++++----- .../transform_exchange_group_by_serializer.rs | 50 +++++++++++++------ 2 files changed, 69 insertions(+), 26 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs index 464a18e3af4e..9c645e4c6c69 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs @@ -192,12 +192,23 @@ impl BlockMetaTransform &self.params, SerializePayload::::HashTablePayload(payload), ); - serialized_blocks.push(FlightSerialized::DataBlock(match stream.next() { - None => DataBlock::empty(), - Some(data_block) => { - serialize_block(bucket, data_block?, &self.ipc_fields, &self.options)? - } - })); + + let mut stream_blocks = Vec::new(); + for data_block in stream { + stream_blocks.push(serialize_block( + bucket, + data_block?, + &self.ipc_fields, + &self.options, + )?); + } + if stream_blocks.is_empty() { + serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty())); + } else { + serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::concat( + &stream_blocks, + )?)); + } } Some(AggregateMeta::AggregatePayload(p)) => { if index == self.local_pos { @@ -215,12 +226,22 @@ impl BlockMetaTransform &self.params, SerializePayload::::AggregatePayload(p), ); - serialized_blocks.push(FlightSerialized::DataBlock(match stream.next() { - None => DataBlock::empty(), - Some(data_block) => { - serialize_block(bucket, data_block?, &self.ipc_fields, &self.options)? 
- } - })); + let mut stream_blocks = Vec::new(); + for data_block in stream { + stream_blocks.push(serialize_block( + bucket, + data_block?, + &self.ipc_fields, + &self.options, + )?); + } + if stream_blocks.is_empty() { + serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty())); + } else { + serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::concat( + &stream_blocks, + )?)); + } } }; } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs index 6190be88fb01..1c3ab57517f5 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs @@ -238,16 +238,27 @@ impl BlockMetaTransform } let bucket = payload.bucket; - let mut stream = SerializeGroupByStream::create( + let stream = SerializeGroupByStream::create( &self.method, SerializePayload::::HashTablePayload(payload), ); - serialized_blocks.push(FlightSerialized::DataBlock(match stream.next() { - None => DataBlock::empty(), - Some(data_block) => { - serialize_block(bucket, data_block?, &self.ipc_fields, &self.options)? - } - })); + + let mut stream_blocks = Vec::new(); + for data_block in stream { + stream_blocks.push(serialize_block( + bucket, + data_block?, + &self.ipc_fields, + &self.options, + )?); + } + if stream_blocks.is_empty() { + serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty())); + } else { + serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::concat( + &stream_blocks, + )?)); + } } Some(AggregateMeta::AggregatePayload(p)) => { if index == self.local_pos { @@ -258,16 +269,27 @@ impl BlockMetaTransform } let bucket = p.bucket; - let mut stream = SerializeGroupByStream::create( + let stream = SerializeGroupByStream::create( &self.method, SerializePayload::::AggregatePayload(p), ); - serialized_blocks.push(FlightSerialized::DataBlock(match stream.next() { - None => DataBlock::empty(), - Some(data_block) => { - serialize_block(bucket, data_block?, &self.ipc_fields, &self.options)? 
- } - })); + + let mut stream_blocks = Vec::new(); + for data_block in stream { + stream_blocks.push(serialize_block( + bucket, + data_block?, + &self.ipc_fields, + &self.options, + )?); + } + if stream_blocks.is_empty() { + serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty())); + } else { + serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::concat( + &stream_blocks, + )?)); + } } }; } From 1d38f0b1db31aef2baa678e7d7a9c6a75f714b78 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Thu, 21 Mar 2024 03:09:36 +0800 Subject: [PATCH 06/11] fix exchange bug --- ...transform_exchange_aggregate_serializer.rs | 47 +++++++++---------- .../transform_exchange_group_by_serializer.rs | 44 ++++++++--------- 2 files changed, 41 insertions(+), 50 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs index 9c645e4c6c69..48850c776014 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs @@ -187,27 +187,24 @@ impl BlockMetaTransform } let bucket = payload.bucket; - let mut stream = SerializeAggregateStream::create( + let stream = SerializeAggregateStream::create( &self.method, &self.params, SerializePayload::::HashTablePayload(payload), ); + let stream_blocks = + stream.into_iter().map(|x| x).collect::>>()?; - let mut stream_blocks = Vec::new(); - for data_block in stream { - stream_blocks.push(serialize_block( - bucket, - data_block?, - &self.ipc_fields, - &self.options, - )?); - } if stream_blocks.is_empty() { serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty())); } else { - serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::concat( - &stream_blocks, - )?)); + let c = serialize_block( + bucket, + DataBlock::concat(&stream_blocks)?, + &self.ipc_fields, + &self.options, + )?; + serialized_blocks.push(FlightSerialized::DataBlock(c)); } } Some(AggregateMeta::AggregatePayload(p)) => { @@ -221,26 +218,24 @@ impl BlockMetaTransform } let bucket = p.bucket; - let mut stream = SerializeAggregateStream::create( + let stream = SerializeAggregateStream::create( &self.method, &self.params, SerializePayload::::AggregatePayload(p), ); - let mut stream_blocks = Vec::new(); - for data_block in stream { - stream_blocks.push(serialize_block( - bucket, - data_block?, - &self.ipc_fields, - &self.options, - )?); - } + let stream_blocks = + stream.into_iter().map(|x| x).collect::>>()?; + if stream_blocks.is_empty() { serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty())); } else { - serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::concat( - &stream_blocks, - )?)); + let c = serialize_block( + bucket, + DataBlock::concat(&stream_blocks)?, + &self.ipc_fields, + &self.options, + )?; + serialized_blocks.push(FlightSerialized::DataBlock(c)); } } }; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs index 1c3ab57517f5..24b86c19e187 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs +++ 
b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs @@ -243,21 +243,19 @@ impl BlockMetaTransform SerializePayload::::HashTablePayload(payload), ); - let mut stream_blocks = Vec::new(); - for data_block in stream { - stream_blocks.push(serialize_block( - bucket, - data_block?, - &self.ipc_fields, - &self.options, - )?); - } + let stream_blocks = + stream.into_iter().map(|x| x).collect::>>()?; + if stream_blocks.is_empty() { serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty())); } else { - serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::concat( - &stream_blocks, - )?)); + let c = serialize_block( + bucket, + DataBlock::concat(&stream_blocks)?, + &self.ipc_fields, + &self.options, + )?; + serialized_blocks.push(FlightSerialized::DataBlock(c)); } } Some(AggregateMeta::AggregatePayload(p)) => { @@ -274,21 +272,19 @@ impl BlockMetaTransform SerializePayload::::AggregatePayload(p), ); - let mut stream_blocks = Vec::new(); - for data_block in stream { - stream_blocks.push(serialize_block( - bucket, - data_block?, - &self.ipc_fields, - &self.options, - )?); - } + let stream_blocks = + stream.into_iter().map(|x| x).collect::>>()?; + if stream_blocks.is_empty() { serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty())); } else { - serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::concat( - &stream_blocks, - )?)); + let c = serialize_block( + bucket, + DataBlock::concat(&stream_blocks)?, + &self.ipc_fields, + &self.options, + )?; + serialized_blocks.push(FlightSerialized::DataBlock(c)); } } }; From 6ff37e78fd3fc6ef31969eefdb729624e70775b9 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Thu, 21 Mar 2024 05:40:31 +0800 Subject: [PATCH 07/11] fix exchange bug --- src/common/metrics/src/metrics/transform.rs | 7 +++++ .../expression/src/aggregate/payload_flush.rs | 4 ++- .../serde/transform_aggregate_serializer.rs | 2 +- ...transform_exchange_aggregate_serializer.rs | 29 +++++++++--------- .../transform_exchange_group_by_serializer.rs | 30 ++++++++++--------- .../serde/transform_group_by_serializer.rs | 2 +- .../aggregator/transform_group_by_final.rs | 2 +- 7 files changed, 44 insertions(+), 32 deletions(-) diff --git a/src/common/metrics/src/metrics/transform.rs b/src/common/metrics/src/metrics/transform.rs index 12aacb5b59e3..6cb60448e283 100644 --- a/src/common/metrics/src/metrics/transform.rs +++ b/src/common/metrics/src/metrics/transform.rs @@ -26,6 +26,9 @@ pub static AGGREGATE_PARTIAL_SPILL_CELL_COUNT: LazyLock = LazyLock::new(|| register_counter("transform_aggregate_partial_spill_cell_count")); pub static AGGREGATE_PARTIAL_HASHTABLE_ALLOCATED_BYTES: LazyLock = LazyLock::new(|| register_counter("transform_aggregate_partial_hashtable_allocated_bytes")); +pub static AGGREGATE_PARTIAL_HASHTABLE_EXCHANGE_ROWS: LazyLock = + LazyLock::new(|| register_counter("transform_aggregate_partial_hashtable_exchange_rows")); + pub static SPILL_COUNT: LazyLock> = LazyLock::new(|| register_counter_family("transform_spill_count")); pub static SPILL_WRITE_COUNT: LazyLock> = @@ -70,6 +73,10 @@ pub fn metrics_inc_aggregate_partial_spill_cell_count(c: u64) { AGGREGATE_PARTIAL_SPILL_CELL_COUNT.inc_by(c); } +pub fn metrics_inc_aggregate_partial_hashtable_exchange_rows(c: u64) { + AGGREGATE_PARTIAL_HASHTABLE_EXCHANGE_ROWS.inc_by(c); +} + pub fn metrics_inc_aggregate_partial_hashtable_allocated_bytes(c: u64) { AGGREGATE_PARTIAL_HASHTABLE_ALLOCATED_BYTES.inc_by(c); } diff --git 
a/src/query/expression/src/aggregate/payload_flush.rs b/src/query/expression/src/aggregate/payload_flush.rs index 97c156a1cd15..96ae4faf7ff6 100644 --- a/src/query/expression/src/aggregate/payload_flush.rs +++ b/src/query/expression/src/aggregate/payload_flush.rs @@ -116,9 +116,11 @@ impl Payload { let mut state = PayloadFlushState::default(); let mut blocks = vec![]; - while self.flush(&mut state) { + loop { if let Some(block) = self.aggregate_flush(&mut state)? { blocks.push(block); + } else { + break; } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs index 731873266fef..861973c8908e 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs @@ -312,7 +312,7 @@ impl SerializeAggregateStream { let state = self.flush_state.as_mut().unwrap(); let block = p.payload.aggregate_flush(state)?; - if state.flush_page >= p.payload.pages.len() { + if block.is_none() { self.end_iter = true; } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs index 48850c776014..183d2195bfe8 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs @@ -192,18 +192,18 @@ impl BlockMetaTransform &self.params, SerializePayload::::HashTablePayload(payload), ); - let stream_blocks = + let mut stream_blocks = stream.into_iter().map(|x| x).collect::>>()?; if stream_blocks.is_empty() { serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty())); } else { - let c = serialize_block( - bucket, - DataBlock::concat(&stream_blocks)?, - &self.ipc_fields, - &self.options, - )?; + let mut c = DataBlock::concat(&stream_blocks)?; + if let Some(meta) = stream_blocks[0].take_meta() { + c.replace_meta(meta); + } + metrics_inc_aggregate_partial_hashtable_exchange_rows(c.num_rows() as u64); + let c = serialize_block(bucket, c, &self.ipc_fields, &self.options)?; serialized_blocks.push(FlightSerialized::DataBlock(c)); } } @@ -223,18 +223,19 @@ impl BlockMetaTransform &self.params, SerializePayload::::AggregatePayload(p), ); - let stream_blocks = + let mut stream_blocks = stream.into_iter().map(|x| x).collect::>>()?; if stream_blocks.is_empty() { serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty())); } else { - let c = serialize_block( - bucket, - DataBlock::concat(&stream_blocks)?, - &self.ipc_fields, - &self.options, - )?; + let mut c = DataBlock::concat(&stream_blocks)?; + if let Some(meta) = stream_blocks[0].take_meta() { + c.replace_meta(meta); + } + metrics_inc_aggregate_partial_hashtable_exchange_rows(c.num_rows() as u64); + + let c = serialize_block(bucket, c, &self.ipc_fields, &self.options)?; serialized_blocks.push(FlightSerialized::DataBlock(c)); } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs 
index 24b86c19e187..f9628365c66c 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs @@ -243,18 +243,19 @@ impl BlockMetaTransform SerializePayload::::HashTablePayload(payload), ); - let stream_blocks = + let mut stream_blocks = stream.into_iter().map(|x| x).collect::>>()?; if stream_blocks.is_empty() { serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty())); } else { - let c = serialize_block( - bucket, - DataBlock::concat(&stream_blocks)?, - &self.ipc_fields, - &self.options, - )?; + let mut c = DataBlock::concat(&stream_blocks)?; + if let Some(meta) = stream_blocks[0].take_meta() { + c.replace_meta(meta); + } + + metrics_inc_aggregate_partial_hashtable_exchange_rows(c.num_rows() as u64); + let c = serialize_block(bucket, c, &self.ipc_fields, &self.options)?; serialized_blocks.push(FlightSerialized::DataBlock(c)); } } @@ -272,18 +273,19 @@ impl BlockMetaTransform SerializePayload::::AggregatePayload(p), ); - let stream_blocks = + let mut stream_blocks = stream.into_iter().map(|x| x).collect::>>()?; if stream_blocks.is_empty() { serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty())); } else { - let c = serialize_block( - bucket, - DataBlock::concat(&stream_blocks)?, - &self.ipc_fields, - &self.options, - )?; + let mut c = DataBlock::concat(&stream_blocks)?; + if let Some(meta) = stream_blocks[0].take_meta() { + c.replace_meta(meta); + } + + metrics_inc_aggregate_partial_hashtable_exchange_rows(c.num_rows() as u64); + let c = serialize_block(bucket, c, &self.ipc_fields, &self.options)?; serialized_blocks.push(FlightSerialized::DataBlock(c)); } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs index 25d2b11403da..3295c5a0dd0e 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs @@ -257,7 +257,7 @@ impl Iterator for SerializeGroupByStream { let state = self.flush_state.as_mut().unwrap(); let block = p.payload.aggregate_flush(state).unwrap(); - if state.flush_page >= p.payload.pages.len() { + if block.is_none() { self.end_iter = true; } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs index 19223173c71f..c4b1a919c857 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs @@ -122,6 +122,7 @@ impl TransformFinalGroupBy { if ht.merge_result(&mut self.flush_state)? 
{ let cols = self.flush_state.take_group_columns(); rows += cols[0].len(); + blocks.push(DataBlock::new_from_columns(cols)); if rows >= self.params.limit.unwrap_or(usize::MAX) { log::info!( @@ -132,7 +133,6 @@ impl TransformFinalGroupBy { self.reach_limit = true; break; } - blocks.push(DataBlock::new_from_columns(cols)); } else { break; } From 478a27b7ed1e8ab385c9c8636dcae98eced547a2 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Thu, 21 Mar 2024 05:54:50 +0800 Subject: [PATCH 08/11] fix exchange bug --- src/query/expression/src/aggregate/payload_flush.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/query/expression/src/aggregate/payload_flush.rs b/src/query/expression/src/aggregate/payload_flush.rs index 96ae4faf7ff6..4bde3c2f6501 100644 --- a/src/query/expression/src/aggregate/payload_flush.rs +++ b/src/query/expression/src/aggregate/payload_flush.rs @@ -116,12 +116,8 @@ impl Payload { let mut state = PayloadFlushState::default(); let mut blocks = vec![]; - loop { - if let Some(block) = self.aggregate_flush(&mut state)? { - blocks.push(block); - } else { - break; - } + while let Some(block) = self.aggregate_flush(&mut state)? { + blocks.push(block); } if blocks.is_empty() { From 2943969bef8dd950978f67100e7aebb7d354f860 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Thu, 21 Mar 2024 17:05:24 +0800 Subject: [PATCH 09/11] fix exchange bug --- .../serde/transform_exchange_aggregate_serializer.rs | 6 ++---- .../serde/transform_exchange_group_by_serializer.rs | 6 ++---- .../executor/physical_plans/physical_aggregate_partial.rs | 2 +- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs index 183d2195bfe8..6388230a63a8 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs @@ -192,8 +192,7 @@ impl BlockMetaTransform &self.params, SerializePayload::::HashTablePayload(payload), ); - let mut stream_blocks = - stream.into_iter().map(|x| x).collect::>>()?; + let mut stream_blocks = stream.into_iter().collect::>>()?; if stream_blocks.is_empty() { serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty())); @@ -223,8 +222,7 @@ impl BlockMetaTransform &self.params, SerializePayload::::AggregatePayload(p), ); - let mut stream_blocks = - stream.into_iter().map(|x| x).collect::>>()?; + let mut stream_blocks = stream.into_iter().collect::>>()?; if stream_blocks.is_empty() { serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty())); diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs index f9628365c66c..58ae962b51a9 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs @@ -243,8 +243,7 @@ impl BlockMetaTransform SerializePayload::::HashTablePayload(payload), ); - let mut stream_blocks = - stream.into_iter().map(|x| 
x).collect::>>()?; + let mut stream_blocks = stream.into_iter().collect::>>()?; if stream_blocks.is_empty() { serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty())); @@ -273,8 +272,7 @@ impl BlockMetaTransform SerializePayload::::AggregatePayload(p), ); - let mut stream_blocks = - stream.into_iter().map(|x| x).collect::>>()?; + let mut stream_blocks = stream.into_iter().collect::>>()?; if stream_blocks.is_empty() { serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty())); diff --git a/src/query/sql/src/executor/physical_plans/physical_aggregate_partial.rs b/src/query/sql/src/executor/physical_plans/physical_aggregate_partial.rs index 88d110102caf..91953f4a2555 100644 --- a/src/query/sql/src/executor/physical_plans/physical_aggregate_partial.rs +++ b/src/query/sql/src/executor/physical_plans/physical_aggregate_partial.rs @@ -63,7 +63,7 @@ impl AggregatePartial { }) .collect::>>()?; - for (idx, data_type) in group_types.iter().enumerate() { + for (idx, data_type) in self.group_by.iter().zip(group_types.iter()) { fields.push(DataField::new(&idx.to_string(), data_type.clone())); } return Ok(DataSchemaRefExt::create(fields)); From 01b4acd716c20943f9296feedf636a895579b0d8 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Thu, 21 Mar 2024 17:44:13 +0800 Subject: [PATCH 10/11] update --- src/common/metrics/src/metrics/transform.rs | 3 +++ .../aggregator/transform_aggregate_partial.rs | 17 ++++++++++++++--- .../aggregator/transform_group_by_partial.rs | 18 +++++++++++++++--- 3 files changed, 32 insertions(+), 6 deletions(-) diff --git a/src/common/metrics/src/metrics/transform.rs b/src/common/metrics/src/metrics/transform.rs index 6cb60448e283..9b069786b957 100644 --- a/src/common/metrics/src/metrics/transform.rs +++ b/src/common/metrics/src/metrics/transform.rs @@ -22,6 +22,9 @@ use crate::Family; use crate::Histogram; use crate::VecLabels; +pub static AGGREGATE_PARTIAL_CELL_COUNT: LazyLock = + LazyLock::new(|| register_counter("transform_aggregate_partial_cell_count")); + pub static AGGREGATE_PARTIAL_SPILL_CELL_COUNT: LazyLock = LazyLock::new(|| register_counter("transform_aggregate_partial_spill_cell_count")); pub static AGGREGATE_PARTIAL_HASHTABLE_ALLOCATED_BYTES: LazyLock = diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs index dd45cd1d9f3e..7b27f5d76fec 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs @@ -443,9 +443,14 @@ impl AccumulatingTransform for TransformPartialAggrega }, HashTable::HashTable(v) => match v.hashtable.len() == 0 { true => vec![], - false => vec![DataBlock::empty_with_meta( - AggregateMeta::::create_hashtable(-1, v), - )], + false => { + metrics_inc_aggregate_partial_hashtable_allocated_bytes( + v.allocated_bytes() as u64 + ); + vec![DataBlock::empty_with_meta( + AggregateMeta::::create_hashtable(-1, v), + )] + } }, HashTable::PartitionedHashTable(v) => { info!( @@ -454,6 +459,8 @@ impl AccumulatingTransform for TransformPartialAggrega convert_byte_size(v.allocated_bytes() as f64) ); + metrics_inc_aggregate_partial_hashtable_allocated_bytes(v.allocated_bytes() as u64); + let cells = PartitionedHashTableDropper::split_cell(v); let mut blocks = Vec::with_capacity(cells.len()); for (bucket, cell) in 
cells.into_iter().enumerate() { @@ -467,6 +474,10 @@ impl AccumulatingTransform for TransformPartialAggrega blocks } HashTable::AggregateHashTable(hashtable) => { + metrics_inc_aggregate_partial_hashtable_allocated_bytes( + hashtable.allocated_bytes() as u64, + ); + let partition_count = hashtable.payload.partition_count(); let mut blocks = Vec::with_capacity(partition_count); for (bucket, payload) in hashtable.payload.payloads.into_iter().enumerate() { diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs index 918b769f0616..150efac33a1e 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs @@ -297,9 +297,14 @@ impl AccumulatingTransform for TransformPartialGroupBy }, HashTable::HashTable(cell) => match cell.hashtable.len() == 0 { true => vec![], - false => vec![DataBlock::empty_with_meta( - AggregateMeta::::create_hashtable(-1, cell), - )], + false => { + metrics_inc_aggregate_partial_hashtable_allocated_bytes( + cell.allocated_bytes() as u64 + ); + vec![DataBlock::empty_with_meta( + AggregateMeta::::create_hashtable(-1, cell), + )] + } }, HashTable::PartitionedHashTable(v) => { info!( @@ -307,6 +312,9 @@ impl AccumulatingTransform for TransformPartialGroupBy convert_number_size(v.len() as f64), convert_byte_size(v.allocated_bytes() as f64) ); + + metrics_inc_aggregate_partial_hashtable_allocated_bytes(v.allocated_bytes() as u64); + let _ = v.hashtable.unsize_key_size(); let cells = PartitionedHashTableDropper::split_cell(v); let mut blocks = Vec::with_capacity(cells.len()); @@ -321,6 +329,10 @@ impl AccumulatingTransform for TransformPartialGroupBy blocks } HashTable::AggregateHashTable(hashtable) => { + metrics_inc_aggregate_partial_hashtable_allocated_bytes( + hashtable.allocated_bytes() as u64, + ); + let partition_count = hashtable.payload.partition_count(); let mut blocks = Vec::with_capacity(partition_count); for (bucket, payload) in hashtable.payload.payloads.into_iter().enumerate() { From 95e7cde7381179816ee46e8e471396bbc3b16bf8 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Thu, 21 Mar 2024 18:37:45 +0800 Subject: [PATCH 11/11] update --- .../aggregator/serde/transform_aggregate_serializer.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs index 861973c8908e..34094916b9e9 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs @@ -286,9 +286,10 @@ impl SerializeAggregateStream { .method .keys_column_builder(max_block_rows, max_block_bytes); + let mut bytes = 0; + #[allow(clippy::while_let_on_iterator)] while let Some(group_entity) = self.iter.as_mut().and_then(|iter| iter.next()) { - let mut bytes = 0; let place = Into::::into(*group_entity.get()); for (idx, func) in funcs.iter().enumerate() { @@ -300,7 +301,7 @@ impl SerializeAggregateStream { group_key_builder.append_value(group_entity.key()); - if bytes >= 8 * 1024 * 1024 { + if bytes + 
group_key_builder.bytes_size() >= 8 * 1024 * 1024 { return self.finish(state_builders, group_key_builder); } }
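
Notes on the series (reviewer sketches built on toy types; none of the code below is databend code).

Paged flush (patches 02, 07, 08). Payload::aggregate_flush now drains a single page per call and signals exhaustion by returning nothing, so SerializeAggregateStream and SerializeGroupByStream can emit one block per page instead of materializing aggregate_flush_all() in one piece. Patch 07 then bases end-of-stream on the returned Option rather than on comparing flush_page against pages.len(), and patch 08 collapses the drain loop into a while-let. A minimal model of that control flow, with Payload, FlushState and DataBlock as stand-ins:

struct DataBlock(Vec<u64>);

struct Payload {
    pages: Vec<Vec<u64>>,
}

#[derive(Default)]
struct FlushState {
    flush_page: usize,
}

impl Payload {
    // One page per call; None signals exhaustion (the termination test
    // patch 07 switches the serializers to).
    fn aggregate_flush(&self, state: &mut FlushState) -> Option<DataBlock> {
        let page = self.pages.get(state.flush_page)?;
        state.flush_page += 1;
        Some(DataBlock(page.clone()))
    }

    // The drain loop as patch 08 leaves it: a plain while-let.
    fn aggregate_flush_all(&self) -> Vec<DataBlock> {
        let mut state = FlushState::default();
        let mut blocks = Vec::new();
        while let Some(block) = self.aggregate_flush(&mut state) {
            blocks.push(block);
        }
        blocks
    }
}

fn main() {
    let payload = Payload { pages: vec![vec![1, 2], vec![3]] };
    assert_eq!(payload.aggregate_flush_all().len(), 2);
}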
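
Limit short-circuit (patch 04, corrected in patch 07). The final transforms count the rows they merge out of the hashtable; once params.limit is reached they set reach_limit, stop flushing, and answer any further input with an empty result block. Patch 04's group-by variant broke out of the loop before pushing the block that crossed the limit, which could under-produce rows; patch 07 reorders the push ahead of the check, and this stand-alone sketch (toy types again) follows that corrected order:

struct FinalTransform {
    limit: Option<usize>,
    reach_limit: bool,
}

impl FinalTransform {
    // Flush merged result pages until the optional row limit is crossed.
    fn flush(&mut self, result_pages: &[Vec<u64>]) -> Vec<Vec<u64>> {
        let mut blocks = Vec::new();
        let mut rows = 0;
        for page in result_pages {
            rows += page.len();
            blocks.push(page.clone()); // push first, then test (patch 07 order)
            if rows >= self.limit.unwrap_or(usize::MAX) {
                self.reach_limit = true; // later inputs get an empty block
                break;
            }
        }
        blocks
    }
}

fn main() {
    let mut t = FinalTransform { limit: Some(3), reach_limit: false };
    assert_eq!(t.flush(&[vec![1, 2], vec![3, 4], vec![5]]).len(), 2);
    assert!(t.reach_limit);
}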
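
Exchange serialization (patches 05, 06, 07). Once the streams became paged, taking only stream.next() in the exchange serializers silently dropped every page after the first. Patches 05/06 consume the whole stream and concatenate the results, and patch 07 re-attaches the AggregateSerdeMeta taken from the first block, since DataBlock::concat evidently does not carry block metadata across; it also records the exchanged row count as a metric. A toy model of the metadata repair only:

#[derive(Clone, Debug, PartialEq)]
struct Meta(&'static str);

struct Block {
    rows: usize,
    meta: Option<Meta>,
}

// Concatenate blocks, preserving the first block's metadata the way the
// patch does after DataBlock::concat.
fn concat(blocks: &mut [Block]) -> Block {
    let rows = blocks.iter().map(|b| b.rows).sum();
    let meta = blocks[0].meta.take();
    Block { rows, meta }
}

fn main() {
    let mut blocks = vec![
        Block { rows: 2, meta: Some(Meta("bucket 0")) },
        Block { rows: 3, meta: Some(Meta("bucket 0")) },
    ];
    let c = concat(&mut blocks);
    assert_eq!((c.rows, c.meta), (5, Some(Meta("bucket 0"))));
}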
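
Block-size cut (patch 11). In SerializeAggregateStream the bytes counter was declared inside the per-entry loop, so it reset on every key and the 8 MiB block cut effectively never fired; patch 11 hoists it out of the loop and adds the group-key builder's size to the test. A simplified model of the corrected accumulation (threshold logic only; sizes and the cut point are illustrative):

// Return the entry indices after which a block should be cut, accumulating
// sizes across entries instead of resetting per entry.
fn split_points(entry_sizes: &[usize], threshold: usize) -> Vec<usize> {
    let mut bytes = 0; // hoisted out of the loop, as in the patch
    let mut cuts = Vec::new();
    for (i, size) in entry_sizes.iter().enumerate() {
        bytes += size;
        if bytes >= threshold {
            cuts.push(i + 1);
            bytes = 0;
        }
    }
    cuts
}

fn main() {
    assert_eq!(split_points(&[3, 3, 3], 5), vec![2]);
}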