diff --git a/src/common/metrics/src/metrics/transform.rs b/src/common/metrics/src/metrics/transform.rs index 12aacb5b59e3..9b069786b957 100644 --- a/src/common/metrics/src/metrics/transform.rs +++ b/src/common/metrics/src/metrics/transform.rs @@ -22,10 +22,16 @@ use crate::Family; use crate::Histogram; use crate::VecLabels; +pub static AGGREGATE_PARTIAL_CELL_COUNT: LazyLock = + LazyLock::new(|| register_counter("transform_aggregate_partial_cell_count")); + pub static AGGREGATE_PARTIAL_SPILL_CELL_COUNT: LazyLock = LazyLock::new(|| register_counter("transform_aggregate_partial_spill_cell_count")); pub static AGGREGATE_PARTIAL_HASHTABLE_ALLOCATED_BYTES: LazyLock = LazyLock::new(|| register_counter("transform_aggregate_partial_hashtable_allocated_bytes")); +pub static AGGREGATE_PARTIAL_HASHTABLE_EXCHANGE_ROWS: LazyLock = + LazyLock::new(|| register_counter("transform_aggregate_partial_hashtable_exchange_rows")); + pub static SPILL_COUNT: LazyLock> = LazyLock::new(|| register_counter_family("transform_spill_count")); pub static SPILL_WRITE_COUNT: LazyLock> = @@ -70,6 +76,10 @@ pub fn metrics_inc_aggregate_partial_spill_cell_count(c: u64) { AGGREGATE_PARTIAL_SPILL_CELL_COUNT.inc_by(c); } +pub fn metrics_inc_aggregate_partial_hashtable_exchange_rows(c: u64) { + AGGREGATE_PARTIAL_HASHTABLE_EXCHANGE_ROWS.inc_by(c); +} + pub fn metrics_inc_aggregate_partial_hashtable_allocated_bytes(c: u64) { AGGREGATE_PARTIAL_HASHTABLE_ALLOCATED_BYTES.inc_by(c); } diff --git a/src/query/expression/src/aggregate/payload_flush.rs b/src/query/expression/src/aggregate/payload_flush.rs index 8a7c4099dca9..4bde3c2f6501 100644 --- a/src/query/expression/src/aggregate/payload_flush.rs +++ b/src/query/expression/src/aggregate/payload_flush.rs @@ -116,7 +116,18 @@ impl Payload { let mut state = PayloadFlushState::default(); let mut blocks = vec![]; - while self.flush(&mut state) { + while let Some(block) = self.aggregate_flush(&mut state)? 
{ + blocks.push(block); + } + + if blocks.is_empty() { + return Ok(self.empty_block()); + } + DataBlock::concat(&blocks) + } + + pub fn aggregate_flush(&self, state: &mut PayloadFlushState) -> Result> { + if self.flush(state) { let row_count = state.row_count; let mut state_builders: Vec = self @@ -146,15 +157,10 @@ impl Payload { } cols.extend_from_slice(&state.take_group_columns()); - - blocks.push(DataBlock::new_from_columns(cols)); - } - - if blocks.is_empty() { - return Ok(self.empty_block()); + return Ok(Some(DataBlock::new_from_columns(cols))); } - DataBlock::concat(&blocks) + Ok(None) } pub fn group_by_flush_all(&self) -> Result { diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs index a18ef93232d9..d4d46fab2bfe 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs @@ -53,13 +53,13 @@ impl SerializedPayload { entry.value.as_column().unwrap() } - pub fn convert_to_partitioned_payload( + pub fn convert_to_aggregate_table( &self, group_types: Vec, aggrs: Vec>, radix_bits: u64, arena: Arc, - ) -> Result { + ) -> Result { let rows_num = self.data_block.num_rows(); let config = HashTableConfig::default().with_initial_radix_bits(radix_bits); let mut state = ProbeState::default(); @@ -93,7 +93,17 @@ impl SerializedPayload { hashtable.add_groups(&mut state, &group_columns, &[vec![]], &agg_states, rows_num)?; hashtable.payload.mark_min_cardinality(); + Ok(hashtable) + } + pub fn convert_to_partitioned_payload( + &self, + group_types: Vec, + aggrs: Vec>, + radix_bits: u64, + arena: Arc, + ) -> Result { + let hashtable = self.convert_to_aggregate_table(group_types, aggrs, radix_bits, arena)?; Ok(hashtable.payload) } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs index eb9a50b092cf..34094916b9e9 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_serializer.rs @@ -22,6 +22,7 @@ use databend_common_expression::types::binary::BinaryColumnBuilder; use databend_common_expression::BlockMetaInfoDowncast; use databend_common_expression::Column; use databend_common_expression::DataBlock; +use databend_common_expression::PayloadFlushState; use databend_common_functions::aggregates::StateAddr; use databend_common_hashtable::HashtableEntryRefLike; use databend_common_hashtable::HashtableLike; @@ -209,6 +210,7 @@ pub struct SerializeAggregateStream { pub payload: Pin>>, // old hashtable' iter iter: Option< as HashtableLike>::Iterator<'static>>, + flush_state: Option, end_iter: bool, } @@ -231,10 +233,18 @@ impl SerializeAggregateStream { None }; + let flush_state = + if let SerializePayload::AggregatePayload(_) = payload.as_ref().get_ref() { + Some(PayloadFlushState::default()) + } else { + None + }; + SerializeAggregateStream:: { iter, payload, end_iter: false, + flush_state, method: method.clone(), params: params.clone(), } @@ -276,9 +286,10 @@ impl SerializeAggregateStream { .method .keys_column_builder(max_block_rows, max_block_bytes); + let mut bytes = 0; + #[allow(clippy::while_let_on_iterator)] while let 
Some(group_entity) = self.iter.as_mut().and_then(|iter| iter.next()) { - let mut bytes = 0; let place = Into::::into(*group_entity.get()); for (idx, func) in funcs.iter().enumerate() { @@ -290,7 +301,7 @@ impl SerializeAggregateStream { group_key_builder.append_value(group_entity.key()); - if bytes >= 8 * 1024 * 1024 { + if bytes + group_key_builder.bytes_size() >= 8 * 1024 * 1024 { return self.finish(state_builders, group_key_builder); } } @@ -299,13 +310,19 @@ impl SerializeAggregateStream { self.finish(state_builders, group_key_builder) } SerializePayload::AggregatePayload(p) => { - let data_block = p.payload.aggregate_flush_all()?; + let state = self.flush_state.as_mut().unwrap(); + let block = p.payload.aggregate_flush(state)?; - self.end_iter = true; + if block.is_none() { + self.end_iter = true; + } - Ok(Some(data_block.add_meta(Some( - AggregateSerdeMeta::create_agg_payload(p.bucket, p.max_partition_count), - ))?)) + match block { + Some(block) => Ok(Some(block.add_meta(Some( + AggregateSerdeMeta::create_agg_payload(p.bucket, p.max_partition_count), + ))?)), + None => Ok(None), + } } } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs index 464a18e3af4e..6388230a63a8 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs @@ -187,17 +187,24 @@ impl BlockMetaTransform } let bucket = payload.bucket; - let mut stream = SerializeAggregateStream::create( + let stream = SerializeAggregateStream::create( &self.method, &self.params, SerializePayload::::HashTablePayload(payload), ); - serialized_blocks.push(FlightSerialized::DataBlock(match stream.next() { - None => DataBlock::empty(), - Some(data_block) => { - serialize_block(bucket, data_block?, &self.ipc_fields, &self.options)? + let mut stream_blocks = stream.into_iter().collect::>>()?; + + if stream_blocks.is_empty() { + serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty())); + } else { + let mut c = DataBlock::concat(&stream_blocks)?; + if let Some(meta) = stream_blocks[0].take_meta() { + c.replace_meta(meta); } - })); + metrics_inc_aggregate_partial_hashtable_exchange_rows(c.num_rows() as u64); + let c = serialize_block(bucket, c, &self.ipc_fields, &self.options)?; + serialized_blocks.push(FlightSerialized::DataBlock(c)); + } } Some(AggregateMeta::AggregatePayload(p)) => { if index == self.local_pos { @@ -210,17 +217,25 @@ impl BlockMetaTransform } let bucket = p.bucket; - let mut stream = SerializeAggregateStream::create( + let stream = SerializeAggregateStream::create( &self.method, &self.params, SerializePayload::::AggregatePayload(p), ); - serialized_blocks.push(FlightSerialized::DataBlock(match stream.next() { - None => DataBlock::empty(), - Some(data_block) => { - serialize_block(bucket, data_block?, &self.ipc_fields, &self.options)? 
+ let mut stream_blocks = stream.into_iter().collect::>>()?; + + if stream_blocks.is_empty() { + serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty())); + } else { + let mut c = DataBlock::concat(&stream_blocks)?; + if let Some(meta) = stream_blocks[0].take_meta() { + c.replace_meta(meta); } - })); + metrics_inc_aggregate_partial_hashtable_exchange_rows(c.num_rows() as u64); + + let c = serialize_block(bucket, c, &self.ipc_fields, &self.options)?; + serialized_blocks.push(FlightSerialized::DataBlock(c)); + } } }; } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs index 6190be88fb01..58ae962b51a9 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs @@ -238,16 +238,25 @@ impl BlockMetaTransform } let bucket = payload.bucket; - let mut stream = SerializeGroupByStream::create( + let stream = SerializeGroupByStream::create( &self.method, SerializePayload::::HashTablePayload(payload), ); - serialized_blocks.push(FlightSerialized::DataBlock(match stream.next() { - None => DataBlock::empty(), - Some(data_block) => { - serialize_block(bucket, data_block?, &self.ipc_fields, &self.options)? + + let mut stream_blocks = stream.into_iter().collect::>>()?; + + if stream_blocks.is_empty() { + serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty())); + } else { + let mut c = DataBlock::concat(&stream_blocks)?; + if let Some(meta) = stream_blocks[0].take_meta() { + c.replace_meta(meta); } - })); + + metrics_inc_aggregate_partial_hashtable_exchange_rows(c.num_rows() as u64); + let c = serialize_block(bucket, c, &self.ipc_fields, &self.options)?; + serialized_blocks.push(FlightSerialized::DataBlock(c)); + } } Some(AggregateMeta::AggregatePayload(p)) => { if index == self.local_pos { @@ -258,16 +267,25 @@ impl BlockMetaTransform } let bucket = p.bucket; - let mut stream = SerializeGroupByStream::create( + let stream = SerializeGroupByStream::create( &self.method, SerializePayload::::AggregatePayload(p), ); - serialized_blocks.push(FlightSerialized::DataBlock(match stream.next() { - None => DataBlock::empty(), - Some(data_block) => { - serialize_block(bucket, data_block?, &self.ipc_fields, &self.options)? 
+ + let mut stream_blocks = stream.into_iter().collect::>>()?; + + if stream_blocks.is_empty() { + serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty())); + } else { + let mut c = DataBlock::concat(&stream_blocks)?; + if let Some(meta) = stream_blocks[0].take_meta() { + c.replace_meta(meta); } - })); + + metrics_inc_aggregate_partial_hashtable_exchange_rows(c.num_rows() as u64); + let c = serialize_block(bucket, c, &self.ipc_fields, &self.options)?; + serialized_blocks.push(FlightSerialized::DataBlock(c)); + } } }; } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs index 7d952d7cf1d8..3295c5a0dd0e 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_group_by_serializer.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use databend_common_exception::Result; use databend_common_expression::BlockMetaInfoDowncast; use databend_common_expression::DataBlock; +use databend_common_expression::PayloadFlushState; use databend_common_hashtable::HashtableEntryRefLike; use databend_common_hashtable::HashtableLike; use databend_common_pipeline_core::processors::Event; @@ -178,6 +179,7 @@ pub struct SerializeGroupByStream { pub payload: Pin>>, // old hashtable' iter iter: Option< as HashtableLike>::Iterator<'static>>, + flush_state: Option, end_iter: bool, } @@ -196,9 +198,17 @@ impl SerializeGroupByStream { None }; + let flush_state = + if let SerializePayload::AggregatePayload(_) = payload.as_ref().get_ref() { + Some(PayloadFlushState::default()) + } else { + None + }; + SerializeGroupByStream:: { iter, payload, + flush_state, method: method.clone(), end_iter: false, } @@ -244,16 +254,19 @@ impl Iterator for SerializeGroupByStream { Some(data_block.add_meta(Some(AggregateSerdeMeta::create(bucket)))) } SerializePayload::AggregatePayload(p) => { - let data_block = p.payload.group_by_flush_all().ok()?; + let state = self.flush_state.as_mut().unwrap(); + let block = p.payload.aggregate_flush(state).unwrap(); - self.end_iter = true; + if block.is_none() { + self.end_iter = true; + } - Some( - data_block.add_meta(Some(AggregateSerdeMeta::create_agg_payload( + block.map(|block| { + block.add_meta(Some(AggregateSerdeMeta::create_agg_payload( p.bucket, p.max_partition_count, - ))), - ) + ))) + }) } } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs index f20aa83d6083..8eb7cc21b983 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_final.rs @@ -46,6 +46,7 @@ pub struct TransformFinalAggregate { method: Method, params: Arc, flush_state: PayloadFlushState, + reach_limit: bool, } impl TransformFinalAggregate { @@ -62,6 +63,7 @@ impl TransformFinalAggregate { method, params, flush_state: PayloadFlushState::default(), + reach_limit: false, }, ))) } @@ -105,23 +107,12 @@ impl TransformFinalAggregate { None => { debug_assert!(bucket == payload.bucket); let arena = Arc::new(Bump::new()); - let payload = payload.convert_to_partitioned_payload( + agg_hashtable = 
Some(payload.convert_to_aggregate_table( self.params.group_data_types.clone(), self.params.aggregate_functions.clone(), 0, arena, - )?; - let capacity = - AggregateHashTable::get_capacity_for_count(payload.len()); - let mut hashtable = AggregateHashTable::new_with_capacity( - self.params.group_data_types.clone(), - self.params.aggregate_functions.clone(), - HashTableConfig::default().with_initial_radix_bits(0), - capacity, - Arc::new(Bump::new()), - ); - hashtable.combine_payloads(&payload, &mut self.flush_state)?; - agg_hashtable = Some(hashtable); + )?); } }, _ => unreachable!(), @@ -132,12 +123,24 @@ impl TransformFinalAggregate { if let Some(mut ht) = agg_hashtable { let mut blocks = vec![]; self.flush_state.clear(); + + let mut rows = 0; loop { if ht.merge_result(&mut self.flush_state)? { let mut cols = self.flush_state.take_aggregate_results(); cols.extend_from_slice(&self.flush_state.take_group_columns()); - + rows += cols[0].len(); blocks.push(DataBlock::new_from_columns(cols)); + + if rows >= self.params.limit.unwrap_or(usize::MAX) { + log::info!( + "reach limit optimization in flush agg hashtable, current {}, total {}", + rows, + ht.len(), + ); + self.reach_limit = true; + break; + } } else { break; } @@ -159,12 +162,15 @@ where Method: HashMethodBounds const NAME: &'static str = "TransformFinalAggregate"; fn transform(&mut self, meta: AggregateMeta) -> Result { + if self.reach_limit { + return Ok(self.params.empty_result_block()); + } + if self.params.enable_experimental_aggregate_hashtable { return self.transform_agg_hashtable(meta); } if let AggregateMeta::Partitioned { bucket, data } = meta { - let mut reach_limit = false; let arena = Arc::new(Bump::new()); let hashtable = self.method.create_hash_table::(arena)?; let _dropper = AggregateHashTableDropper::create(self.params.clone()); @@ -193,7 +199,7 @@ where Method: HashMethodBounds let mut current_len = hash_cell.hashtable.len(); unsafe { for key in keys_iter { - if reach_limit { + if self.reach_limit { let entry = hash_cell.hashtable.entry(key); if let Some(entry) = entry { let place = Into::::into(*entry.get()); @@ -213,7 +219,7 @@ where Method: HashMethodBounds if let Some(limit) = self.params.limit { current_len += 1; if current_len >= limit { - reach_limit = true; + self.reach_limit = true; } } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs index dd45cd1d9f3e..7b27f5d76fec 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_aggregate_partial.rs @@ -443,9 +443,14 @@ impl AccumulatingTransform for TransformPartialAggrega }, HashTable::HashTable(v) => match v.hashtable.len() == 0 { true => vec![], - false => vec![DataBlock::empty_with_meta( - AggregateMeta::::create_hashtable(-1, v), - )], + false => { + metrics_inc_aggregate_partial_hashtable_allocated_bytes( + v.allocated_bytes() as u64 + ); + vec![DataBlock::empty_with_meta( + AggregateMeta::::create_hashtable(-1, v), + )] + } }, HashTable::PartitionedHashTable(v) => { info!( @@ -454,6 +459,8 @@ impl AccumulatingTransform for TransformPartialAggrega convert_byte_size(v.allocated_bytes() as f64) ); + metrics_inc_aggregate_partial_hashtable_allocated_bytes(v.allocated_bytes() as u64); + let cells = PartitionedHashTableDropper::split_cell(v); let mut blocks = 
Vec::with_capacity(cells.len()); for (bucket, cell) in cells.into_iter().enumerate() { @@ -467,6 +474,10 @@ impl AccumulatingTransform for TransformPartialAggrega blocks } HashTable::AggregateHashTable(hashtable) => { + metrics_inc_aggregate_partial_hashtable_allocated_bytes( + hashtable.allocated_bytes() as u64, + ); + let partition_count = hashtable.payload.partition_count(); let mut blocks = Vec::with_capacity(partition_count); for (bucket, payload) in hashtable.payload.payloads.into_iter().enumerate() { diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs index 5ae69b6f85ee..c4b1a919c857 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_final.rs @@ -40,6 +40,7 @@ pub struct TransformFinalGroupBy { method: Method, params: Arc, flush_state: PayloadFlushState, + reach_limit: bool, } impl TransformFinalGroupBy { @@ -56,6 +57,7 @@ impl TransformFinalGroupBy { method, params, flush_state: PayloadFlushState::default(), + reach_limit: false, }, ))) } @@ -98,23 +100,12 @@ impl TransformFinalGroupBy { None => { debug_assert!(bucket == payload.bucket); let arena = Arc::new(Bump::new()); - let payload = payload.convert_to_partitioned_payload( + agg_hashtable = Some(payload.convert_to_aggregate_table( self.params.group_data_types.clone(), self.params.aggregate_functions.clone(), 0, arena, - )?; - let capacity = - AggregateHashTable::get_capacity_for_count(payload.len()); - let mut hashtable = AggregateHashTable::new_with_capacity( - self.params.group_data_types.clone(), - self.params.aggregate_functions.clone(), - HashTableConfig::default().with_initial_radix_bits(0), - capacity, - Arc::new(Bump::new()), - ); - hashtable.combine_payloads(&payload, &mut self.flush_state)?; - agg_hashtable = Some(hashtable); + )?); } }, _ => unreachable!(), @@ -125,11 +116,23 @@ impl TransformFinalGroupBy { if let Some(mut ht) = agg_hashtable { let mut blocks = vec![]; self.flush_state.clear(); + + let mut rows = 0; loop { if ht.merge_result(&mut self.flush_state)? 
{ - blocks.push(DataBlock::new_from_columns( - self.flush_state.take_group_columns(), - )); + let cols = self.flush_state.take_group_columns(); + rows += cols[0].len(); + blocks.push(DataBlock::new_from_columns(cols)); + + if rows >= self.params.limit.unwrap_or(usize::MAX) { + log::info!( + "reach limit optimization in flush agg hashtable, current {}, total {}", + rows, + ht.len(), + ); + self.reach_limit = true; + break; + } } else { break; } @@ -151,6 +154,10 @@ where Method: HashMethodBounds const NAME: &'static str = "TransformFinalGroupBy"; fn transform(&mut self, meta: AggregateMeta) -> Result { + if self.reach_limit { + return Ok(self.params.empty_result_block()); + } + if self.params.enable_experimental_aggregate_hashtable { return self.transform_agg_hashtable(meta); } @@ -177,6 +184,7 @@ where Method: HashMethodBounds if let Some(limit) = self.params.limit { if hashtable.len() >= limit { + self.reach_limit = true; break 'merge_hashtable; } } diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs index 918b769f0616..150efac33a1e 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/transform_group_by_partial.rs @@ -297,9 +297,14 @@ impl AccumulatingTransform for TransformPartialGroupBy }, HashTable::HashTable(cell) => match cell.hashtable.len() == 0 { true => vec![], - false => vec![DataBlock::empty_with_meta( - AggregateMeta::::create_hashtable(-1, cell), - )], + false => { + metrics_inc_aggregate_partial_hashtable_allocated_bytes( + cell.allocated_bytes() as u64 + ); + vec![DataBlock::empty_with_meta( + AggregateMeta::::create_hashtable(-1, cell), + )] + } }, HashTable::PartitionedHashTable(v) => { info!( @@ -307,6 +312,9 @@ impl AccumulatingTransform for TransformPartialGroupBy convert_number_size(v.len() as f64), convert_byte_size(v.allocated_bytes() as f64) ); + + metrics_inc_aggregate_partial_hashtable_allocated_bytes(v.allocated_bytes() as u64); + let _ = v.hashtable.unsize_key_size(); let cells = PartitionedHashTableDropper::split_cell(v); let mut blocks = Vec::with_capacity(cells.len()); @@ -321,6 +329,10 @@ impl AccumulatingTransform for TransformPartialGroupBy blocks } HashTable::AggregateHashTable(hashtable) => { + metrics_inc_aggregate_partial_hashtable_allocated_bytes( + hashtable.allocated_bytes() as u64, + ); + let partition_count = hashtable.payload.partition_count(); let mut blocks = Vec::with_capacity(partition_count); for (bucket, payload) in hashtable.payload.payloads.into_iter().enumerate() { diff --git a/src/query/sql/src/executor/physical_plans/physical_aggregate_partial.rs b/src/query/sql/src/executor/physical_plans/physical_aggregate_partial.rs index 47509cd11e8b..e6e1dca9cd4e 100644 --- a/src/query/sql/src/executor/physical_plans/physical_aggregate_partial.rs +++ b/src/query/sql/src/executor/physical_plans/physical_aggregate_partial.rs @@ -63,7 +63,7 @@ impl AggregatePartial { }) .collect::>>()?; - for (idx, data_type) in group_types.iter().enumerate() { + for (idx, data_type) in self.group_by.iter().zip(group_types.iter()) { fields.push(DataField::new(&idx.to_string(), data_type.clone())); } return Ok(DataSchemaRefExt::create(fields));
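
The recurring pattern across these hunks is replacing the one-shot `aggregate_flush_all` with a resumable `aggregate_flush(&mut PayloadFlushState)` returning `Result<Option<DataBlock>>`: the serializer streams call it once per iteration and emit bounded blocks, the exchange serializers concatenate those blocks (reusing the first block's meta) before serializing and record the row count in the new `transform_aggregate_partial_hashtable_exchange_rows` counter, and the final aggregate/group-by transforms stop flushing early once `params.limit` rows have been produced (the new `reach_limit` flag). The sketch below illustrates only that control flow; `Payload`, `FlushState`, and `Block` are simplified stand-ins, not the actual `databend_common_expression` API.

```rust
// Minimal, self-contained sketch of the resumable-flush + limit pattern in this
// diff. Types here are hypothetical stand-ins; only the control flow mirrors the PR.

struct Block {
    rows: usize,
}

/// Stand-in for PayloadFlushState: remembers where the previous flush stopped.
#[derive(Default)]
struct FlushState {
    next_chunk: usize,
}

struct Payload {
    // Pretend the payload is already split into chunks of this many rows each.
    chunk_rows: Vec<usize>,
}

impl Payload {
    /// Resumable flush: returns one block per call and `None` once exhausted,
    /// mirroring `Payload::aggregate_flush(&mut PayloadFlushState)`.
    fn aggregate_flush(&self, state: &mut FlushState) -> Option<Block> {
        let rows = *self.chunk_rows.get(state.next_chunk)?;
        state.next_chunk += 1;
        Some(Block { rows })
    }

    /// The old flush-all behaviour, rebuilt on top of the resumable flush:
    /// drain every chunk and concatenate (here: just sum the row counts).
    fn aggregate_flush_all(&self) -> Block {
        let mut state = FlushState::default();
        let mut total = 0;
        while let Some(block) = self.aggregate_flush(&mut state) {
            total += block.rows;
        }
        Block { rows: total }
    }
}

/// Final-aggregate style consumer: stop flushing once `limit` rows were produced,
/// like the `reach_limit` flag added to TransformFinalAggregate / TransformFinalGroupBy.
fn flush_with_limit(payload: &Payload, limit: Option<usize>) -> (Vec<Block>, bool) {
    let mut state = FlushState::default();
    let mut blocks = Vec::new();
    let mut rows = 0;
    let mut reach_limit = false;
    while let Some(block) = payload.aggregate_flush(&mut state) {
        rows += block.rows;
        blocks.push(block);
        if rows >= limit.unwrap_or(usize::MAX) {
            reach_limit = true;
            break;
        }
    }
    (blocks, reach_limit)
}

fn main() {
    let payload = Payload {
        chunk_rows: vec![65_536, 65_536, 65_536],
    };
    assert_eq!(payload.aggregate_flush_all().rows, 196_608);

    let (blocks, reach_limit) = flush_with_limit(&payload, Some(100_000));
    assert!(reach_limit);
    assert_eq!(blocks.len(), 2); // stopped once the second chunk crossed the limit
}
```

The same reasoning explains why `SerializeAggregateStream` and `SerializeGroupByStream` now carry an optional `PayloadFlushState`: each `next()` call flushes one chunk and only sets `end_iter` when the payload reports `None`, instead of materializing the whole hash table payload in a single block.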