Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix incorrect agg spill in new agg hashtable #14995

Merged
merged 2 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
397 changes: 396 additions & 1 deletion Cargo.lock

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions src/common/metrics/src/metrics/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ pub static AGGREGATE_PARTIAL_SPILL_CELL_COUNT: LazyLock<Counter> =
LazyLock::new(|| register_counter("transform_aggregate_partial_spill_cell_count"));
pub static AGGREGATE_PARTIAL_HASHTABLE_ALLOCATED_BYTES: LazyLock<Counter> =
LazyLock::new(|| register_counter("transform_aggregate_partial_hashtable_allocated_bytes"));
// Number of hash-table cells spilled by partial group-by operators.
pub static GROUP_BY_PARTIAL_SPILL_CELL_COUNT: LazyLock<Counter> =
    LazyLock::new(|| register_counter("transform_group_by_partial_spill_cell_count"));
// Bytes allocated by a partial group-by hash table at the moment it spills.
pub static GROUP_BY_PARTIAL_HASHTABLE_ALLOCATED_BYTES: LazyLock<Counter> =
    LazyLock::new(|| register_counter("transform_group_by_partial_hashtable_allocated_bytes"));
pub static SPILL_COUNT: LazyLock<Family<VecLabels, Counter>> =
LazyLock::new(|| register_counter_family("transform_spill_count"));
pub static SPILL_WRITE_COUNT: LazyLock<Family<VecLabels, Counter>> =
Expand Down Expand Up @@ -74,6 +78,19 @@ pub fn metrics_inc_aggregate_partial_hashtable_allocated_bytes(c: u64) {
AGGREGATE_PARTIAL_HASHTABLE_ALLOCATED_BYTES.inc_by(c);
}

/// Bump the generic spill counter, labelled as a partial group-by spill.
pub fn metrics_inc_group_by_partial_spill_count() {
    let labels = vec![("spill", String::from("group_by_partial_spill"))];
    SPILL_COUNT.get_or_create(&labels).inc();
}

/// Add `c` to the count of hash-table cells spilled by partial group-by operators.
pub fn metrics_inc_group_by_partial_spill_cell_count(c: u64) {
    GROUP_BY_PARTIAL_SPILL_CELL_COUNT.inc_by(c);
}

/// Record `c` bytes as allocated by a partial group-by hash table when it spills.
pub fn metrics_inc_group_by_partial_hashtable_allocated_bytes(c: u64) {
    GROUP_BY_PARTIAL_HASHTABLE_ALLOCATED_BYTES.inc_by(c);
}

