Skip to content

Commit

Permalink
chore(query): modify new_agg_hashtable payload transmission method on…
Browse files Browse the repository at this point in the history
… singleton and support new_agg_hashtable on cluster (databendlabs#14798)

* refactor new_agg_hashtable

* test

* test

* fmt

* test

* test

* test

* format explain

---------

Co-authored-by: jw <freejw@gmail.com>
  • Loading branch information
Freejww and jw authored Mar 5, 2024
1 parent ba6fb38 commit f04c367
Show file tree
Hide file tree
Showing 22 changed files with 675 additions and 177 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/query/expression/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ rust_decimal = "1.26"
serde = { workspace = true }
serde_json = { workspace = true }
simdutf8 = "0.1.4"
strength_reduce = "0.2.4"
terminal_size = "0.2.6"
tonic = { workspace = true }
typetag = { workspace = true }
Expand Down
13 changes: 12 additions & 1 deletion src/query/expression/src/aggregate/aggregate_hashtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ pub type Entry = u64;

pub struct AggregateHashTable {
pub payload: PartitionedPayload,
// use for append rows directly during deserialize
pub direct_append: bool,
config: HashTableConfig,
current_radix_bits: u64,
entries: Vec<Entry>,
Expand Down Expand Up @@ -71,6 +73,7 @@ impl AggregateHashTable {
Self {
entries: vec![0u64; capacity],
count: 0,
direct_append: false,
current_radix_bits: config.initial_radix_bits,
payload: PartitionedPayload::new(group_types, aggrs, 1 << config.initial_radix_bits),
capacity,
Expand Down Expand Up @@ -134,7 +137,15 @@ impl AggregateHashTable {
state.row_count = row_count;
group_hash_columns(group_columns, &mut state.group_hashes);

let new_group_count = self.probe_and_create(state, group_columns, row_count);
let new_group_count = if self.direct_append {
for idx in 0..row_count {
state.empty_vector[idx] = idx;
}
self.payload.append_rows(state, row_count, group_columns);
row_count
} else {
self.probe_and_create(state, group_columns, row_count)
};

if !self.payload.aggrs.is_empty() {
for i in 0..row_count {
Expand Down
45 changes: 44 additions & 1 deletion src/query/expression/src/aggregate/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;

use bumpalo::Bump;
use databend_common_base::runtime::drop_guard;
use strength_reduce::StrengthReducedU64;

use super::payload_row::rowformat_size;
use super::payload_row::serialize_column_to_rowformat;
Expand All @@ -26,8 +27,10 @@ use crate::store;
use crate::types::DataType;
use crate::AggregateFunctionRef;
use crate::Column;
use crate::PayloadFlushState;
use crate::SelectVector;
use crate::StateAddr;
use crate::BATCH_SIZE;
use crate::MAX_PAGE_SIZE;

// payload layout
Expand All @@ -38,6 +41,7 @@ use crate::MAX_PAGE_SIZE;
// [STATE_ADDRS] is the state_addrs of the aggregate functions, 8 bytes each
pub struct Payload {
pub arena: Arc<Bump>,
pub arenas: Vec<Arc<Bump>>,
// if true, the states are moved out of the payload into other payload, and will not be dropped
pub state_move_out: bool,
pub group_types: Vec<DataType>,
Expand Down Expand Up @@ -120,7 +124,8 @@ impl Payload {
let row_per_page = (u16::MAX as usize).min(MAX_PAGE_SIZE / tuple_size).max(1);

Self {
arena,
arena: arena.clone(),
arenas: vec![arena],
state_move_out: false,
pages: vec![],
current_write_page: 0,
Expand Down Expand Up @@ -333,6 +338,44 @@ impl Payload {
self.pages.iter().map(|x| x.rows).sum::<usize>()
);
}

pub fn scatter(&self, state: &mut PayloadFlushState, partition_count: usize) -> bool {
if state.flush_page >= self.pages.len() {
return false;
}

let page = &self.pages[state.flush_page];

// ToNext
if state.flush_page_row >= page.rows {
state.flush_page += 1;
state.flush_page_row = 0;
state.row_count = 0;
return self.scatter(state, partition_count);
}

let end = (state.flush_page_row + BATCH_SIZE).min(page.rows);
let rows = end - state.flush_page_row;
state.row_count = rows;

state.probe_state.reset_partitions(partition_count);

let mods: StrengthReducedU64 = StrengthReducedU64::new(partition_count as u64);
for idx in 0..rows {
state.addresses[idx] = self.data_ptr(page, idx + state.flush_page_row);

let hash =
unsafe { core::ptr::read::<u64>(state.addresses[idx].add(self.hash_offset) as _) };

let partition_idx = (hash % mods) as usize;

let sel = &mut state.probe_state.partition_entries[partition_idx];
sel[state.probe_state.partition_count[partition_idx]] = idx;
state.probe_state.partition_count[partition_idx] += 1;
}
state.flush_page_row = end;
true
}
}

impl Drop for Payload {
Expand Down
6 changes: 2 additions & 4 deletions src/query/service/src/pipelines/builders/builder_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ impl PipelineBuilder {

let enable_experimental_aggregate_hashtable = self
.settings
.get_enable_experimental_aggregate_hashtable()?
&& self.ctx.get_cluster().is_empty();
.get_enable_experimental_aggregate_hashtable()?;

let params = Self::build_aggregator_params(
aggregate.input.output_schema()?,
Expand Down Expand Up @@ -213,8 +212,7 @@ impl PipelineBuilder {
let max_block_size = self.settings.get_max_block_size()?;
let enable_experimental_aggregate_hashtable = self
.settings
.get_enable_experimental_aggregate_hashtable()?
&& self.ctx.get_cluster().is_empty();
.get_enable_experimental_aggregate_hashtable()?;

let params = Self::build_aggregator_params(
aggregate.before_group_by_schema.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::BlockMetaInfoDowncast;
use databend_common_expression::DataBlock;
use databend_common_expression::Payload;
use databend_common_expression::PayloadFlushState;
use databend_common_hashtable::FastHash;
use databend_common_hashtable::HashtableEntryMutRefLike;
use databend_common_hashtable::HashtableEntryRefLike;
Expand Down Expand Up @@ -78,6 +80,7 @@ impl<Method: HashMethodBounds, V: Send + Sync + 'static> ExchangeSorting
AggregateMeta::Serialized(v) => Ok(v.bucket),
AggregateMeta::HashTable(v) => Ok(v.bucket),
AggregateMeta::AggregateHashTable(_) => unreachable!(),
AggregateMeta::AggregatePayload(v) => Ok(v.bucket),
AggregateMeta::Spilled(_)
| AggregateMeta::Spilling(_)
| AggregateMeta::BucketSpilled(_) => Ok(-1),
Expand Down Expand Up @@ -139,6 +142,42 @@ fn scatter<Method: HashMethodBounds, V: Copy + Send + Sync + 'static>(
Ok(res)
}

fn scatter_paylaod(mut payload: Payload, buckets: usize) -> Result<Vec<Payload>> {
let mut buckets = Vec::with_capacity(buckets);

let group_types = payload.group_types.clone();
let aggrs = payload.aggrs.clone();
let mut state = PayloadFlushState::default();

for _ in 0..buckets.capacity() {
buckets.push(Payload::new(
Arc::new(Bump::new()),
group_types.clone(),
aggrs.clone(),
));
}

for bucket in buckets.iter_mut() {
bucket.arenas.extend_from_slice(&payload.arenas);
}

// scatter each page of the payload.
while payload.scatter(&mut state, buckets.len()) {
// copy to the corresponding bucket.
for (idx, bucket) in buckets.iter_mut().enumerate() {
let count = state.probe_state.partition_count[idx];

if count > 0 {
let sel = &state.probe_state.partition_entries[idx];
bucket.copy_rows(sel, count, &state.addresses);
}
}
}
payload.state_move_out = true;

Ok(buckets)
}

impl<Method: HashMethodBounds, V: Copy + Send + Sync + 'static> FlightScatter
for HashTableHashScatter<Method, V>
{
Expand Down Expand Up @@ -176,8 +215,18 @@ impl<Method: HashMethodBounds, V: Copy + Send + Sync + 'static> FlightScatter
});
}
}

AggregateMeta::AggregateHashTable(_) => todo!("AGG_HASHTABLE"),
AggregateMeta::AggregateHashTable(_) => unreachable!(),
AggregateMeta::AggregatePayload(p) => {
for payload in scatter_paylaod(p.payload, self.buckets)? {
blocks.push(DataBlock::empty_with_meta(
AggregateMeta::<Method, V>::create_agg_payload(
p.bucket,
payload,
p.max_partition_count,
),
))
}
}
};

return Ok(blocks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use databend_common_expression::BlockMetaInfoPtr;
use databend_common_expression::Column;
use databend_common_expression::DataBlock;
use databend_common_expression::PartitionedPayload;
use databend_common_expression::Payload;

use crate::pipelines::processors::transforms::aggregator::HashTableCell;
use crate::pipelines::processors::transforms::group_by::HashMethodBounds;
Expand All @@ -34,6 +35,8 @@ pub struct HashTablePayload<T: HashMethodBounds, V: Send + Sync + 'static> {
pub struct SerializedPayload {
pub bucket: isize,
pub data_block: DataBlock,
// use for new agg_hashtable
pub max_partition_count: usize,
}

impl SerializedPayload {
Expand All @@ -50,10 +53,17 @@ pub struct BucketSpilledPayload {
pub columns_layout: Vec<u64>,
}

pub struct AggregatePayload {
pub bucket: isize,
pub payload: Payload,
pub max_partition_count: usize,
}

pub enum AggregateMeta<Method: HashMethodBounds, V: Send + Sync + 'static> {
Serialized(SerializedPayload),
HashTable(HashTablePayload<Method, V>),
AggregateHashTable(PartitionedPayload),
AggregatePayload(AggregatePayload),
BucketSpilled(BucketSpilledPayload),
Spilled(Vec<BucketSpilledPayload>),
Spilling(HashTablePayload<PartitionedHashMethod<Method>, V>),
Expand All @@ -73,10 +83,29 @@ impl<Method: HashMethodBounds, V: Send + Sync + 'static> AggregateMeta<Method, V
Box::new(AggregateMeta::<Method, V>::AggregateHashTable(payload))
}

pub fn create_serialized(bucket: isize, block: DataBlock) -> BlockMetaInfoPtr {
pub fn create_agg_payload(
bucket: isize,
payload: Payload,
max_partition_count: usize,
) -> BlockMetaInfoPtr {
Box::new(AggregateMeta::<Method, V>::AggregatePayload(
AggregatePayload {
bucket,
payload,
max_partition_count,
},
))
}

pub fn create_serialized(
bucket: isize,
block: DataBlock,
max_partition_count: usize,
) -> BlockMetaInfoPtr {
Box::new(AggregateMeta::<Method, V>::Serialized(SerializedPayload {
bucket,
data_block: block,
max_partition_count,
}))
}

Expand Down Expand Up @@ -136,6 +165,9 @@ impl<Method: HashMethodBounds, V: Send + Sync + 'static> Debug for AggregateMeta
AggregateMeta::AggregateHashTable(_) => {
f.debug_struct("AggregateMeta:AggHashTable").finish()
}
AggregateMeta::AggregatePayload(_) => {
f.debug_struct("AggregateMeta:AggregatePayload").finish()
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ pub struct AggregateSerdeMeta {
pub location: Option<String>,
pub data_range: Option<Range<u64>>,
pub columns_layout: Vec<usize>,
// use for new agg_hashtable
pub is_agg_payload: bool,
pub max_partition_count: usize,
}

impl AggregateSerdeMeta {
Expand All @@ -39,6 +42,20 @@ impl AggregateSerdeMeta {
location: None,
data_range: None,
columns_layout: vec![],
is_agg_payload: false,
max_partition_count: 0,
})
}

pub fn create_agg_payload(bucket: isize, max_partition_count: usize) -> BlockMetaInfoPtr {
Box::new(AggregateSerdeMeta {
typ: BUCKET_TYPE,
bucket,
location: None,
data_range: None,
columns_layout: vec![],
is_agg_payload: true,
max_partition_count,
})
}

Expand All @@ -54,6 +71,8 @@ impl AggregateSerdeMeta {
columns_layout,
location: Some(location),
data_range: Some(data_range),
is_agg_payload: false,
max_partition_count: 0,
})
}
}
Expand Down
Loading

0 comments on commit f04c367

Please sign in to comment.