Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(query): improve group #15031

Merged
merged 14 commits into from
Mar 21, 2024
7 changes: 7 additions & 0 deletions src/common/metrics/src/metrics/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ pub static AGGREGATE_PARTIAL_SPILL_CELL_COUNT: LazyLock<Counter> =
LazyLock::new(|| register_counter("transform_aggregate_partial_spill_cell_count"));
pub static AGGREGATE_PARTIAL_HASHTABLE_ALLOCATED_BYTES: LazyLock<Counter> =
LazyLock::new(|| register_counter("transform_aggregate_partial_hashtable_allocated_bytes"));
pub static AGGREGATE_PARTIAL_HASHTABLE_EXCHANGE_ROWS: LazyLock<Counter> =
LazyLock::new(|| register_counter("transform_aggregate_partial_hashtable_exchange_rows"));

pub static SPILL_COUNT: LazyLock<Family<VecLabels, Counter>> =
LazyLock::new(|| register_counter_family("transform_spill_count"));
pub static SPILL_WRITE_COUNT: LazyLock<Family<VecLabels, Counter>> =
Expand Down Expand Up @@ -70,6 +73,10 @@ pub fn metrics_inc_aggregate_partial_spill_cell_count(c: u64) {
AGGREGATE_PARTIAL_SPILL_CELL_COUNT.inc_by(c);
}

pub fn metrics_inc_aggregate_partial_hashtable_exchange_rows(c: u64) {
AGGREGATE_PARTIAL_HASHTABLE_EXCHANGE_ROWS.inc_by(c);
}

