chore(query): improve group (#15031)
* chore: improve group

* reduce serializer overhead

* reduce serializer overhead

* add limit optimize

* fix exchange bug

* fix exchange bug

* fix exchange bug

* fix exchange bug

* fix exchange bug

* update

* update
sundy-li authored Mar 21, 2024
1 parent 341fb34 commit bcee197
Showing 12 changed files with 213 additions and 87 deletions.
10 changes: 10 additions & 0 deletions src/common/metrics/src/metrics/transform.rs
@@ -22,10 +22,16 @@ use crate::Family;
use crate::Histogram;
use crate::VecLabels;

pub static AGGREGATE_PARTIAL_CELL_COUNT: LazyLock<Counter> =
LazyLock::new(|| register_counter("transform_aggregate_partial_cell_count"));

pub static AGGREGATE_PARTIAL_SPILL_CELL_COUNT: LazyLock<Counter> =
LazyLock::new(|| register_counter("transform_aggregate_partial_spill_cell_count"));
pub static AGGREGATE_PARTIAL_HASHTABLE_ALLOCATED_BYTES: LazyLock<Counter> =
LazyLock::new(|| register_counter("transform_aggregate_partial_hashtable_allocated_bytes"));
pub static AGGREGATE_PARTIAL_HASHTABLE_EXCHANGE_ROWS: LazyLock<Counter> =
LazyLock::new(|| register_counter("transform_aggregate_partial_hashtable_exchange_rows"));

pub static SPILL_COUNT: LazyLock<Family<VecLabels, Counter>> =
LazyLock::new(|| register_counter_family("transform_spill_count"));
pub static SPILL_WRITE_COUNT: LazyLock<Family<VecLabels, Counter>> =
@@ -70,6 +76,10 @@ pub fn metrics_inc_aggregate_partial_spill_cell_count(c: u64) {
AGGREGATE_PARTIAL_SPILL_CELL_COUNT.inc_by(c);
}

pub fn metrics_inc_aggregate_partial_hashtable_exchange_rows(c: u64) {
AGGREGATE_PARTIAL_HASHTABLE_EXCHANGE_ROWS.inc_by(c);
}

pub fn metrics_inc_aggregate_partial_hashtable_allocated_bytes(c: u64) {
AGGREGATE_PARTIAL_HASHTABLE_ALLOCATED_BYTES.inc_by(c);
}
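For reference, the new counter follows the crate's existing `LazyLock` pattern. Below is a self-contained sketch of that shape using a stand-in `Counter` backed by an `AtomicU64`; the real type comes from `databend_common_metrics` and is registered with the metrics registry rather than hand-rolled like this:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::LazyLock;

// Stand-in for the crate's registered Counter type (assumption for illustration).
struct Counter(AtomicU64);

impl Counter {
    fn inc_by(&self, v: u64) {
        self.0.fetch_add(v, Ordering::Relaxed);
    }
}

// Same shape as the metric added in this diff: a process-wide counter,
// lazily initialized on first use.
static AGGREGATE_PARTIAL_HASHTABLE_EXCHANGE_ROWS: LazyLock<Counter> =
    LazyLock::new(|| Counter(AtomicU64::new(0)));

fn metrics_inc_aggregate_partial_hashtable_exchange_rows(c: u64) {
    AGGREGATE_PARTIAL_HASHTABLE_EXCHANGE_ROWS.inc_by(c);
}

fn main() {
    // Called once per exchanged block with its row count, as the injector does.
    metrics_inc_aggregate_partial_hashtable_exchange_rows(1024);
}
```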
22 changes: 14 additions & 8 deletions src/query/expression/src/aggregate/payload_flush.rs
@@ -116,7 +116,18 @@ impl Payload {
let mut state = PayloadFlushState::default();
let mut blocks = vec![];

while self.flush(&mut state) {
while let Some(block) = self.aggregate_flush(&mut state)? {
blocks.push(block);
}

if blocks.is_empty() {
return Ok(self.empty_block());
}
DataBlock::concat(&blocks)
}

pub fn aggregate_flush(&self, state: &mut PayloadFlushState) -> Result<Option<DataBlock>> {
if self.flush(state) {
let row_count = state.row_count;

let mut state_builders: Vec<BinaryColumnBuilder> = self
@@ -146,15 +157,10 @@ impl Payload {
}

cols.extend_from_slice(&state.take_group_columns());

blocks.push(DataBlock::new_from_columns(cols));
}

if blocks.is_empty() {
return Ok(self.empty_block());
return Ok(Some(DataBlock::new_from_columns(cols)));
}

DataBlock::concat(&blocks)
Ok(None)
}

pub fn group_by_flush_all(&self) -> Result<DataBlock> {
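The rewrite above turns the all-at-once flush into an incremental one: `aggregate_flush` returns one block per call and `None` once the payload is drained, so callers can stream bounded-size blocks instead of concatenating everything. A minimal sketch of that contract, with stand-in `Block`/`FlushState`/`Payload` types and the `Result` wrapper dropped for brevity:

```rust
struct Block(Vec<u64>);
struct FlushState { next: usize }
struct Payload { batches: Vec<Vec<u64>> }

impl Payload {
    // Each call yields the next flushed batch; None means fully drained.
    fn aggregate_flush(&self, state: &mut FlushState) -> Option<Block> {
        let batch = self.batches.get(state.next)?;
        state.next += 1;
        Some(Block(batch.clone()))
    }
}

fn main() {
    let payload = Payload { batches: vec![vec![1, 2], vec![3]] };
    let mut state = FlushState { next: 0 };
    // Same shape as the rewritten aggregate_flush_all loop in the diff.
    while let Some(Block(rows)) = payload.aggregate_flush(&mut state) {
        println!("flushed {} rows", rows.len());
    }
}
```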
@@ -53,13 +53,13 @@ impl SerializedPayload {
entry.value.as_column().unwrap()
}

pub fn convert_to_partitioned_payload(
pub fn convert_to_aggregate_table(
&self,
group_types: Vec<DataType>,
aggrs: Vec<Arc<dyn AggregateFunction>>,
radix_bits: u64,
arena: Arc<Bump>,
) -> Result<PartitionedPayload> {
) -> Result<AggregateHashTable> {
let rows_num = self.data_block.num_rows();
let config = HashTableConfig::default().with_initial_radix_bits(radix_bits);
let mut state = ProbeState::default();
@@ -93,7 +93,17 @@ impl SerializedPayload {
hashtable.add_groups(&mut state, &group_columns, &[vec![]], &agg_states, rows_num)?;

hashtable.payload.mark_min_cardinality();
Ok(hashtable)
}

pub fn convert_to_partitioned_payload(
&self,
group_types: Vec<DataType>,
aggrs: Vec<Arc<dyn AggregateFunction>>,
radix_bits: u64,
arena: Arc<Bump>,
) -> Result<PartitionedPayload> {
let hashtable = self.convert_to_aggregate_table(group_types, aggrs, radix_bits, arena)?;
Ok(hashtable.payload)
}
}
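This hunk splits the hash-table rebuild out of `convert_to_partitioned_payload` into the new `convert_to_aggregate_table`, leaving the old entry point as a delegating wrapper. A minimal sketch of the resulting shape, with stand-in types and the rebuild elided:

```rust
// Stand-ins for the Databend types (assumptions for illustration only).
struct PartitionedPayload;
struct AggregateHashTable { payload: PartitionedPayload }

fn convert_to_aggregate_table() -> Result<AggregateHashTable, String> {
    // Rebuild an aggregate hash table from serialized rows (elided here).
    Ok(AggregateHashTable { payload: PartitionedPayload })
}

fn convert_to_partitioned_payload() -> Result<PartitionedPayload, String> {
    // The old entry point now just unwraps the table's payload.
    Ok(convert_to_aggregate_table()?.payload)
}

fn main() {
    let _payload = convert_to_partitioned_payload().unwrap();
}
```

Callers that need the full table (for merging) use the new function directly; existing payload-only callers are unaffected.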
@@ -22,6 +22,7 @@ use databend_common_expression::types::binary::BinaryColumnBuilder;
use databend_common_expression::BlockMetaInfoDowncast;
use databend_common_expression::Column;
use databend_common_expression::DataBlock;
use databend_common_expression::PayloadFlushState;
use databend_common_functions::aggregates::StateAddr;
use databend_common_hashtable::HashtableEntryRefLike;
use databend_common_hashtable::HashtableLike;
@@ -209,6 +210,7 @@ pub struct SerializeAggregateStream<Method: HashMethodBounds> {
pub payload: Pin<Box<SerializePayload<Method, usize>>>,
// old hashtable's iter
iter: Option<<Method::HashTable<usize> as HashtableLike>::Iterator<'static>>,
flush_state: Option<PayloadFlushState>,
end_iter: bool,
}

@@ -231,10 +233,18 @@ impl<Method: HashMethodBounds> SerializeAggregateStream<Method> {
None
};

let flush_state =
if let SerializePayload::AggregatePayload(_) = payload.as_ref().get_ref() {
Some(PayloadFlushState::default())
} else {
None
};

SerializeAggregateStream::<Method> {
iter,
payload,
end_iter: false,
flush_state,
method: method.clone(),
params: params.clone(),
}
@@ -276,9 +286,10 @@ impl<Method: HashMethodBounds> SerializeAggregateStream<Method> {
.method
.keys_column_builder(max_block_rows, max_block_bytes);

let mut bytes = 0;

#[allow(clippy::while_let_on_iterator)]
while let Some(group_entity) = self.iter.as_mut().and_then(|iter| iter.next()) {
let mut bytes = 0;
let place = Into::<StateAddr>::into(*group_entity.get());

for (idx, func) in funcs.iter().enumerate() {
@@ -290,7 +301,7 @@ impl<Method: HashMethodBounds> SerializeAggregateStream<Method> {

group_key_builder.append_value(group_entity.key());

if bytes >= 8 * 1024 * 1024 {
if bytes + group_key_builder.bytes_size() >= 8 * 1024 * 1024 {
return self.finish(state_builders, group_key_builder);
}
}
@@ -299,13 +310,19 @@ impl<Method: HashMethodBounds> SerializeAggregateStream<Method> {
self.finish(state_builders, group_key_builder)
}
SerializePayload::AggregatePayload(p) => {
let data_block = p.payload.aggregate_flush_all()?;
let state = self.flush_state.as_mut().unwrap();
let block = p.payload.aggregate_flush(state)?;

self.end_iter = true;
if block.is_none() {
self.end_iter = true;
}

Ok(Some(data_block.add_meta(Some(
AggregateSerdeMeta::create_agg_payload(p.bucket, p.max_partition_count),
))?))
match block {
Some(block) => Ok(Some(block.add_meta(Some(
AggregateSerdeMeta::create_agg_payload(p.bucket, p.max_partition_count),
))?)),
None => Ok(None),
}
}
}
}
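Two behavioral points in this file: the byte budget now accumulates across the whole call rather than being reset per group entity, and the 8 MiB cutoff counts the group-key builder's bytes alongside the serialized aggregate states. A small sketch of the revised threshold check, with hypothetical names:

```rust
// Flush once serialized state bytes plus group-key bytes reach the budget,
// so block size is bounded by the whole payload, not just the state columns.
const FLUSH_THRESHOLD: usize = 8 * 1024 * 1024;

// Stand-in for the method's keys_column_builder (assumption for illustration).
struct KeyBuilder { bytes: usize }

impl KeyBuilder {
    fn bytes_size(&self) -> usize { self.bytes }
}

fn should_flush(state_bytes: usize, keys: &KeyBuilder) -> bool {
    state_bytes + keys.bytes_size() >= FLUSH_THRESHOLD
}

fn main() {
    let keys = KeyBuilder { bytes: 4 * 1024 * 1024 };
    assert!(should_flush(5 * 1024 * 1024, &keys)); // 9 MiB total: cut a block
    assert!(!should_flush(1024, &keys));           // under budget: keep going
}
```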
@@ -187,17 +187,24 @@ impl<Method: HashMethodBounds> BlockMetaTransform<ExchangeShuffleMeta>
}

let bucket = payload.bucket;
let mut stream = SerializeAggregateStream::create(
let stream = SerializeAggregateStream::create(
&self.method,
&self.params,
SerializePayload::<Method, usize>::HashTablePayload(payload),
);
serialized_blocks.push(FlightSerialized::DataBlock(match stream.next() {
None => DataBlock::empty(),
Some(data_block) => {
serialize_block(bucket, data_block?, &self.ipc_fields, &self.options)?
let mut stream_blocks = stream.into_iter().collect::<Result<Vec<_>>>()?;

if stream_blocks.is_empty() {
serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty()));
} else {
let mut c = DataBlock::concat(&stream_blocks)?;
if let Some(meta) = stream_blocks[0].take_meta() {
c.replace_meta(meta);
}
}));
metrics_inc_aggregate_partial_hashtable_exchange_rows(c.num_rows() as u64);
let c = serialize_block(bucket, c, &self.ipc_fields, &self.options)?;
serialized_blocks.push(FlightSerialized::DataBlock(c));
}
}
Some(AggregateMeta::AggregatePayload(p)) => {
if index == self.local_pos {
@@ -210,17 +217,25 @@ impl<Method: HashMethodBounds> BlockMetaTransform<ExchangeShuffleMeta>
}

let bucket = p.bucket;
let mut stream = SerializeAggregateStream::create(
let stream = SerializeAggregateStream::create(
&self.method,
&self.params,
SerializePayload::<Method, usize>::AggregatePayload(p),
);
serialized_blocks.push(FlightSerialized::DataBlock(match stream.next() {
None => DataBlock::empty(),
Some(data_block) => {
serialize_block(bucket, data_block?, &self.ipc_fields, &self.options)?
let mut stream_blocks = stream.into_iter().collect::<Result<Vec<_>>>()?;

if stream_blocks.is_empty() {
serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty()));
} else {
let mut c = DataBlock::concat(&stream_blocks)?;
if let Some(meta) = stream_blocks[0].take_meta() {
c.replace_meta(meta);
}
}));
metrics_inc_aggregate_partial_hashtable_exchange_rows(c.num_rows() as u64);

let c = serialize_block(bucket, c, &self.ipc_fields, &self.options)?;
serialized_blocks.push(FlightSerialized::DataBlock(c));
}
}
};
}
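Instead of serializing only the first block the stream yields, the injector now drains the stream, concatenates the blocks, carries the first block's meta onto the result, and records the row count before serialization. A self-contained sketch of that fold, with `Block`/`Meta` as stand-ins for `DataBlock` and its meta:

```rust
#[derive(Clone, Debug)]
struct Meta(&'static str);

struct Block {
    rows: usize,
    meta: Option<Meta>,
}

// Stands in for DataBlock::concat: merges rows, starts with no meta.
fn concat(blocks: &[Block]) -> Block {
    Block { rows: blocks.iter().map(|b| b.rows).sum(), meta: None }
}

fn fold_stream(mut blocks: Vec<Block>) -> Block {
    if blocks.is_empty() {
        return Block { rows: 0, meta: None }; // stands in for DataBlock::empty()
    }
    let mut c = concat(&blocks);
    // Carry the first block's serde meta over, like c.replace_meta(meta).
    if let Some(meta) = blocks[0].meta.take() {
        c.meta = Some(meta);
    }
    c
}

fn main() {
    let blocks = vec![
        Block { rows: 10, meta: Some(Meta("agg_payload bucket=3")) },
        Block { rows: 7, meta: Some(Meta("agg_payload bucket=3")) },
    ];
    let merged = fold_stream(blocks);
    println!("{} rows, meta = {:?}", merged.rows, merged.meta);
}
```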
@@ -238,16 +238,25 @@ impl<Method: HashMethodBounds> BlockMetaTransform<ExchangeShuffleMeta>
}

let bucket = payload.bucket;
let mut stream = SerializeGroupByStream::create(
let stream = SerializeGroupByStream::create(
&self.method,
SerializePayload::<Method, ()>::HashTablePayload(payload),
);
serialized_blocks.push(FlightSerialized::DataBlock(match stream.next() {
None => DataBlock::empty(),
Some(data_block) => {
serialize_block(bucket, data_block?, &self.ipc_fields, &self.options)?

let mut stream_blocks = stream.into_iter().collect::<Result<Vec<_>>>()?;

if stream_blocks.is_empty() {
serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty()));
} else {
let mut c = DataBlock::concat(&stream_blocks)?;
if let Some(meta) = stream_blocks[0].take_meta() {
c.replace_meta(meta);
}
}));

metrics_inc_aggregate_partial_hashtable_exchange_rows(c.num_rows() as u64);
let c = serialize_block(bucket, c, &self.ipc_fields, &self.options)?;
serialized_blocks.push(FlightSerialized::DataBlock(c));
}
}
Some(AggregateMeta::AggregatePayload(p)) => {
if index == self.local_pos {
@@ -258,16 +267,25 @@ impl<Method: HashMethodBounds> BlockMetaTransform<ExchangeShuffleMeta>
}

let bucket = p.bucket;
let mut stream = SerializeGroupByStream::create(
let stream = SerializeGroupByStream::create(
&self.method,
SerializePayload::<Method, ()>::AggregatePayload(p),
);
serialized_blocks.push(FlightSerialized::DataBlock(match stream.next() {
None => DataBlock::empty(),
Some(data_block) => {
serialize_block(bucket, data_block?, &self.ipc_fields, &self.options)?

let mut stream_blocks = stream.into_iter().collect::<Result<Vec<_>>>()?;

if stream_blocks.is_empty() {
serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty()));
} else {
let mut c = DataBlock::concat(&stream_blocks)?;
if let Some(meta) = stream_blocks[0].take_meta() {
c.replace_meta(meta);
}
}));

metrics_inc_aggregate_partial_hashtable_exchange_rows(c.num_rows() as u64);
let c = serialize_block(bucket, c, &self.ipc_fields, &self.options)?;
serialized_blocks.push(FlightSerialized::DataBlock(c));
}
}
};
}
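The same drain-concat-replace-meta shape is applied to the group-by path, and both paths rely on collecting an iterator of `Result`s to propagate the first flush error. That std behavior, shown in isolation:

```rust
fn main() {
    // Collecting Iterator<Item = Result<T, E>> into Result<Vec<T>, E>
    // succeeds only if every item is Ok...
    let ok: Result<Vec<i32>, String> = vec![Ok(1), Ok(2)].into_iter().collect();
    assert_eq!(ok, Ok(vec![1, 2]));

    // ...and short-circuits on the first Err, which is what
    // stream.into_iter().collect::<Result<Vec<_>>>()? leans on.
    let err: Result<Vec<i32>, String> =
        vec![Ok(1), Err("flush failed".to_string())].into_iter().collect();
    assert_eq!(err, Err("flush failed".to_string()));
}
```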
@@ -20,6 +20,7 @@ use std::sync::Arc;
use databend_common_exception::Result;
use databend_common_expression::BlockMetaInfoDowncast;
use databend_common_expression::DataBlock;
use databend_common_expression::PayloadFlushState;
use databend_common_hashtable::HashtableEntryRefLike;
use databend_common_hashtable::HashtableLike;
use databend_common_pipeline_core::processors::Event;
@@ -178,6 +179,7 @@ pub struct SerializeGroupByStream<Method: HashMethodBounds> {
pub payload: Pin<Box<SerializePayload<Method, ()>>>,
// old hashtable's iter
iter: Option<<Method::HashTable<()> as HashtableLike>::Iterator<'static>>,
flush_state: Option<PayloadFlushState>,
end_iter: bool,
}

@@ -196,9 +198,17 @@ impl<Method: HashMethodBounds> SerializeGroupByStream<Method> {
None
};

let flush_state =
if let SerializePayload::AggregatePayload(_) = payload.as_ref().get_ref() {
Some(PayloadFlushState::default())
} else {
None
};

SerializeGroupByStream::<Method> {
iter,
payload,
flush_state,
method: method.clone(),
end_iter: false,
}
@@ -244,16 +254,19 @@ impl<Method: HashMethodBounds> Iterator for SerializeGroupByStream<Method> {
Some(data_block.add_meta(Some(AggregateSerdeMeta::create(bucket))))
}
SerializePayload::AggregatePayload(p) => {
let data_block = p.payload.group_by_flush_all().ok()?;
let state = self.flush_state.as_mut().unwrap();
let block = p.payload.aggregate_flush(state).unwrap();

self.end_iter = true;
if block.is_none() {
self.end_iter = true;
}

Some(
data_block.add_meta(Some(AggregateSerdeMeta::create_agg_payload(
block.map(|block| {
block.add_meta(Some(AggregateSerdeMeta::create_agg_payload(
p.bucket,
p.max_partition_count,
))),
)
)))
})
}
}
}
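`SerializeGroupByStream` now mirrors the aggregate stream: a per-stream `PayloadFlushState`, one flushed block per `next()` call with the per-bucket meta attached via `map`, and `end_iter` set once the flush yields nothing. A minimal sketch of that iterator shape, with stand-in types:

```rust
struct FlushState { next: usize }
struct Payload { batches: Vec<usize> } // per-batch row counts (stand-in)

impl Payload {
    fn aggregate_flush(&self, state: &mut FlushState) -> Option<usize> {
        let rows = self.batches.get(state.next).copied()?;
        state.next += 1;
        Some(rows)
    }
}

struct SerializeStream {
    payload: Payload,
    flush_state: FlushState,
    end_iter: bool,
}

impl Iterator for SerializeStream {
    type Item = String; // stands in for a DataBlock with serde meta attached

    fn next(&mut self) -> Option<Self::Item> {
        if self.end_iter {
            return None;
        }
        let block = self.payload.aggregate_flush(&mut self.flush_state);
        if block.is_none() {
            self.end_iter = true; // drained: every later call returns None
        }
        // Like the diff's block.map(|b| b.add_meta(...)): meta only on Some.
        block.map(|rows| format!("{rows} rows + agg_payload meta"))
    }
}

fn main() {
    let stream = SerializeStream {
        payload: Payload { batches: vec![4, 2] },
        flush_state: FlushState { next: 0 },
        end_iter: false,
    };
    for b in stream {
        println!("{b}");
    }
}
```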