pub fn metrics_inc_group_by_spill_write_count() {
let labels = &vec![("spill", "group_by_spill".to_string())];
SPILL_WRITE_COUNT.get_or_create(labels).inc();
Expand Down
1 change: 1 addition & 0 deletions src/query/expression/src/aggregate/aggregate_hashtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ impl AggregateHashTable {
state.empty_vector[idx] = idx;
}
self.payload.append_rows(state, row_count, group_columns);
self.payload.mark_min_cardinality();
row_count
} else {
self.probe_and_create(state, group_columns, row_count)
Expand Down
22 changes: 22 additions & 0 deletions src/query/expression/src/aggregate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ mod payload_row;
mod probe_state;

use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;

pub use aggregate_function::*;
Expand Down Expand Up @@ -108,4 +109,25 @@ impl HashTableConfig {

self
}

/// Raise the shared `current_max_radix_bits` up to this config's `max_radix_bits`.
///
/// The atomic is shared between concurrent partial-aggregate processors (it is
/// loaded with `SeqCst` elsewhere), so the update must be a monotonic
/// "set to maximum": a concurrent caller may never lower the value.
/// Returns `self` so the call can be chained builder-style.
pub fn update_current_max_radix_bits(self) -> Self {
    // `fetch_max` atomically stores max(current, max_radix_bits), which is
    // exactly what the previous hand-rolled compare_exchange retry loop did
    // (it only ever wrote `max_radix_bits` when it was strictly larger than
    // the current value). No loop needed.
    self.current_max_radix_bits
        .fetch_max(self.max_radix_bits, Ordering::SeqCst);
    self
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,16 @@ impl SerializedPayload {
&self,
group_types: Vec<DataType>,
aggrs: Vec<Arc<dyn AggregateFunction>>,
radix_bits: u64,
arena: Arc<Bump>,
) -> Result<PartitionedPayload> {
let rows_num = self.data_block.num_rows();
let radix_bits = self.max_partition_count.trailing_zeros() as u64;
let config = HashTableConfig::default().with_initial_radix_bits(radix_bits);
let mut state = ProbeState::default();
let agg_len = aggrs.len();
let group_len = group_types.len();
let mut hashtable = AggregateHashTable::new_directly(
group_types,
aggrs,
config,
rows_num,
Arc::new(Bump::new()),
);
let mut hashtable =
AggregateHashTable::new_directly(group_types, aggrs, config, rows_num, arena);

let agg_states = (0..agg_len)
.map(|i| {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl<Method: HashMethodBounds> TransformFinalAggregate<Method> {

fn transform_agg_hashtable(&mut self, meta: AggregateMeta<Method, usize>) -> Result<DataBlock> {
let mut agg_hashtable: Option<AggregateHashTable> = None;
if let AggregateMeta::Partitioned { bucket: _, data } = meta {
if let AggregateMeta::Partitioned { bucket, data } = meta {
for bucket_data in data {
match bucket_data {
AggregateMeta::AggregateHashTable(payload) => match agg_hashtable.as_mut() {
Expand All @@ -92,16 +92,24 @@ impl<Method: HashMethodBounds> TransformFinalAggregate<Method> {
},
AggregateMeta::Serialized(payload) => match agg_hashtable.as_mut() {
Some(ht) => {
debug_assert!(bucket == payload.bucket);
let arena = Arc::new(Bump::new());
let payload = payload.convert_to_partitioned_payload(
self.params.group_data_types.clone(),
self.params.aggregate_functions.clone(),
0,
arena,
)?;
ht.combine_payloads(&payload, &mut self.flush_state)?;
}
None => {
debug_assert!(bucket == payload.bucket);
let arena = Arc::new(Bump::new());
let payload = payload.convert_to_partitioned_payload(
self.params.group_data_types.clone(),
self.params.aggregate_functions.clone(),
0,
arena,
)?;
let capacity =
AggregateHashTable::get_capacity_for_count(payload.len());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use databend_common_expression::BlockMetaInfoDowncast;
use databend_common_expression::Column;
use databend_common_expression::DataBlock;
use databend_common_expression::HashTableConfig;
use databend_common_expression::PayloadFlushState;
use databend_common_expression::ProbeState;
use databend_common_functions::aggregates::StateAddr;
use databend_common_functions::aggregates::StateAddrs;
Expand All @@ -50,7 +51,6 @@ use crate::pipelines::processors::transforms::group_by::HashMethodBounds;
use crate::pipelines::processors::transforms::group_by::PartitionedHashMethod;
use crate::pipelines::processors::transforms::group_by::PolymorphicKeysHelper;
use crate::sessions::QueryContext;

#[allow(clippy::enum_variant_names)]
enum HashTable<Method: HashMethodBounds> {
MovedOut,
Expand Down Expand Up @@ -401,16 +401,23 @@ impl<Method: HashMethodBounds> AccumulatingTransform for TransformPartialAggrega

let group_types = v.payload.group_types.clone();
let aggrs = v.payload.aggrs.clone();
let config = v.config.clone();
let config = v.config.update_current_max_radix_bits();
let max_radix_bits = config.max_radix_bits;
let mut state = PayloadFlushState::default();

// repartition to max for normalization
let partitioned_payload =
v.payload.repartition(1 << max_radix_bits, &mut state);

let blocks = vec![DataBlock::empty_with_meta(
AggregateMeta::<Method, usize>::create_agg_spilling(v.payload),
AggregateMeta::<Method, usize>::create_agg_spilling(partitioned_payload),
)];

let arena = Arc::new(Bump::new());
self.hash_table = HashTable::AggregateHashTable(AggregateHashTable::new(
group_types,
aggrs,
config,
config.with_initial_radix_bits(max_radix_bits),
arena,
));
return Ok(blocks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl<Method: HashMethodBounds> TransformFinalGroupBy<Method> {

fn transform_agg_hashtable(&mut self, meta: AggregateMeta<Method, ()>) -> Result<DataBlock> {
let mut agg_hashtable: Option<AggregateHashTable> = None;
if let AggregateMeta::Partitioned { bucket: _, data } = meta {
if let AggregateMeta::Partitioned { bucket, data } = meta {
for bucket_data in data {
match bucket_data {
AggregateMeta::AggregateHashTable(payload) => match agg_hashtable.as_mut() {
Expand All @@ -85,16 +85,24 @@ impl<Method: HashMethodBounds> TransformFinalGroupBy<Method> {
},
AggregateMeta::Serialized(payload) => match agg_hashtable.as_mut() {
Some(ht) => {
debug_assert!(bucket == payload.bucket);
let arena = Arc::new(Bump::new());
let payload = payload.convert_to_partitioned_payload(
self.params.group_data_types.clone(),
self.params.aggregate_functions.clone(),
0,
arena,
)?;
ht.combine_payloads(&payload, &mut self.flush_state)?;
}
None => {
debug_assert!(bucket == payload.bucket);
let arena = Arc::new(Bump::new());
let payload = payload.convert_to_partitioned_payload(
self.params.group_data_types.clone(),
self.params.aggregate_functions.clone(),
0,
arena,
)?;
let capacity =
AggregateHashTable::get_capacity_for_count(payload.len());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ use databend_common_expression::AggregateHashTable;
use databend_common_expression::Column;
use databend_common_expression::DataBlock;
use databend_common_expression::HashTableConfig;
use databend_common_expression::PayloadFlushState;
use databend_common_expression::ProbeState;
use databend_common_hashtable::HashtableLike;
use databend_common_metrics::transform::*;
use databend_common_pipeline_core::processors::InputPort;
use databend_common_pipeline_core::processors::OutputPort;
use databend_common_pipeline_core::processors::Processor;
Expand Down Expand Up @@ -212,6 +214,15 @@ impl<Method: HashMethodBounds> AccumulatingTransform for TransformPartialGroupBy
{
if let HashTable::PartitionedHashTable(v) = std::mem::take(&mut self.hash_table)
{
// perf
{
metrics_inc_group_by_partial_spill_count();
metrics_inc_group_by_partial_spill_cell_count(1);
metrics_inc_group_by_partial_hashtable_allocated_bytes(
v.allocated_bytes() as u64,
);
}

let _dropper = v._dropper.clone();
let blocks = vec![DataBlock::empty_with_meta(
AggregateMeta::<Method, ()>::create_spilling(v),
Expand All @@ -234,18 +245,33 @@ impl<Method: HashMethodBounds> AccumulatingTransform for TransformPartialGroupBy
|| GLOBAL_MEM_STAT.get_memory_usage() as usize >= self.settings.max_memory_usage)
{
if let HashTable::AggregateHashTable(v) = std::mem::take(&mut self.hash_table) {
// perf
{
metrics_inc_group_by_partial_spill_count();
metrics_inc_group_by_partial_spill_cell_count(1);
metrics_inc_group_by_partial_hashtable_allocated_bytes(
v.allocated_bytes() as u64,
);
}

let group_types = v.payload.group_types.clone();
let aggrs = v.payload.aggrs.clone();
let config = v.config.clone();
let config = v.config.update_current_max_radix_bits();
let max_radix_bits = config.max_radix_bits;
let mut state = PayloadFlushState::default();

// repartition to max for normalization
let partitioned_payload =
v.payload.repartition(1 << max_radix_bits, &mut state);
let blocks = vec![DataBlock::empty_with_meta(
AggregateMeta::<Method, ()>::create_agg_spilling(v.payload),
AggregateMeta::<Method, ()>::create_agg_spilling(partitioned_payload),
)];

let arena = Arc::new(Bump::new());
self.hash_table = HashTable::AggregateHashTable(AggregateHashTable::new(
group_types,
aggrs,
config,
config.with_initial_radix_bits(max_radix_bits),
arena,
));
return Ok(blocks);
Expand Down
Loading
Loading