
Unify Row hash and hash implementation #4924

Merged
merged 21 commits into from
Jan 19, 2023

Conversation

mustafasrepo
Contributor

@mustafasrepo mustafasrepo commented Jan 16, 2023

Which issue does this PR close?

Closes #2723

re #4973

Rationale for this change

Currently there are two very similar implementations for grouped hash aggregation. Which one is used depends on whether all aggregators support the row format. We can decrease code duplication by combining the implementations.

What changes are included in this PR?

This PR splits aggregators into two groups: those that support the row format and those that do not. It calculates results for each group with the corresponding accumulator kind, then writes each result to its final position in the output schema. This has several advantages: it decreases code duplication, and it can utilize row aggregation even when not all aggregators support it. For instance, consider the query below

SELECT MIN(c13) as min1, MIN(c9) as min2, MAX(c13) as max1, MAX(c9) as max2
    FROM aggregate_test_100
    GROUP BY c1, c2

We can calculate MIN(c9) and MAX(c9) with a row accumulator, and MIN(c13) and MAX(c13) with a normal accumulator (c9 is a primitive type that supports row accumulation; c13 is a complex type that does not). After generating the results, we write each one to the appropriate index to align with the schema: MIN(c9) and MAX(c9) are written to indices 1 and 3, and MIN(c13) and MAX(c13) to indices 0 and 2 in the final record batch.
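The split-and-scatter idea above can be sketched in a few lines. This is an illustrative, self-contained example (the function name, the `(schema_index, supports_row, value)` triples, and the `i64` result type are all assumptions for the sketch, not DataFusion's actual API):

```rust
/// Partition (schema_index, supports_row, value) triples into a row-format
/// group and a normal group, then scatter each group's results back into
/// schema order. Illustrative only; not DataFusion's actual implementation.
fn evaluate_and_scatter(aggs: &[(usize, bool, i64)]) -> Vec<i64> {
    // Partition by row-format support, remembering each aggregate's schema index.
    let (row_group, normal_group): (Vec<&(usize, bool, i64)>, Vec<&(usize, bool, i64)>) =
        aggs.iter().partition(|a| a.1);

    // Each group could be evaluated by a different accumulator kind; here the
    // values are precomputed and we only demonstrate the scatter step.
    let mut out = vec![0i64; aggs.len()];
    for &(idx, _, v) in row_group.into_iter().chain(normal_group.into_iter()) {
        out[idx] = v;
    }
    out
}

fn main() {
    // MIN(c13) -> idx 0 (normal), MIN(c9) -> idx 1 (row),
    // MAX(c13) -> idx 2 (normal), MAX(c9) -> idx 3 (row)
    let aggs = [(0usize, false, 10i64), (1, true, 20), (2, false, 30), (3, true, 40)];
    let result = evaluate_and_scatter(&aggs);
    assert_eq!(result, vec![10, 20, 30, 40]);
    println!("{:?}", result);
}
```

Regardless of which accumulator kind produced a value, the output order matches the schema, so downstream operators are unaffected by the internal grouping.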

Are these changes tested?

They are covered by existing tests, since this is basically a refactor.
I also ran a benchmark against the master branch. The results can be seen below.

aggregate_query_no_group_by 15 12
                        time:   [602.10 µs 603.18 µs 604.31 µs]
                        change: [-7.4190% -5.3949% -3.6331%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 11 outliers among 100 measurements (11.00%)
  2 (2.00%) low severe
  1 (1.00%) low mild
  7 (7.00%) high mild
  1 (1.00%) high severe

aggregate_query_no_group_by_min_max_f64
                        time:   [568.91 µs 569.86 µs 570.88 µs]
                        change: [+0.1641% +0.5456% +0.9153%] (p = 0.01 < 0.05)
                        Change within noise threshold.
Found 8 outliers among 100 measurements (8.00%)
  5 (5.00%) high mild
  3 (3.00%) high severe

Benchmarking aggregate_query_no_group_by_count_distinct_wide: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 8.2s, enable flat sampling, or reduce sample count to 50.
aggregate_query_no_group_by_count_distinct_wide
                        time:   [1.6337 ms 1.6380 ms 1.6431 ms]
                        change: [-4.0379% -3.0041% -1.9941%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 7 outliers among 100 measurements (7.00%)
  3 (3.00%) low mild
  2 (2.00%) high mild
  2 (2.00%) high severe

aggregate_query_no_group_by_count_distinct_narrow
                        time:   [897.64 µs 901.57 µs 907.76 µs]
                        change: [-11.367% -9.2388% -7.3108%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 8 outliers among 100 measurements (8.00%)
  1 (1.00%) low severe
  2 (2.00%) low mild
  1 (1.00%) high mild
  4 (4.00%) high severe

Benchmarking aggregate_query_group_by: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 7.0s, enable flat sampling, or reduce sample count to 50.
aggregate_query_group_by
                        time:   [1.3854 ms 1.3883 ms 1.3914 ms]
                        change: [-8.6161% -4.4138% -1.7598%] (p = 0.01 < 0.05)
                        Performance has improved.
Found 8 outliers among 100 measurements (8.00%)
  1 (1.00%) low severe
  1 (1.00%) low mild
  5 (5.00%) high mild
  1 (1.00%) high severe

aggregate_query_group_by_with_filter
                        time:   [927.42 µs 930.20 µs 933.17 µs]
                        change: [-0.7073% -0.2995% +0.1043%] (p = 0.17 > 0.05)
                        No change in performance detected.
Found 6 outliers among 100 measurements (6.00%)
  1 (1.00%) low mild
  4 (4.00%) high mild
  1 (1.00%) high severe

Benchmarking aggregate_query_group_by_u64 15 12: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 7.1s, enable flat sampling, or reduce sample count to 50.
aggregate_query_group_by_u64 15 12
                        time:   [1.3993 ms 1.4028 ms 1.4068 ms]
                        change: [-1.4347% -0.6720% +0.3535%] (p = 0.18 > 0.05)
                        No change in performance detected.
Found 9 outliers among 100 measurements (9.00%)
  3 (3.00%) high mild
  6 (6.00%) high severe

aggregate_query_group_by_with_filter_u64 15 12
                        time:   [929.40 µs 933.94 µs 939.10 µs]
                        change: [-0.0263% +0.4356% +0.9373%] (p = 0.07 > 0.05)
                        No change in performance detected.
Found 9 outliers among 100 measurements (9.00%)
  6 (6.00%) high mild
  3 (3.00%) high severe

aggregate_query_group_by_u64_multiple_keys
                        time:   [12.946 ms 13.002 ms 13.059 ms]
                        change: [+0.5494% +1.1347% +1.6967%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild

aggregate_query_approx_percentile_cont_on_u64
                        time:   [4.2382 ms 4.2527 ms 4.2671 ms]
                        change: [+7.5876% +8.6055% +9.5317%] (p = 0.00 < 0.05)
                        Performance has regressed.

aggregate_query_approx_percentile_cont_on_f32
                        time:   [3.9627 ms 3.9816 ms 4.0010 ms]
                        change: [+5.7426% +6.4471% +7.1287%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 4 outliers among 100 measurements (4.00%)
  1 (1.00%) low mild
  3 (3.00%) high mild

Are there any user-facing changes?

@github-actions github-actions bot added core Core DataFusion crate physical-expr Physical Expressions labels Jan 16, 2023
@ozankabak
Contributor

ozankabak commented Jan 16, 2023

For some reason CI doesn't seem to be running, can you take a look @alamb?

Sending another commit seems to have triggered it.

@alamb
Contributor

alamb commented Jan 17, 2023

Thank you @mustafasrepo - I will put this on my review queue for tomorrow. Sounds awesome

cc @crepererum and @tustvold

@crepererum
Contributor

Can we have a benchmark run for this? Basically run cargo bench -p datafusion --bench aggregate_query_sql -- --save-baseline pr4924-pre before your first commit and cargo bench -p datafusion --bench aggregate_query_sql -- --baseline pr4924-pre after your last one and post the result of the latter.

datafusion/core/src/physical_plan/aggregates/row_hash.rs (review thread, outdated and resolved)
Comment on lines +93 to +94
/// Aggregate expressions not supporting row accumulation
normal_aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
Contributor:

So we have one operator now but still two accumulator implementations? I don't think this counts as "closing the ticket", though. We still have massive code duplication in the accumulator implementations. Furthermore, the implementation of this specific stream gets more complicated, but I agree that this would be one possible path forward.

Contributor:

Yes, one operator but multiple accumulators. Having a single accumulator implementation without a performance impact is a tough goal, and there seem to be some open questions around it. For example, some accumulation tasks involve non-fixed-size states; how do we implement those in the row-accumulator style?

Until we see a clear path forward in that regard, I think this is a good first step towards full unification (if such a thing is possible).

Contributor:

I.e. I'm personally convinced that the row accumulator style is a dead end.

Contributor:

I obviously need some time to digest the V3 plan, but it looks reasonable at a first reading. We will be happy to help chip away at it over time if you create a task-level epic out of it. And this PR would be a good intermediate-term simplification until then.

In this sense, I agree that this PR doesn't close the ticket fully. @mustafasrepo, can you change PR text to say "Make progress on #2723" instead of saying closes? Thanks.

Contributor:

I will try and help organize the tickets some more tomorrow morning. Thanks for this discussion

datafusion/core/src/physical_plan/aggregates/row_hash.rs (review thread, outdated and resolved)
@mustafasrepo
Contributor Author

Can we have a benchmark run for this? Basically run cargo bench -p datafusion --bench aggregate_query_sql -- --save-baseline pr4924-pre before your first commit and cargo bench -p datafusion --bench aggregate_query_sql -- --baseline pr4924-pre after your last one and post the result of the latter.

I updated the PR body to include benchmark results (against the master branch). Thanks for pointing this out.

@crepererum
Contributor

Can we have a benchmark run for this? Basically run cargo bench -p datafusion --bench aggregate_query_sql -- --save-baseline pr4924-pre before your first commit and cargo bench -p datafusion --bench aggregate_query_sql -- --baseline pr4924-pre after your last one and post the result of the latter.

I updated the PR body to include benchmark results (against the master branch). Thanks for pointing this out.

Looks good, mostly noise I guess (and even if we suffer a 10% hit, I would take that for the simpler implementation).

@alamb
Contributor

alamb commented Jan 17, 2023

Looks good, mostly noise I guess (and even if we suffer a 10% hit, I would take that for the simpler implementation).

My reading of the benchmarks was that some got 10% faster as well. Am I reading it wrong?

alamb (Contributor) left a comment:

Thank you @mustafasrepo and @ozankabak -- I got through some of this PR today but was not able to finish the review before running out of time. I will complete my review first thing tomorrow.

@@ -151,15 +149,13 @@ impl PhysicalGroupBy {

enum StreamType {
AggregateStream(AggregateStream),
GroupedHashAggregateStreamV2(GroupedHashAggregateStreamV2),
Contributor:

this is so wonderful to see

@@ -202,7 +202,9 @@ impl RowAccumulator for CountRowAccumulator {
}

fn evaluate(&self, accessor: &RowAccessor) -> Result<ScalarValue> {
Ok(accessor.get_as_scalar(&DataType::Int64, self.state_index))
Ok(ScalarValue::Int64(Some(
Contributor:

is this change necessary, or is it a drive-by refactor?

Contributor:

Drive-by mini bug fix, if I am remembering correctly. I believe that before this change we were getting a NULL where we were supposed to get a zero in some use case. @mustafasrepo, am I remembering correctly? If so, we should add a mini-test for this.

mustafasrepo (Contributor, Author) commented Jan 18, 2023:

If the group that CountRowAccumulator is working on is empty, it produces NULL without this change; however, it should produce 0. I cannot produce an example for this case, since the row accumulator is only used when the query contains a GROUP BY clause, and GROUP BY cannot produce an empty group. As an example,

SELECT count(*) as cnt
FROM aggregate_test_100
WHERE 1 = 0;

will produce (where CountAccumulator is used)

+-----+
| cnt |
+-----+
| 0   |
+-----+

However, if it were to use CountRowAccumulator, it would produce

+-----+
| cnt |
+-----+
|     |
+-----+

which is wrong. This change fixes that bug.
In summary, there is no way (as far as I know) to reproduce this error in the current implementation, since the row accumulator is used only when the query contains a GROUP BY clause. However, if we were to use the row accumulator in non-grouped cases, we could encounter this issue (I noticed this behavior while experimenting with RowAccumulator support for non-group-by aggregation). By the way, the query below would

SELECT count(*)
FROM aggregate_test_100
WHERE 1 = 0
GROUP BY ();

reproduce the issue; however, GROUP BY () is not currently supported in DataFusion.
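The NULL-vs-0 distinction above can be captured in a tiny sketch. This is not DataFusion's actual RowAccessor API; the `Option<i64>` state slot and function names are assumptions made to isolate the bug:

```rust
/// Count state as stored in the row; None models a slot that was never updated
/// (an empty group). Illustrative only, not DataFusion's RowAccessor API.
fn evaluate_count_buggy(state: Option<i64>) -> Option<i64> {
    // Old behavior: pass the raw slot through, so an empty group yields NULL.
    state
}

fn evaluate_count_fixed(state: Option<i64>) -> Option<i64> {
    // Fixed behavior: COUNT over zero rows is 0, never NULL.
    Some(state.unwrap_or(0))
}

fn main() {
    let empty_group: Option<i64> = None;
    assert_eq!(evaluate_count_buggy(empty_group), None);    // NULL (wrong)
    assert_eq!(evaluate_count_fixed(empty_group), Some(0)); // 0 (correct)
    assert_eq!(evaluate_count_fixed(Some(42)), Some(42));   // non-empty groups unchanged
    println!("ok");
}
```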

@ozankabak
Contributor

Looks good, mostly noise I guess (and even if we suffer a 10% hit, I would take that for the simpler implementation).

my reading of the benchmarks was that some got 10% faster as well. Am I reading it wrong?

It got faster in some cases but a little slower in others. Overall, performance is similar I think.

alamb (Contributor) left a comment:

I went through this carefully -- great work @mustafasrepo and @ozankabak

cc @yjshen and @richox who I think worked on the group row format
cc @Dandandan who has worked in this code area before

/// keeps range for each accumulator in the field
/// first element in the array corresponds to normal accumulators
/// second element in the array corresponds to row accumulators
indices: [Vec<Range<usize>>; 2],
Contributor:

The need to keep two lists of accumulators is quite unfortunate (maybe the code would be simpler if they were in a single enum or behind a trait). However, I think this implementation is better than what we have on the master branch, because it at least only has duplication in the aggregators rather than in the entire GroupHash structure.
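The role of the `indices: [Vec<Range<usize>>; 2]` field can be pictured with a small sketch: each accumulator owns a range of output columns, and the two lists of ranges map normal-accumulator and row-accumulator results back to schema positions. The function below is an illustrative stand-in, not DataFusion's implementation:

```rust
use std::ops::Range;

/// Scatter per-accumulator values into schema order using two lists of column
/// ranges, mirroring the idea behind `indices: [Vec<Range<usize>>; 2]`.
/// Illustrative sketch; names and signatures are not DataFusion's.
fn place(
    normal_ranges: &[Range<usize>], // indices[0]: columns owned by normal accumulators
    row_ranges: &[Range<usize>],    // indices[1]: columns owned by row accumulators
    normal_values: &[i64],
    row_values: &[i64],
    width: usize,
) -> Vec<i64> {
    let mut out = vec![0i64; width];
    let mut it = normal_values.iter();
    for r in normal_ranges {
        for col in r.clone() {
            out[col] = *it.next().expect("a value for each normal column");
        }
    }
    let mut it = row_values.iter();
    for r in row_ranges {
        for col in r.clone() {
            out[col] = *it.next().expect("a value for each row column");
        }
    }
    out
}

fn main() {
    // Normal accumulators own columns 0 and 2; row accumulators own 1 and 3.
    let out = place(&[0..1, 2..3], &[1..2, 3..4], &[10, 30], &[20, 40], 4);
    assert_eq!(out, vec![10, 20, 30, 40]);
    println!("{:?}", out);
}
```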

@alamb
Contributor

alamb commented Jan 18, 2023

During review of this code it was clear that it was in need of some more cleanup; a PR proposing to do so is #4972.

@alamb
Contributor

alamb commented Jan 18, 2023

I changed the PR description to say "closes #2723" and filed #4973 to track the additional work to improve aggregators.

The implementation is getting quite nice :bowtie: I think

@alamb
Contributor

alamb commented Jan 18, 2023

I will plan to merge this PR tomorrow unless there are any other comments

@ozankabak
Contributor

#4973 looks good to me, we will help with migrating once the foundational tools are in place.

@alamb alamb merged commit 96cf046 into apache:master Jan 19, 2023
@alamb
Contributor

alamb commented Jan 19, 2023

Thanks again -- this is going to be great!

@ursabot

ursabot commented Jan 19, 2023

Benchmark runs are scheduled for baseline = e6a0500 and contender = 96cf046. 96cf046 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

@mustafasrepo mustafasrepo deleted the feature/row_accumulator_utilize branch February 10, 2023 06:57
Labels
core Core DataFusion crate physical-expr Physical Expressions

Successfully merging this pull request may close these issues.

Consolidate GroupByHash implementations row_hash.rs and hash.rs (remove duplication)
5 participants