Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC: Fusing repart and partial aggr #12526

Draft
wants to merge 33 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e9792e2
define PartitionedGroupValues trait.
Rachelint Sep 14, 2024
25700eb
define `PartitionedGroupValues` and impl `PartitionedGroupValuesRows`…
Rachelint Sep 16, 2024
d00e46b
introduce `PartitionedGroupValues` to main path, and fix compile.
Rachelint Sep 16, 2024
855d454
impl group_aggregate_batch_single serving for single hashtable.
Rachelint Sep 17, 2024
bae4f7f
tmp
Rachelint Sep 17, 2024
4f4441c
impl `group_aggregate_batch_partitioned`.
Rachelint Sep 18, 2024
f24f3c3
tmp
Rachelint Sep 18, 2024
bec7a3a
impl `emit_partitioned`.
Rachelint Sep 18, 2024
553c6a3
introduce `ExecutionState::ProducingPartitionedOutput` to process the…
Rachelint Sep 18, 2024
4108065
introduce `PartitionedOutput`.
Rachelint Sep 19, 2024
05f4a49
use `PartitionedOutput`.
Rachelint Sep 19, 2024
39272a3
fix panic.
Rachelint Sep 19, 2024
0268404
fix aggr fuzz test.
Rachelint Sep 19, 2024
bf83235
print partitioned.
Rachelint Sep 19, 2024
5b68f1c
debug.
Rachelint Sep 19, 2024
45c9f86
fix.
Rachelint Sep 19, 2024
2fbcffc
add debug log to inspect.
Rachelint Sep 19, 2024
a026b30
remove some dbg!.
Rachelint Sep 19, 2024
0d95981
check num partitions.
Rachelint Sep 19, 2024
c91bc83
fix the hash seed in rows.
Rachelint Sep 19, 2024
abc59f6
debug finished.
Rachelint Sep 19, 2024
04756fa
refactor `PartitionedGroupValuesRows`.
Rachelint Sep 19, 2024
33beb1c
fix group indices.
Rachelint Sep 19, 2024
ed8c4aa
modify random state init.
Rachelint Sep 19, 2024
540588d
add partition info into schenma metadata.
Rachelint Sep 19, 2024
212641a
add fast partition in repart.
Rachelint Sep 19, 2024
9dee51f
debug finish.
Rachelint Sep 19, 2024
0e1c33b
use the slice approach to convert group value rows.
Rachelint Sep 19, 2024
cef63da
fix compile.
Rachelint Sep 19, 2024
b69dfd2
fix partition num.
Rachelint Sep 19, 2024
cef6305
revert to old PartitionedGroupValuesRows.
Rachelint Sep 19, 2024
96e7e4b
revert the random state.
Rachelint Sep 19, 2024
e9fe746
tmp
Rachelint Sep 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,10 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str
assert!(collected_running.len() > 2);
// Running should produce more chunk than the usual AggregateExec.
// Otherwise it means that we cannot generate result in running mode.
assert!(collected_running.len() > collected_usual.len());

// TODO: temporarily remote this assert
// assert!(collected_running.len() > collected_usual.len());

// compare
let usual_formatted = pretty_format_batches(&collected_usual).unwrap().to_string();
let running_formatted = pretty_format_batches(&collected_running)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ impl<T> VecAllocExt for Vec<T> {
}
}

