RFC: Demonstrate new GroupHashAggregate stream approach (runs more than 2x faster!) #6800

Status: Closed
98 commits (showing changes from all commits)

Commits
9b22745
POC: Demonstrate new GroupHashAggregate stream approach
alamb Jun 29, 2023
4ce6671
complete accumulator
alamb Jun 30, 2023
5694190
touchups
alamb Jun 30, 2023
a58b006
Add comments
alamb Jul 1, 2023
73cb33f
Update comments and simplify code
alamb Jul 1, 2023
0b5d74f
factor out accumulate
alamb Jul 1, 2023
c30874d
split nullable/non nullable handling
alamb Jul 1, 2023
2370220
Refactor out accumulation in average
alamb Jul 1, 2023
26570f9
Move accumulator to their own function
alamb Jul 1, 2023
bed990e
update more comments
alamb Jul 1, 2023
25787a0
Begin writing tests for accumulate
alamb Jul 1, 2023
8433d6f
more tests
alamb Jul 1, 2023
7e9b92e
more tests
alamb Jul 1, 2023
bb37e77
comments
alamb Jul 1, 2023
add7b36
Implement fuzz testing
alamb Jul 1, 2023
53aa18b
Clarify the required order from GroupsAccumulator
alamb Jul 2, 2023
00aac24
Zero copy into array
alamb Jul 2, 2023
d760a5f
fix spelling of indices
alamb Jul 2, 2023
8811fa6
implement filtering for easy path
alamb Jul 2, 2023
93a4e6f
Implement filtering
alamb Jul 2, 2023
966d3d0
Add null handling in avg
alamb Jul 2, 2023
316c781
WIP count
Jul 3, 2023
754a9ff
WIP count
Jul 3, 2023
e708723
Sketch out the adapter interface
alamb Jul 3, 2023
677160e
More new adapter interface
alamb Jul 3, 2023
689e51b
WIP sum
Jul 3, 2023
7b20155
WIP sum
Jul 3, 2023
6275a9f
Use `Rows` API
Jul 3, 2023
8902c91
Update adapter
alamb Jul 3, 2023
6cab205
Add docs, refactor
alamb Jul 3, 2023
587dc0e
Merge branch 'alamb/hash_agg_spike' of github.com:alamb/arrow-datafus…
alamb Jul 3, 2023
7683350
Merge remote-tracking branch 'origin/main' into hash_agg_spike2
Jul 3, 2023
52c62ec
Merge
Jul 3, 2023
1684916
WIP count
Jul 3, 2023
a94c346
WIP count
Jul 3, 2023
1ba625a
WIP count
Jul 3, 2023
c2f955d
WIP count
Jul 3, 2023
9ff91cb
Support sum
Jul 4, 2023
180903b
Complete adapter
alamb Jul 4, 2023
5d8bb35
Instantiate all types
alamb Jul 4, 2023
51b0243
Implement memory accounting
alamb Jul 4, 2023
68f62d1
cleanup memory accounting
alamb Jul 4, 2023
ad6d4f3
Fix sum accumulator with filtering, consolidate null handling
alamb Jul 4, 2023
87b54c9
Add float support for sum
Jul 5, 2023
eb919a9
Merge remote-tracking branch 'origin/main' into hash_agg_spike2
Jul 5, 2023
917c050
Simplify count aggregate, clean up aggregates cleanup, fuzz almost pa…
alamb Jul 5, 2023
9eb6822
Merge branch 'alamb/hash_agg_spike' of github.com:alamb/arrow-datafus…
alamb Jul 5, 2023
c041ecc
fix fmt
alamb Jul 5, 2023
f973a65
Fix clippy
alamb Jul 5, 2023
24abb14
Fix docs
alamb Jul 5, 2023
6e740a4
Min/Max for primitives
Jul 5, 2023
9d2c7bf
Min/Max for primitives
Jul 5, 2023
ecc980d
Min/Max initialization
Jul 5, 2023
fede032
Min/Max initialization
Jul 5, 2023
5076245
Initial min/max support for primitive
Jul 5, 2023
8de4ada
Refactor
Jul 5, 2023
09b9329
Clippy
Jul 5, 2023
ea0ce25
Clippy
Jul 5, 2023
be8a1e2
Cleanup
Jul 5, 2023
890b517
Fmt
Jul 5, 2023
ffd5cbe
Merge remote-tracking branch 'origin/main' into hash_agg_spike2
Jul 5, 2023
6846970
Speed up avg
Jul 5, 2023
2f4907a
Fmt
Jul 5, 2023
7ecf148
Add clickbench queries to sqllogictest coverage (#6836)
alamb Jul 5, 2023
9adcf97
feat: implement posgres style `encode`/`decode` (#6821)
ozgrakkurt Jul 5, 2023
4aa1656
chore(deps): update rstest requirement from 0.17.0 to 0.18.0 (#6847)
dependabot[bot] Jul 5, 2023
c02d4e4
[minior] support serde for some function (#6846)
liukun4515 Jul 5, 2023
e044b5c
Support fixed_size_list for make_array (#6759)
jayzhan211 Jul 5, 2023
e8d5c17
Improve median performance. (#6837)
vincev Jul 5, 2023
cf72ea0
Mismatch in MemTable of Select Into when projecting on aggregate wind…
berkaysynnada Jul 5, 2023
aab9103
feat: column support for `array_append`, `array_prepend`, `array_posi…
izveigor Jul 5, 2023
0fb5de7
MINOR: Fix ordering of the aggregate_source_with_order table (#6852)
mustafasrepo Jul 6, 2023
5705b3a
Return error when internal multiplication overflowing in decimal divi…
viirya Jul 6, 2023
e324e9f
Deprecate ScalarValue::and, ScalarValue::or (#6842) (#6844)
tustvold Jul 6, 2023
dec1b97
chore(deps): update bigdecimal requirement from 0.3.0 to 0.4.0 (#6848)
dependabot[bot] Jul 6, 2023
49fc6c1
Update tests, and fix memory accounting
alamb Jul 6, 2023
b326b68
Merge branch 'alamb/hash_agg_spike' of github.com:alamb/arrow-datafus…
alamb Jul 6, 2023
b137df6
fix doc comments
alamb Jul 6, 2023
1d3185c
add ticket reference
alamb Jul 6, 2023
d9cca24
Only make code for average types that can be instantiated
alamb Jul 7, 2023
c68c39b
Improve aggregate_fuzz output
alamb Jul 7, 2023
0127917
Fix fuzz tests by emulating retractable batch
alamb Jul 7, 2023
e36a972
Fix and simplify min/max
alamb Jul 7, 2023
a96c3a0
Merge remote-tracking branch 'apache/main' into alamb/hash_agg_spike
alamb Jul 7, 2023
b6bde8d
Improve memory accounting
alamb Jul 7, 2023
cb5b8cb
feat: Add graphviz display format for execution plan. (#6726)
liurenjie1024 Jul 7, 2023
07f8d77
Fix (another) logical conflict (#6882)
alamb Jul 7, 2023
4dcac2a
Implement groups accumulators for bit operations
alamb Jul 7, 2023
5d6f815
Almost there
alamb Jul 7, 2023
60ee2ef
it compiles
alamb Jul 7, 2023
f2fc450
Reuse hashes buffer
Jul 8, 2023
b781910
Complete BoolAnd and BoolOr accumulators
alamb Jul 8, 2023
aebe77f
Fix doc
alamb Jul 8, 2023
7c17638
Merge remote-tracking branch 'apache/main' into alamb/hash_agg_spike
alamb Jul 8, 2023
0a5a749
Merge branch 'alamb/hash_agg_spike' of github.com:alamb/arrow-datafus…
alamb Jul 8, 2023
f684ae8
clippy
alamb Jul 8, 2023
e798074
Performance: Use a specialized sum accumulator for retractable aggreg…
alamb Jul 8, 2023
afcab34
Simplify sum and make it faster
alamb Jul 8, 2023
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -70,4 +70,4 @@ lto = false
opt-level = 3
overflow-checks = false
panic = 'unwind'
rpath = false
rpath = false
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

21 changes: 18 additions & 3 deletions datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -49,6 +49,7 @@ use std::sync::Arc;
mod bounded_aggregate_stream;
mod no_grouping;
mod row_hash;
mod row_hash2;
mod utils;

pub use datafusion_expr::AggregateFunction;
@@ -58,6 +59,7 @@ use datafusion_physical_expr::utils::{
get_finer_ordering, ordering_satisfy_requirement_concrete,
};

use self::row_hash2::GroupedHashAggregateStream2;
use super::DisplayAs;

/// Hash aggregate modes
@@ -198,6 +200,7 @@ impl PartialEq for PhysicalGroupBy {
enum StreamType {
AggregateStream(AggregateStream),
GroupedHashAggregateStream(GroupedHashAggregateStream),
GroupedHashAggregateStream2(GroupedHashAggregateStream2),
BoundedAggregate(BoundedAggregateStream),
}

@@ -206,6 +209,7 @@ impl From<StreamType> for SendableRecordBatchStream {
match stream {
StreamType::AggregateStream(stream) => Box::pin(stream),
StreamType::GroupedHashAggregateStream(stream) => Box::pin(stream),
StreamType::GroupedHashAggregateStream2(stream) => Box::pin(stream),
StreamType::BoundedAggregate(stream) => Box::pin(stream),
}
}
@@ -713,12 +717,23 @@ impl AggregateExec {
partition,
aggregation_ordering,
)?))
} else if self.use_poc_group_by() {
Ok(StreamType::GroupedHashAggregateStream2(
GroupedHashAggregateStream2::new(self, context, partition)?,
))
} else {
Ok(StreamType::GroupedHashAggregateStream(
GroupedHashAggregateStream::new(self, context, partition)?,
))
}
}

/// Returns true if we should use the POC group by stream
/// TODO: check for actually supported aggregates, etc
fn use_poc_group_by(&self) -> bool {
//info!("AAL Checking POC group by: {self:#?}");
true
}
}

impl DisplayAs for AggregateExec {
@@ -984,7 +999,7 @@ fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef {
Arc::new(Schema::new(group_fields))
}

/// returns physical expressions to evaluate against a batch
/// returns physical expressions for arguments to evaluate against a batch
/// The expressions are different depending on `mode`:
/// * Partial: AggregateExpr::expressions
/// * Final: columns of `AggregateExpr::state_fields()`
@@ -1787,10 +1802,10 @@ mod tests {
assert!(matches!(stream, StreamType::AggregateStream(_)));
}
1 => {
assert!(matches!(stream, StreamType::GroupedHashAggregateStream(_)));
assert!(matches!(stream, StreamType::GroupedHashAggregateStream2(_)));
}
2 => {
assert!(matches!(stream, StreamType::GroupedHashAggregateStream(_)));
assert!(matches!(stream, StreamType::GroupedHashAggregateStream2(_)));
}
_ => panic!("Unknown version: {version}"),
}
17 changes: 12 additions & 5 deletions datafusion/core/src/physical_plan/aggregates/row_hash.rs
@@ -17,6 +17,7 @@

//! Hash aggregation through row format

use log::info;
use std::cmp::min;
use std::ops::Range;
use std::sync::Arc;
@@ -110,6 +111,8 @@ pub(crate) struct GroupedHashAggregateStream {
/// first element in the array corresponds to normal accumulators
/// second element in the array corresponds to row accumulators
indices: [Vec<Range<usize>>; 2],
// buffer to be reused to store hashes
hashes_buffer: Vec<u64>,

Review comment (PR author):

❤️ this is a good change -- thanks @Dandandan. Pretty soon there will be no allocations while processing each batch (aka the hot loop) 🥳 -- I think with #6888 we can get rid of the counts in the sum accumulator

Review comment (PR author):

Note that this change was made to the existing row_hash (not the new one). I will port the change to the new one as part of #6904

(A minimal sketch of this buffer-reuse pattern appears after this diff.)

}

impl GroupedHashAggregateStream {
@@ -119,6 +122,7 @@ impl GroupedHashAggregateStream {
context: Arc<TaskContext>,
partition: usize,
) -> Result<Self> {
info!("Creating GroupedHashAggregateStream");
let agg_schema = Arc::clone(&agg.schema);
let agg_group_by = agg.group_by.clone();
let agg_filter_expr = agg.filter_expr.clone();
@@ -229,6 +233,7 @@ impl GroupedHashAggregateStream {
scalar_update_factor,
row_group_skip_position: 0,
indices: [normal_agg_indices, row_agg_indices],
hashes_buffer: vec![],
})
}
}
@@ -322,15 +327,17 @@ impl GroupedHashAggregateStream {
let mut groups_with_rows = vec![];

// 1.1 Calculate the group keys for the group values
let mut batch_hashes = vec![0; n_rows];
create_hashes(group_values, &self.random_state, &mut batch_hashes)?;
let batch_hashes = &mut self.hashes_buffer;
batch_hashes.clear();
batch_hashes.resize(n_rows, 0);
create_hashes(group_values, &self.random_state, batch_hashes)?;

let AggregationState {
map, group_states, ..
} = &mut self.aggr_state;

for (row, hash) in batch_hashes.into_iter().enumerate() {
let entry = map.get_mut(hash, |(_hash, group_idx)| {
for (row, hash) in batch_hashes.iter_mut().enumerate() {
let entry = map.get_mut(*hash, |(_hash, group_idx)| {
// verify that a group that we are inserting with hash is
// actually the same key value as the group in
// existing_idx (aka group_values @ row)
@@ -385,7 +392,7 @@

// for hasher function, use precomputed hash value
map.insert_accounted(
(hash, group_idx),
(*hash, group_idx),
|(hash, _group_index)| *hash,
allocated,
);
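The review comments above describe why `hashes_buffer` was added to `GroupedHashAggregateStream`: by clearing and resizing a buffer owned by the stream instead of building a fresh `Vec<u64>` for every input batch, the hot loop stops allocating once the buffer has grown to the largest batch seen. Below is a minimal, self-contained sketch of that reuse pattern, not the PR's actual code; `BatchHasher` and its string hashing are hypothetical stand-ins for the stream and for the `create_hashes` call shown in the diff (which likewise fills a caller-provided `&mut Vec<u64>`).

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Simplified stand-in for the aggregation stream: it owns a hash buffer
/// that is reused across batches instead of being reallocated per batch.
struct BatchHasher {
    /// Reused for every batch; grows to the largest batch seen so far and
    /// then keeps its allocation (mirrors `hashes_buffer` in the diff above).
    hashes_buffer: Vec<u64>,
}

impl BatchHasher {
    fn new() -> Self {
        Self { hashes_buffer: Vec::new() }
    }

    /// Hash one batch of rows, reusing the internal buffer.
    /// Hypothetical stand-in for the `create_hashes` call in the diff.
    fn hash_batch(&mut self, rows: &[&str]) -> &[u64] {
        // The key pattern: clear + resize the owned buffer instead of
        // allocating `vec![0; n_rows]` on every call.
        self.hashes_buffer.clear();
        self.hashes_buffer.resize(rows.len(), 0);
        for (slot, row) in self.hashes_buffer.iter_mut().zip(rows) {
            let mut hasher = DefaultHasher::new();
            row.hash(&mut hasher);
            *slot = hasher.finish();
        }
        &self.hashes_buffer
    }
}

fn main() {
    let mut hasher = BatchHasher::new();
    // Two "batches"; the second reuses the buffer grown by the first.
    for batch in [vec!["a", "b", "c"], vec!["d", "e"]] {
        let hashes = hasher.hash_batch(&batch);
        println!("{} rows -> {} hashes", batch.len(), hashes.len());
    }
}
```

In this sketch, `clear()` keeps the existing capacity and `resize()` only allocates when a batch is larger than any previous one, so in steady state each batch is hashed without touching the allocator.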