pub fn metrics_inc_aggregate_partial_hashtable_allocated_bytes(c: u64) {
AGGREGATE_PARTIAL_HASHTABLE_ALLOCATED_BYTES.inc_by(c);
}
Expand Down
22 changes: 14 additions & 8 deletions src/query/expression/src/aggregate/payload_flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,18 @@ impl Payload {
let mut state = PayloadFlushState::default();
let mut blocks = vec![];

while self.flush(&mut state) {
while let Some(block) = self.aggregate_flush(&mut state)? {
blocks.push(block);
}

if blocks.is_empty() {
return Ok(self.empty_block());
}
DataBlock::concat(&blocks)
}

pub fn aggregate_flush(&self, state: &mut PayloadFlushState) -> Result<Option<DataBlock>> {
if self.flush(state) {
let row_count = state.row_count;

let mut state_builders: Vec<BinaryColumnBuilder> = self
Expand Down Expand Up @@ -146,15 +157,10 @@ impl Payload {
}

cols.extend_from_slice(&state.take_group_columns());

blocks.push(DataBlock::new_from_columns(cols));
}

if blocks.is_empty() {
return Ok(self.empty_block());
return Ok(Some(DataBlock::new_from_columns(cols)));
}

DataBlock::concat(&blocks)
Ok(None)
}

pub fn group_by_flush_all(&self) -> Result<DataBlock> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ impl SerializedPayload {
entry.value.as_column().unwrap()
}

pub fn convert_to_partitioned_payload(
pub fn convert_to_aggregate_table(
&self,
group_types: Vec<DataType>,
aggrs: Vec<Arc<dyn AggregateFunction>>,
radix_bits: u64,
arena: Arc<Bump>,
) -> Result<PartitionedPayload> {
) -> Result<AggregateHashTable> {
let rows_num = self.data_block.num_rows();
let config = HashTableConfig::default().with_initial_radix_bits(radix_bits);
let mut state = ProbeState::default();
Expand Down Expand Up @@ -93,7 +93,17 @@ impl SerializedPayload {
hashtable.add_groups(&mut state, &group_columns, &[vec![]], &agg_states, rows_num)?;

hashtable.payload.mark_min_cardinality();
Ok(hashtable)
}

pub fn convert_to_partitioned_payload(
&self,
group_types: Vec<DataType>,
aggrs: Vec<Arc<dyn AggregateFunction>>,
radix_bits: u64,
arena: Arc<Bump>,
) -> Result<PartitionedPayload> {
let hashtable = self.convert_to_aggregate_table(group_types, aggrs, radix_bits, arena)?;
Ok(hashtable.payload)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use databend_common_expression::types::binary::BinaryColumnBuilder;
use databend_common_expression::BlockMetaInfoDowncast;
use databend_common_expression::Column;
use databend_common_expression::DataBlock;
use databend_common_expression::PayloadFlushState;
use databend_common_functions::aggregates::StateAddr;
use databend_common_hashtable::HashtableEntryRefLike;
use databend_common_hashtable::HashtableLike;
Expand Down Expand Up @@ -209,6 +210,7 @@ pub struct SerializeAggregateStream<Method: HashMethodBounds> {
pub payload: Pin<Box<SerializePayload<Method, usize>>>,
// old hashtable' iter
iter: Option<<Method::HashTable<usize> as HashtableLike>::Iterator<'static>>,
flush_state: Option<PayloadFlushState>,
end_iter: bool,
}

Expand All @@ -231,10 +233,18 @@ impl<Method: HashMethodBounds> SerializeAggregateStream<Method> {
None
};

let flush_state =
if let SerializePayload::AggregatePayload(_) = payload.as_ref().get_ref() {
Some(PayloadFlushState::default())
} else {
None
};

SerializeAggregateStream::<Method> {
iter,
payload,
end_iter: false,
flush_state,
method: method.clone(),
params: params.clone(),
}
Expand Down Expand Up @@ -299,13 +309,19 @@ impl<Method: HashMethodBounds> SerializeAggregateStream<Method> {
self.finish(state_builders, group_key_builder)
}
SerializePayload::AggregatePayload(p) => {
let data_block = p.payload.aggregate_flush_all()?;
let state = self.flush_state.as_mut().unwrap();
let block = p.payload.aggregate_flush(state)?;

self.end_iter = true;
if block.is_none() {
self.end_iter = true;
}

Ok(Some(data_block.add_meta(Some(
AggregateSerdeMeta::create_agg_payload(p.bucket, p.max_partition_count),
))?))
match block {
Some(block) => Ok(Some(block.add_meta(Some(
AggregateSerdeMeta::create_agg_payload(p.bucket, p.max_partition_count),
))?)),
None => Ok(None),
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,17 +187,24 @@ impl<Method: HashMethodBounds> BlockMetaTransform<ExchangeShuffleMeta>
}

let bucket = payload.bucket;
let mut stream = SerializeAggregateStream::create(
let stream = SerializeAggregateStream::create(
&self.method,
&self.params,
SerializePayload::<Method, usize>::HashTablePayload(payload),
);
serialized_blocks.push(FlightSerialized::DataBlock(match stream.next() {
None => DataBlock::empty(),
Some(data_block) => {
serialize_block(bucket, data_block?, &self.ipc_fields, &self.options)?
let mut stream_blocks = stream.into_iter().collect::<Result<Vec<_>>>()?;

if stream_blocks.is_empty() {
serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty()));
} else {
let mut c = DataBlock::concat(&stream_blocks)?;
if let Some(meta) = stream_blocks[0].take_meta() {
c.replace_meta(meta);
}
}));
metrics_inc_aggregate_partial_hashtable_exchange_rows(c.num_rows() as u64);
let c = serialize_block(bucket, c, &self.ipc_fields, &self.options)?;
serialized_blocks.push(FlightSerialized::DataBlock(c));
}
}
Some(AggregateMeta::AggregatePayload(p)) => {
if index == self.local_pos {
Expand All @@ -210,17 +217,25 @@ impl<Method: HashMethodBounds> BlockMetaTransform<ExchangeShuffleMeta>
}

let bucket = p.bucket;
let mut stream = SerializeAggregateStream::create(
let stream = SerializeAggregateStream::create(
&self.method,
&self.params,
SerializePayload::<Method, usize>::AggregatePayload(p),
);
serialized_blocks.push(FlightSerialized::DataBlock(match stream.next() {
None => DataBlock::empty(),
Some(data_block) => {
serialize_block(bucket, data_block?, &self.ipc_fields, &self.options)?
let mut stream_blocks = stream.into_iter().collect::<Result<Vec<_>>>()?;

if stream_blocks.is_empty() {
serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty()));
} else {
let mut c = DataBlock::concat(&stream_blocks)?;
if let Some(meta) = stream_blocks[0].take_meta() {
c.replace_meta(meta);
}
}));
metrics_inc_aggregate_partial_hashtable_exchange_rows(c.num_rows() as u64);

let c = serialize_block(bucket, c, &self.ipc_fields, &self.options)?;
serialized_blocks.push(FlightSerialized::DataBlock(c));
}
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,16 +238,25 @@ impl<Method: HashMethodBounds> BlockMetaTransform<ExchangeShuffleMeta>
}

let bucket = payload.bucket;
let mut stream = SerializeGroupByStream::create(
let stream = SerializeGroupByStream::create(
&self.method,
SerializePayload::<Method, ()>::HashTablePayload(payload),
);
serialized_blocks.push(FlightSerialized::DataBlock(match stream.next() {
None => DataBlock::empty(),
Some(data_block) => {
serialize_block(bucket, data_block?, &self.ipc_fields, &self.options)?

let mut stream_blocks = stream.into_iter().collect::<Result<Vec<_>>>()?;

if stream_blocks.is_empty() {
serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty()));
} else {
let mut c = DataBlock::concat(&stream_blocks)?;
if let Some(meta) = stream_blocks[0].take_meta() {
c.replace_meta(meta);
}
}));

metrics_inc_aggregate_partial_hashtable_exchange_rows(c.num_rows() as u64);
let c = serialize_block(bucket, c, &self.ipc_fields, &self.options)?;
serialized_blocks.push(FlightSerialized::DataBlock(c));
}
}
Some(AggregateMeta::AggregatePayload(p)) => {
if index == self.local_pos {
Expand All @@ -258,16 +267,25 @@ impl<Method: HashMethodBounds> BlockMetaTransform<ExchangeShuffleMeta>
}

let bucket = p.bucket;
let mut stream = SerializeGroupByStream::create(
let stream = SerializeGroupByStream::create(
&self.method,
SerializePayload::<Method, ()>::AggregatePayload(p),
);
serialized_blocks.push(FlightSerialized::DataBlock(match stream.next() {
None => DataBlock::empty(),
Some(data_block) => {
serialize_block(bucket, data_block?, &self.ipc_fields, &self.options)?

let mut stream_blocks = stream.into_iter().collect::<Result<Vec<_>>>()?;

if stream_blocks.is_empty() {
serialized_blocks.push(FlightSerialized::DataBlock(DataBlock::empty()));
} else {
let mut c = DataBlock::concat(&stream_blocks)?;
if let Some(meta) = stream_blocks[0].take_meta() {
c.replace_meta(meta);
}
}));

metrics_inc_aggregate_partial_hashtable_exchange_rows(c.num_rows() as u64);
let c = serialize_block(bucket, c, &self.ipc_fields, &self.options)?;
serialized_blocks.push(FlightSerialized::DataBlock(c));
}
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::Arc;
use databend_common_exception::Result;
use databend_common_expression::BlockMetaInfoDowncast;
use databend_common_expression::DataBlock;
use databend_common_expression::PayloadFlushState;
use databend_common_hashtable::HashtableEntryRefLike;
use databend_common_hashtable::HashtableLike;
use databend_common_pipeline_core::processors::Event;
Expand Down Expand Up @@ -178,6 +179,7 @@ pub struct SerializeGroupByStream<Method: HashMethodBounds> {
pub payload: Pin<Box<SerializePayload<Method, ()>>>,
// old hashtable' iter
iter: Option<<Method::HashTable<()> as HashtableLike>::Iterator<'static>>,
flush_state: Option<PayloadFlushState>,
end_iter: bool,
}

Expand All @@ -196,9 +198,17 @@ impl<Method: HashMethodBounds> SerializeGroupByStream<Method> {
None
};

let flush_state =
if let SerializePayload::AggregatePayload(_) = payload.as_ref().get_ref() {
Some(PayloadFlushState::default())
} else {
None
};

SerializeGroupByStream::<Method> {
iter,
payload,
flush_state,
method: method.clone(),
end_iter: false,
}
Expand Down Expand Up @@ -244,16 +254,19 @@ impl<Method: HashMethodBounds> Iterator for SerializeGroupByStream<Method> {
Some(data_block.add_meta(Some(AggregateSerdeMeta::create(bucket))))
}
SerializePayload::AggregatePayload(p) => {
let data_block = p.payload.group_by_flush_all().ok()?;
let state = self.flush_state.as_mut().unwrap();
let block = p.payload.aggregate_flush(state).unwrap();

self.end_iter = true;
if block.is_none() {
self.end_iter = true;
}

Some(
data_block.add_meta(Some(AggregateSerdeMeta::create_agg_payload(
block.map(|block| {
block.add_meta(Some(AggregateSerdeMeta::create_agg_payload(
p.bucket,
p.max_partition_count,
))),
)
)))
})
}
}
}
Expand Down
Loading
Loading