fn get_filter_at_indices(
pub fn get_filter_at_indices(
opt_filter: Option<&BooleanArray>,
indices: &PrimitiveArray<UInt32Type>,
) -> Result<Option<ArrayRef>> {
Expand Down
127 changes: 126 additions & 1 deletion datafusion/physical-plan/src/aggregates/group_values/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,114 @@ mod bytes_view;
use bytes::GroupValuesByes;
use datafusion_physical_expr::binary_map::OutputType;

use crate::aggregates::group_values::row::PartitionedGroupValuesRows;

pub enum GroupValuesLike {
Single(Box<dyn GroupValues>),
Partitioned(Box<dyn PartitionedGroupValues>),
}

impl GroupValuesLike {
#[inline]
pub fn is_partitioned(&self) -> bool {
matches!(&self, GroupValuesLike::Partitioned(_))
}

#[inline]
pub fn num_partitions(&self) -> usize {
if let Self::Partitioned(group_values) = self {
group_values.num_partitions()
} else {
1
}
}

#[inline]
pub fn as_single(&self) -> &Box<dyn GroupValues> {
match self {
GroupValuesLike::Single(v) => v,
GroupValuesLike::Partitioned(_) => unreachable!(),
}
}

#[inline]
pub fn as_partitioned(&self) -> &Box<dyn PartitionedGroupValues> {
match self {
GroupValuesLike::Partitioned(v) => v,
GroupValuesLike::Single(_) => unreachable!(),
}
}

#[inline]
pub fn as_single_mut(&mut self) -> &mut Box<dyn GroupValues> {
match self {
GroupValuesLike::Single(v) => v,
GroupValuesLike::Partitioned(_) => unreachable!(),
}
}

#[inline]
pub fn as_partitioned_mut(&mut self) -> &mut Box<dyn PartitionedGroupValues> {
match self {
GroupValuesLike::Partitioned(v) => v,
GroupValuesLike::Single(_) => unreachable!(),
}
}

#[inline]
pub fn len(&self) -> usize {
match self {
GroupValuesLike::Single(v) => v.len(),
GroupValuesLike::Partitioned(v) => v.len(),
}
}

#[inline]
pub fn size(&self) -> usize {
match self {
GroupValuesLike::Single(v) => v.size(),
GroupValuesLike::Partitioned(v) => v.size(),
}
}

#[inline]
pub fn is_empty(&self) -> bool {
match self {
GroupValuesLike::Single(v) => v.is_empty(),
GroupValuesLike::Partitioned(v) => v.is_empty(),
}
}
}

pub trait PartitionedGroupValues: Send {
/// Calculates the `groups` for each input row of `cols`
fn intern(
&mut self,
cols: &[ArrayRef],
part_groups: &mut Vec<Vec<usize>>,
part_row_indices: &mut Vec<Vec<u32>>,
) -> Result<()>;

fn num_partitions(&self) -> usize;

/// Returns the number of bytes used by this [`GroupValues`]
fn size(&self) -> usize;

/// Returns true if this [`GroupValues`] is empty
fn is_empty(&self) -> bool;

fn partition_len(&self, partition_index: usize) -> usize;

/// The number of values stored in this [`GroupValues`]
fn len(&self) -> usize;

/// Emits the group values
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<Vec<ArrayRef>>>;

/// Clear the contents and shrink the capacity to the size of the batch (free up memory usage)
fn clear(&mut self);
}

/// An interning store for group keys
pub trait GroupValues: Send {
/// Calculates the `groups` for each input row of `cols`
Expand All @@ -54,7 +162,24 @@ pub trait GroupValues: Send {
fn clear_shrink(&mut self, batch: &RecordBatch);
}

pub fn new_group_values(schema: SchemaRef) -> Result<Box<dyn GroupValues>> {
pub fn new_group_values(
schema: SchemaRef,
partitioning_group_values: bool,
num_partitions: usize,
) -> Result<GroupValuesLike> {
let group_values = if partitioning_group_values && schema.fields.len() > 1 {
GroupValuesLike::Partitioned(Box::new(PartitionedGroupValuesRows::try_new(
schema,
num_partitions,
)?))
} else {
GroupValuesLike::Single(new_single_group_values(schema)?)
};

Ok(group_values)
}

pub fn new_single_group_values(schema: SchemaRef) -> Result<Box<dyn GroupValues>> {
if schema.fields.len() == 1 {
let d = schema.fields[0].data_type();

Expand Down
Loading