Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement hash partitioned aggregation #320

Merged
merged 18 commits into from
May 16, 2021

Conversation

Dandandan
Copy link
Contributor

@Dandandan Dandandan commented May 11, 2021

Which issue does this PR close?

Closes #27

Rationale for this change

A more scalable hash aggregate that works well for group by expressions that have high cardinality int he output and allows to scale better with the number of cpu cores.
The algorithm changes the steps from:

partial hash aggregate -> merge (to 1 partition) -> full hash aggregate

to

partial hash aggregate -> repartition on group by expressions -> hash aggregate (on partitions)

This is the same as what Spark is doing.

This mostly has an effect on group by with higher cardinality, but no substantial effect on lower cardinality (as partial result is already small).
For Ballista this would also be required I think @andygrove - currently every partition is merged into one which can be problematic (and slow).

For example, TPC-H query 3 benefits quite a bit:
Master:

Query 3 avg time: 71.82 ms

PR:

Query 3 avg time: 49.20 ms

The db-benchmark group by queries improve by quite a bit:

Master:

q1 took 34 ms
q2 took 454 ms
q3 took 3284 ms
q4 took 42 ms
q5 took 2924 ms
q7 took 2843 ms

PR

q1 took 33 ms
q2 took 369 ms
q3 took 1875 ms
q4 took 46 ms
q5 took 1756 ms
q7 took 1686 ms

What changes are included in this PR?

Are there any user-facing changes?

@Dandandan Dandandan changed the title WIP: Implement hash partitioned aggregation Implement hash partitioned aggregation May 11, 2021
@codecov-commenter
Copy link

codecov-commenter commented May 11, 2021

Codecov Report

Merging #320 (2fc12eb) into master (1702d6c) will decrease coverage by 0.00%.
The diff coverage is 76.47%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     apache/arrow-datafusion#320      +/-   ##
==========================================
- Coverage   75.72%   75.71%   -0.01%     
==========================================
  Files         143      143              
  Lines       23832    23881      +49     
==========================================
+ Hits        18046    18081      +35     
- Misses       5786     5800      +14     
Impacted Files Coverage Δ
...ta/rust/core/src/serde/physical_plan/from_proto.rs 47.39% <0.00%> (-0.42%) ⬇️
...ista/rust/core/src/serde/physical_plan/to_proto.rs 50.62% <0.00%> (-0.32%) ⬇️
ballista/rust/core/src/utils.rs 30.43% <ø> (ø)
datafusion/src/physical_plan/hash_join.rs 85.57% <0.00%> (-0.83%) ⬇️
datafusion/src/physical_plan/mod.rs 84.70% <ø> (ø)
...atafusion/src/physical_plan/unicode_expressions.rs 90.37% <ø> (ø)
datafusion/tests/sql.rs 99.88% <ø> (ø)
datafusion/src/physical_plan/planner.rs 80.62% <86.36%> (+0.62%) ⬆️
ballista/rust/scheduler/src/planner.rs 69.46% <100.00%> (ø)
ballista/rust/scheduler/src/test_utils.rs 100.00% <100.00%> (ø)
... and 7 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 1702d6c...2fc12eb. Read the comment docs.

@Dandandan Dandandan requested review from alamb and andygrove May 11, 2021 23:28
@Dandandan Dandandan requested a review from andygrove May 12, 2021 05:39
input_schema,
)?))
// TODO: dictionary type not yet supported in Hash Repartition
let contains_dict = groups
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will create an issue for this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks really cool @Dandandan. I suggest you add a test, if possible, that shows the plan with the repartition exec operation in order to prevent someone accidentally turning off this optimization during a refactor

datafusion/src/execution/context.rs Show resolved Hide resolved
input_schema,
)?))
// TODO: dictionary type not yet supported in Hash Repartition
let contains_dict = groups
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you

datafusion/src/physical_plan/hash_aggregate.rs Outdated Show resolved Hide resolved
datafusion/src/physical_plan/hash_aggregate.rs Outdated Show resolved Hide resolved
datafusion/src/physical_plan/hash_join.rs Show resolved Hide resolved
@Dandandan
Copy link
Contributor Author

This looks really cool @Dandandan. I suggest you add a test, if possible, that shows the plan with the repartition exec operation in order to prevent someone accidentally turning off this optimization during a refactor

Good idea! Added a test for this

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is looking great. Thanks @Dandandan

@@ -202,6 +209,9 @@ impl ExecutionPlan for HashAggregateExec {
fn required_child_distribution(&self) -> Distribution {
match &self.mode {
AggregateMode::Partial => Distribution::UnspecifiedDistribution,
AggregateMode::FinalPartitioned => Distribution::HashPartitioned(
self.group_expr.iter().map(|x| x.0.clone()).collect(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Member

@jorgecarleitao jorgecarleitao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went through this and it looks great. Thanks a lot @Dandandan , great work. 💯

:shipit: from my side.

We may want to give some time in case someone else would like to go through this before merging.

❤️

@Dandandan
Copy link
Contributor Author

@andygrove maybe? :)

@andygrove
Copy link
Member

@andygrove maybe? :)

I will make time this weekend to review this and take it for a spin!

@Dandandan
Copy link
Contributor Author

Awesome, thanks @andygrove !

I have also some nice followup this weekend for more performance improvements for hash aggregates :D

Copy link
Member

@andygrove andygrove left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. I tested this out locally and confirmed that performance is much better for unpartitioned data, and about the same for partitioned data.

@andygrove
Copy link
Member

@Dandandan Looks like there is a conflict that needs fixing

@andygrove
Copy link
Member

I filed apache/datafusion-ballista#23 for implementing this optimization in Ballista

@Dandandan
Copy link
Contributor Author

Dandandan commented May 15, 2021

Somehow coverage run seems to fail... But doesn't show what is failing

@alamb
Copy link
Contributor

alamb commented May 16, 2021

The https://github.com/apache/arrow-datafusion/pull/320/checks?check_run_id=2591763548 shows this buried in the logs (not at the end, annoyingly):

failures:

---- physical_plan::planner::tests::hash_agg_group_by_partitioned stdout ----
thread 'physical_plan::planner::tests::hash_agg_group_by_partitioned' panicked at 'assertion failed: formatted.contains(\"FinalPartitioned\")', datafusion/src/physical_plan/planner.rs:1051:9
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace


failures:
    physical_plan::planner::tests::hash_agg_group_by_partitioned

@Dandandan
Copy link
Contributor Author

thanks @alamb will merge it now when it's green

@Dandandan Dandandan merged commit ed92673 into apache:master May 16, 2021
@Dandandan
Copy link
Contributor Author

Thanks all 🎉

@houqp houqp added datafusion Changes in the datafusion crate enhancement New feature or request performance Make DataFusion faster labels Jul 29, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
datafusion Changes in the datafusion crate enhancement New feature or request performance Make DataFusion faster
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement hash-partitioned hash aggregate
6 participants