Improve performance of high cardinality grouping by reusing hash values #11680

Open
Tracked by #11679
alamb opened this issue Jul 26, 2024 · 24 comments · May be fixed by #11708
Labels: enhancement (New feature or request)


alamb commented Jul 26, 2024

Is your feature request related to a problem or challenge?

As described in #11679, we can do better for high cardinality aggregates.

One thing that consumes significant time in such queries is hashing, and I think we can reduce that significantly.

Specifically, for the multi-phase repartition plan, the number of hashed rows is something like

(input cardinality)  + 2 * (intermediate group cardinality) * (number of partitions)

For low cardinality aggregates (e.g. when the intermediate group cardinality is 1000) the second term is small (a few thousand extra hashes isn't a big deal).

However, for high cardinality aggregates (e.g. when the intermediate cardinality is more like 1,000,000 and there are 16 partitions) the second term is substantial.
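As a rough sketch of that arithmetic (the 10M input row count below is hypothetical, just to make the two scenarios concrete):

```rust
/// Rough estimate of total rows hashed by the multi-phase plan:
/// (input cardinality) + 2 * (intermediate group cardinality) * (number of partitions)
fn estimated_hashes(input_rows: u64, intermediate_groups: u64, partitions: u64) -> u64 {
    input_rows + 2 * intermediate_groups * partitions
}

fn main() {
    // Low cardinality: the second term adds only a few tens of thousands of hashes.
    println!("{}", estimated_hashes(10_000_000, 1_000, 16)); // 10_032_000

    // High cardinality: the second term adds 32 million extra hashes.
    println!("{}", estimated_hashes(10_000_000, 1_000_000, 16)); // 42_000_000
}
```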

In pictures, this looks like

               ▲                          ▲
               │                          │
               │                          │
               │                          │
               │                          │
               │                          │
   ┌───────────────────────┐  ┌───────────────────────┐       4. The  AggregateMode::Final
   │GroupBy                │  │GroupBy                │       GroupBy computes hash(group keys)
   │(AggregateMode::Final) │  │(AggregateMode::Final) │       *AGAIN* to find the correct hash
   │                       │  │                       │       bucket
   └───────────────────────┘  └───────────────────────┘
               ▲                          ▲
               │                          │
               └─────────────┬────────────┘
                             │
                             │
                             │
                ┌─────────────────────────┐                   3. The output of the first phase
                │       Repartition       │                   is repartitioned by computing
                │         HASH(x)         │                   hash(group keys) -- this is the
                └─────────────────────────┘                   same hash as computed in step 2.
                             ▲
                             │
             ┌───────────────┴─────────────┐
             │                             │
             │                             │
┌─────────────────────────┐  ┌──────────────────────────┐     2. Each AggregateMode::Partial
│        GroupBy          │  │         GroupBy          │     GroupBy hashes the group keys to
│(AggregateMode::Partial) │  │ (AggregateMode::Partial) │     find the correct hash bucket.
└─────────────────────────┘  └──────────────────────────┘
             ▲                             ▲
             │                            ┌┘
             │                            │
        .─────────.                  .─────────.
     ,─'           '─.            ,─'           '─.
    ;      Input      :          ;      Input      :          1. Input is read
    :   Partition 0   ;          :   Partition 1   ;
     ╲               ╱            ╲               ╱
      '─.         ,─'              '─.         ,─'
         `───────'                    `───────'

This effect can be seen in profiling for ClickBench Q17:

SELECT "UserID", "SearchPhrase", COUNT(*) FROM "hits.parquet" GROUP BY "UserID", "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10
$ datafusion-cli -c 'SELECT "UserID", "SearchPhrase", COUNT(*) FROM "hits.parquet" GROUP BY "UserID", "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10;'

Here is the profiling from Instruments:
(screenshot of the Instruments profile)

Describe the solution you'd like

The basic idea is to avoid recomputing the hash values in RepartitionExec and AggregateMode::Final by reusing the values from AggregateMode::Partial (which has already computed a hash value for each input group).

Something like this

                         ▲                          ▲                                                       
                         │                          │                                                       
                         │                          │                                                       
                         │                          │                                                       
                         │                          │                                                       
                         │                          │                                                       
             ┌───────────────────────┐  ┌───────────────────────┐       4. The  AggregateMode::Final        
             │GroupBy                │  │GroupBy                │       GroupBy also gets the hash values   
             │(AggregateMode::Final) │  │(AggregateMode::Final) │       and does not recompute them         
             │                       │  │                       │                                           
             └───────────────────────┘  └───────────────────────┘                                           
               ▲         ▲                          ▲                                                       
               │         │                          │                                                       
                         └─────────────┬────────────┘                                                       
Pass hash      │                       │                                                                    
values up the                          │                                                                    
plan rather    │                       │                                                                    
than                      ┌─────────────────────────┐                   3. In addition to the partial       
recomputing    │          │       Repartition       │                   aggregates and group values, *ALSO* 
them                      │    PRECOMPUTED_HASH     │                   pass the hash values to the         
               │          └─────────────────────────┘                   RepartitionExec which also passed   
                                       ▲                                them on to the AggregateMode::Final 
               │                       │                                                                    
                       ┌───────────────┴─────────────┐                                                      
               │       │                             │                                                      
                       │                             │                                                      
          ┌─────────────────────────┐  ┌──────────────────────────┐     2. Each AggregateMode::Partial      
           │        GroupBy          │  │         GroupBy          │     GroupBy hashes the group keys to
          │(AggregateMode::Partial) │  │ (AggregateMode::Partial) │     find the correct hash bucket.       
          └─────────────────────────┘  └──────────────────────────┘                                         
                       ▲                             ▲                                                      
                       │                            ┌┘                                                      
                       │                            │                                                       
                  .─────────.                  .─────────.                                                  
               ,─'           '─.            ,─'           '─.                                               
              ;      Input      :          ;      Input      :          1. Input is read                    
              :   Partition 0   ;          :   Partition 1   ;                                              
               ╲               ╱            ╲               ╱                                               
                '─.         ,─'              '─.         ,─'                                                
                   `───────'                    `───────'                                                   

Describe alternatives you've considered

We could perhaps pass the data as an explicit new column somehow, or maybe as a field in a struct array 🤔
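For illustration, here is a minimal arrow-rs sketch of the "explicit new column" alternative -- the `__precomputed_hash` column name and the placeholder hash function are hypothetical, not what DataFusion actually uses:

```rust
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Append one precomputed hash value per row as an extra column so that
/// downstream operators could reuse it instead of rehashing the group keys.
fn attach_hash_column(batch: &RecordBatch, hashes: Vec<u64>) -> Result<RecordBatch, ArrowError> {
    assert_eq!(batch.num_rows(), hashes.len());

    let mut fields: Vec<Field> = batch
        .schema()
        .fields()
        .iter()
        .map(|f| f.as_ref().clone())
        .collect();
    // Hypothetical column name, only for illustration.
    fields.push(Field::new("__precomputed_hash", DataType::UInt64, false));

    let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
    columns.push(Arc::new(UInt64Array::from(hashes)));

    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
}

fn main() -> Result<(), ArrowError> {
    // Toy single group-key column; real plans hash all group columns together.
    let keys = StringArray::from(vec!["a", "b", "a"]);
    let hashes: Vec<u64> = keys
        .iter()
        .map(|k| {
            // Placeholder hash; DataFusion uses its own hashing for repartitioning.
            let mut h = std::collections::hash_map::DefaultHasher::new();
            k.hash(&mut h);
            h.finish()
        })
        .collect();

    let schema = Arc::new(Schema::new(vec![Field::new("key", DataType::Utf8, true)]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(keys.clone()) as ArrayRef])?;
    let with_hash = attach_hash_column(&batch, hashes)?;
    assert_eq!(with_hash.num_columns(), 2);
    Ok(())
}
```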

Additional context

No response


alamb commented Jul 26, 2024

The trick with this ticket will be to structure the code in a way that is general and works across plans.

It might first be worth a POC / hack to see how much performance there is to be had here (I suspect it is like 5-10% at most)


jayzhan211 commented Jul 31, 2024

The experiment I did in #11708 shows that

  1. There is not much difference for ClickBench Q17
  2. It outperforms main on a specialized high cardinality test (2,000,000 rows, all values distinct)
  3. It largely simplifies the Repartition hash code

@alamb If the benchmark code looks good to you, I think we could reuse the hash values.
To further improve ClickBench Q17, the bottleneck is now arrow::Row (RowConverter::append and Rows::push). Do you think there is room for improvement there, or should we find a way to reduce the Rows work by design?

I have not cleaned up the code to be production ready yet, so the impact on other queries is unknown.

An alternative idea for improvement: if we can combine partial group + repartition + final group in one operation, we could probably avoid converting to row once again in the final group.
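For context, here is a minimal sketch of the arrow::Row conversion mentioned above, using the public arrow-rs RowConverter API (DataFusion's actual call sites, including RowConverter::append and Rows::push, differ from this):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, StringArray};
use arrow::datatypes::DataType;
use arrow::row::{RowConverter, SortField};

fn main() -> Result<(), arrow::error::ArrowError> {
    // Multi-column group keys, e.g. (UserID, SearchPhrase).
    let user_id: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 1]));
    let phrase: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "a"]));

    // Encode the group-key columns into a single comparable byte sequence per row.
    // In the partial/final plan this conversion happens in both aggregation phases.
    let converter = RowConverter::new(vec![
        SortField::new(DataType::Int64),
        SortField::new(DataType::Utf8),
    ])?;
    let rows = converter.convert_columns(&[user_id, phrase])?;

    // Each encoded row can be used directly as a hash-table key.
    assert!(rows.row(0) == rows.row(2));
    assert!(rows.row(0) != rows.row(1));
    Ok(())
}
```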


alamb commented Jul 31, 2024

Thank you @jayzhan211 -- those are some interesting results.

I think it makes sense that reusing the hash values helps mostly for high cardinality aggregates, as in that case the number of rows that need to be repartitioned / rehashed is high.

An alternative idea for improvement: if we can combine partial group + repartition + final group in one operation, we could probably avoid converting to row once again in the final group.

I think this is the approach taken by systems like DuckDB, as I understand it, and I think it is quite intriguing to consider.

The challenge of this approach would be the software engineering required to manage the complexity of the combined multi-stage operator. I am not sure the functionality would be easy to combine without some more refactoring 🤔


alamb commented Aug 1, 2024

@Dandandan has a good point on #11708 (comment) that in some cases (like a network shuffle) passing the hash values might be more expensive than just recomputing them.


jayzhan211 commented Aug 1, 2024

I got a performance boost for ClickBench Q17 just by enforcing single mode for multi-column group by, interesting.

Comparing main and single-multi-groupby
--------------------
Benchmark clickbench_1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃      main ┃ single-multi-groupby ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │ 2009.51ms │            1435.19ms │ +1.40x faster │
│ QQuery 1     │ 7270.86ms │            4033.52ms │ +1.80x faster │
└──────────────┴───────────┴──────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                   ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main)                   │ 9280.37ms │
│ Total Time (single-multi-groupby)   │ 5468.70ms │
│ Average Time (main)                 │ 4640.19ms │
│ Average Time (single-multi-groupby) │ 2734.35ms │
│ Queries Faster                      │         2 │
│ Queries Slower                      │         0 │
│ Queries with No Change              │         0 │
└─────────────────────────────────────┴───────────┘

#11762

I think what I need to do is find a query that is currently slower in single mode, and then find a way to optimize it, like doing the partial/final work within a single execution node? 🤔

Does the result show that what we really need is storing values in one large hash table?

Does anyone know what kind of query partial/final group by is good at?

Upd:

The specialized all-distinct benchmark shows an even more dramatic number (reuse_hash.rs)

// single-multi-groupby
Gnuplot not found, using plotters backend
benchmark               time:   [82.970 ms 98.723 ms 111.32 ms]
                        change: [-99.328% -98.932% -97.777%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 2 outliers among 10 measurements (20.00%)
  2 (20.00%) high mild

// main (1ce546168)
Gnuplot not found, using plotters backend
Benchmarking benchmark: Warming up for 3.0000 s
Warning: Unable to complete 10 samples in 5.0s. You may wish to increase target time to 23.3s.
benchmark               time:   [660.82 ms 1.3354 s 2.1247 s]
                        change: [+675.04% +1377.8% +2344.1%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 1 outliers among 10 measurements (10.00%)
  1 (10.00%) high mild


alamb commented Aug 1, 2024

#11762

I think what I need to do is find a query that is currently slower in single mode, and find a way to optimize it like partial/final way in single execution node? 🤔

What exactly does single_mode do? #11762 looks like maybe it just uses a single group by node?

Does anyone know what kind of query partial/final group by is good at?

I think they are good at being able to use multiple cores to do the work in parallel.

They are especially good at low cardinality aggregates (some of the TPCH ones for example where there are 4 distinct groups) as the hash tables are small and the final shuffle is very small.


jayzhan211 commented Aug 2, 2024

What exactly does single_mode do? #11762 looks like maybe it just uses a single group by node?

Yes, partial/final often does the hashing / arrow::Row conversion twice. With a single group by node, it happens only once.


jayzhan211 commented Aug 2, 2024

Difference between #11762 and main

#11762 runs Repartition -> SingleMode group by
main runs Partial group by -> Repartition -> Final group by

For high cardinality (2M rows, all values unique):
#11762 repartitions into 12 partitions, divided equally, so each has around ~166666 rows for the single group by (same as the final step).
main runs the partial aggregate first; there are 256 partitions with ~8000 rows each. 256 * 8000 is ~2M.

#11762 has fewer partitions, and thus speeds up a lot.

For low cardinality (2M rows with only 4 values):
#11762 has 4 partitions, each with 0.5M rows.
main runs the partial aggregate and gets 4 partitions after repartitioning, with only 1 row to process in the final step.

I can see #11762 is slightly slower than main for the low cardinality case, but the regression is negligible compared to the gain in the high cardinality case.

Next, I want to move the repartitioning inside the single mode group by. I guess we can then see comparable results for the low cardinality case.

Upd: #11777 removes the pre-repartitioning entirely; it is slightly better for the low cardinality case, and for the high cardinality case it is 30x faster than main but ~1.5x slower compared with #11762. I think we could consider enforcing single group by without repartitioning for multi-column group by (#11777), and finding a way to parallelize the partitioning inside the group by node to further improve the high cardinality case. This also shows that pre-repartitioning improves the high cardinality case but not the low cardinality case 🤔

#11777 optimizes the plan from

// #11762
01)GlobalLimitExec: skip=0, fetch=10
02)--SortPreservingMergeExec: [count(*)@2 DESC], fetch=10
03)----SortExec: TopK(fetch=10), expr=[count(*)@2 DESC], preserve_partitioning=[true]
04)------AggregateExec: mode=SinglePartitioned, gby=[UserID@0 as UserID, SearchPhrase@1 as SearchPhrase], aggr=[count(*)]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([UserID@0, SearchPhrase@1], 4), input_partitions=4
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
08)--------------ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[UserID, SearchPhrase]

to

// #11777
physical_plan
01)SortExec: TopK(fetch=10), expr=[count(*)@2 DESC], preserve_partitioning=[false]
02)--AggregateExec: mode=Single, gby=[UserID@0 as UserID, SearchPhrase@1 as SearchPhrase], aggr=[count(*)]
03)----ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[UserID, SearchPhrase]

while plan in main branch is

01)GlobalLimitExec: skip=0, fetch=10
02)--SortPreservingMergeExec: [count(*)@2 DESC], fetch=10
03)----SortExec: TopK(fetch=10), expr=[count(*)@2 DESC], preserve_partitioning=[true]
04)------AggregateExec: mode=FinalPartitioned, gby=[UserID@0 as UserID, SearchPhrase@1 as SearchPhrase], aggr=[count(*)]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([UserID@0, SearchPhrase@1], 4), input_partitions=4
07)------------AggregateExec: mode=Partial, gby=[UserID@0 as UserID, SearchPhrase@1 as SearchPhrase], aggr=[count(*)]
08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)----------------ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/clickbench_hits_10.parquet]]}, projection=[UserID, SearchPhrase]


alamb commented Aug 8, 2024

I think this data is very interesting and we should look more deeply into why the single group mode is faster than doing a repartition / aggregate.

It seems like the only differences are:

  1. There is a RepartitionExec and CoalesceBatchesExec
  2. The final AggregateExec happens in parallel (but on distinct subsets of the group)

I would expect doing the final aggregate in parallel on distinct subsets to be about as fast

So one reasonable conclusion is that the overhead of RepartitionExec and CoalesceBatchesExec accounts for the difference 🤔 and thus if we reduced the Repartition overhead we could see similar improvements as the group by single mode

This is the idea behind exploring #11647 -- I think we could avoid a copy at the output of CoalesceBatchesExec which would help to reduce the overhead


Rachelint commented Aug 8, 2024

So one reasonable conclusion is that the overhead of RepartitionExec and CoalesceBatchesExec accounts for the difference 🤔 and thus if we reduced the Repartition overhead we could see similar improvements as the group by single mode

This is the idea behind exploring #11647 -- I think we could avoid a copy at the output of CoalesceBatchesExec which would help to reduce the overhead

It seems the CPU cost of RepartitionExec and CoalesceBatchesExec is not the bottleneck for Q32 in ClickBench, according to the flamegraph (it is generated from my experiment branch, which has the avg convert_to_state branch merged).

One possibility is that it may not be a problem of CPU cost, but rather a problem of scheduling?

(flamegraph screenshot)


alamb commented Aug 8, 2024

One possibility is that it may not be a problem of CPU cost, but rather a problem of scheduling?

I am not sure -- is there any chance you can attach the svg version of the flamegraph so I can zoom in and look at it more carefully?


Rachelint commented Aug 8, 2024

One possibility is that it may not be a problem of CPU cost, but rather a problem of scheduling?

I am not sure -- is there any chance you can attach the svg version of the flamegraph so I can zoom in and look at it more carefully?

Ok, svg here:
https://github.com/Rachelint/drawio-store/blob/738d864dd1fb0eb4639ddc1d1c18f87877c3e7a4/dperf.08085.svg


alamb commented Aug 8, 2024


That is quite cool -- thank you @Rachelint

I see evidence of why you proposed apache/arrow-rs#6146 / apache/arrow-rs#6155 🏃
(screenshots of the relevant flamegraph frames)

@Rachelint

@alamb Yes... the eager computation of null count is actually not as cheap as we expect...


Rachelint commented Aug 9, 2024

I have some thoughts about why SinglePartitioned is faster than Partial + FinalPartitioned in the high cardinality case.
It may be the same effect as the skip-partial optimization merged recently?

They both skip the partial aggregation, which brings no benefit in the high cardinality + many distinct values case, and locally their improvements (for Q32) are close.


yjshen commented Sep 4, 2024

It seems the CPU cost of RepartitionExec and CoalesceBatchesExec is not the bottleneck for Q32

The single aggregate performance benefits that @jayzhan211's experiments in #11762 and #11777 show are on ClickBench Q17/Q18, not Q32.

As of today, I see that Q32 performance is comparable to that in DuckDB on an M3 Mac.

# DuckDB Q32:
0.5369797919993289
0.44854350000969134
0.41927954100538045

# DataFusion main(780cccb52)
0.620
0.400
0.409

But for Q17, we are still behind:

# DuckDB
0.5953990409907419
0.5309897500119405
0.5242392499931157

# DataFusion main(780cccb52)
1.145
1.072
1.082

We would probably need to consolidate Aggregate(Partial and Final) and Repartition into a single place in order to be able to adaptively choose aggregate mode/algorithm based on runtime statistics.
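As a purely illustrative sketch (not DataFusion's API) of what such an adaptive choice could look like: observe the ratio of distinct groups to input rows in the first batches, then commit to a strategy.

```rust
/// Which aggregation strategy a consolidated operator would keep using
/// after observing the first few input batches.
#[derive(Debug, PartialEq)]
enum AggStrategy {
    /// Low cardinality: pre-aggregation shrinks the data before repartitioning.
    PartialThenFinal,
    /// High cardinality: pre-aggregation barely reduces rows, so skip it.
    SinglePass,
}

/// Hypothetical heuristic: if almost every input row produces a new group,
/// the partial phase is not reducing anything and only adds hashing and
/// row-conversion work.
fn choose_strategy(observed_rows: usize, observed_groups: usize, threshold: f64) -> AggStrategy {
    let ratio = observed_groups as f64 / observed_rows.max(1) as f64;
    if ratio > threshold {
        AggStrategy::SinglePass
    } else {
        AggStrategy::PartialThenFinal
    }
}

fn main() {
    // ClickBench-Q17-like: most rows are distinct (UserID, SearchPhrase) pairs.
    assert_eq!(choose_strategy(100_000, 90_000, 0.8), AggStrategy::SinglePass);
    // TPCH-Q1-like: millions of rows but only a handful of groups.
    assert_eq!(choose_strategy(100_000, 4, 0.8), AggStrategy::PartialThenFinal);
}
```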

@Rachelint

As of today, I see that Q32 performance is comparable to that in DuckDB on an M3 Mac.

But for Q17, we are still behind.

We would probably need to consolidate Aggregate(Partial and Final) and Repartition into a single place in order to be able to adaptively choose aggregate mode/algorithm based on runtime statistics.

I see the improvement for Q32 in the later PR #11792, and I guess the reason performance improved may be similar to the partial skipping? Maybe Q17/Q18 are improved for a different reason than Q32?

I agree that maybe we should have a similar mechanism to select the merging mode dynamically, like DuckDB.


jayzhan211 commented Sep 5, 2024

I guess the reason performance improved may be similar to the partial skipping

Yes, that is why I experimented with single mode, forcing all queries to avoid the partial and repartition stages; sadly, this doesn't work well for the low cardinality case.

We would probably need to consolidate Aggregate(Partial and Final) and Repartition into a single place in order to be able to adaptively choose aggregate mode/algorithm based on runtime statistics.

I agree, similar to my idea before.

An alternative idea for improvement: if we can combine partial group + repartition + final group in one operation, we could probably avoid converting to row once again in the final group.

However, the refactor is quite challenging


jayzhan211 commented Sep 5, 2024

I tried an alternative to merging partial + final: continue with single mode, but find a way to optimize the low cardinality case.

I found that the reason TPCH Q1 slows down is that we repartition the batches, so there are many more, smaller batches to compute than necessary. I tried removing the repartition + coalesce batch steps and found that performance improves!

// main
Aggregate Partial (each batch has around 16k rows)
    Repartition
       Coalesce
            Aggregate Final
// single mode only
Projection (each batch has around 16k rows)
    Repartition (re-hash, each batch has only ~10k rows)
       Coalesce
            Aggregate Single Partitioned (more `intern` calls)
// single mode without repartition. This also takes only about half the time of the main branch, since there is only a single aggregate exec
Projection (each batch has around 16k rows)
     Aggregate Single Partitioned (an equivalent number of `intern` calls)

#12340 is a hack for a specific query only; it needs to be extended to general queries.

The next thing is to find out when and how I should avoid EnforceDistribution and bypass SanityCheckPlan if needed, and to ensure that no other queries slow down.
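A tiny sketch of the effect described above, using a hypothetical cost model: splitting the same rows across more, smaller batches multiplies the fixed per-batch work (such as the extra `intern` calls).

```rust
/// Hypothetical cost model: every batch pays a fixed setup cost
/// (hashing state, `intern` bookkeeping, etc.) plus a per-row cost.
fn total_cost(total_rows: u64, batch_size: u64, per_batch: u64, per_row: u64) -> u64 {
    let batches = total_rows.div_ceil(batch_size);
    batches * per_batch + total_rows * per_row
}

fn main() {
    let rows = 6_000_000;
    // ~16k-row batches straight from the scan vs ~10k-row batches after repartitioning.
    println!("{}", total_cost(rows, 16_384, 1_000, 1));
    println!("{}", total_cost(rows, 10_000, 1_000, 1));
}
```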

Print out of the columns size
// Without repartition + coalesce
called: SinglePartitioned
cols: 12101
called: SinglePartitioned
cols: 11830
called: SinglePartitioned
cols: 12061
called: SinglePartitioned
cols: 12147
called: SinglePartitioned
cols: 12065
called: SinglePartitioned
cols: 12180
called: SinglePartitioned
cols: 11969
called: SinglePartitioned
cols: 8227
called: SinglePartitioned
cols: 11911
called: SinglePartitioned
cols: 11942
called: SinglePartitioned
cols: 11935
called: SinglePartitioned
cols: 12107
called: SinglePartitioned
cols: 12040
called: SinglePartitioned
cols: 12138
called: SinglePartitioned
cols: 12058
called: SinglePartitioned
cols: 12080
called: SinglePartitioned
cols: 12032
called: SinglePartitioned
cols: 8280
called: SinglePartitioned
cols: 11969
called: SinglePartitioned
cols: 11927
called: SinglePartitioned
cols: 11964
called: SinglePartitioned
cols: 12239
called: SinglePartitioned
cols: 12017
called: SinglePartitioned
cols: 12186
called: SinglePartitioned
cols: 12024
called: SinglePartitioned
cols: 11982
called: SinglePartitioned
cols: 11982
called: SinglePartitioned
cols: 12012
called: SinglePartitioned
cols: 9185
called: SinglePartitioned
cols: 12083
called: SinglePartitioned
cols: 11931
called: SinglePartitioned
cols: 8290
called: SinglePartitioned
cols: 11915
called: SinglePartitioned
cols: 12091
called: SinglePartitioned
cols: 12036
called: SinglePartitioned
cols: 11930
called: SinglePartitioned
cols: 11956
called: SinglePartitioned
cols: 11937
called: SinglePartitioned
cols: 12022
called: SinglePartitioned
cols: 11912
called: SinglePartitioned
cols: 15718
called: SinglePartitioned
cols: 12063
called: SinglePartitioned
cols: 11962
called: SinglePartitioned
cols: 11894
called: SinglePartitioned
cols: 8893
called: SinglePartitioned
cols: 14469
called: SinglePartitioned
cols: 11944
called: SinglePartitioned
cols: 12007
called: SinglePartitioned
cols: 3765
called: SinglePartitioned
cols: 8763
called: SinglePartitioned
cols: 11897
called: SinglePartitioned
cols: 1788
// main
called: Partial
cols: 16096
called: Partial
cols: 16141
called: Partial
cols: 16127
called: Partial
cols: 16139
called: Partial
cols: 16127
called: Partial
cols: 16199
called: Partial
cols: 16115
called: Partial
cols: 16159
called: Partial
cols: 16211
called: Partial
cols: 16167
called: Partial
cols: 16134
called: Partial
cols: 16099
called: Partial
cols: 16120
called: Partial
cols: 16122
called: Partial
cols: 16169
called: Partial
cols: 16151
called: Partial
cols: 16103
called: Partial
cols: 16175
called: Partial
cols: 16169
called: Partial
cols: 16106
called: Partial
cols: 16133
called: Partial
cols: 16122
called: Partial
cols: 16164
called: Partial
cols: 16148
called: Partial
cols: 16173
called: Partial
cols: 16201
called: Partial
cols: 16116
called: Partial
cols: 16167
called: Partial
cols: 16198
called: Partial
cols: 16110
called: Partial
cols: 16153
called: Partial
cols: 16184
called: Partial
cols: 16104
called: Partial
cols: 16131
called: Partial
cols: 16160
called: Partial
cols: 16153
called: Partial
cols: 2427
called: Partial
cols: 2461
called: Partial
cols: 2437
called: Partial
cols: 3285
called: FinalPartitioned
cols: 4
called: FinalPartitioned
cols: 8
called: FinalPartitioned
cols: 4

So one reasonable conclusion is that the overhead of RepartitionExec and CoalesceBatchesExec accounts for the difference 🤔 and thus if we reduced the Repartition overhead we could see similar improvements as the group by single mode

Does anyone know the rationale for having repartition + coalesce, and what kind of query benefits from it? From my experiment, I can see both the high cardinality and low cardinality cases improve a lot without partial + repartition + coalesce. Does that mean we could remove them altogether 🤔? My understanding of repartition + coalesce is that it rebalances batches: split (repartition) and merge (coalesce). Does it help with weird incoming batch sizes? Or maybe it is useful for queries other than group by?

@Rachelint

We would probably need to consolidate Aggregate(Partial and Final) and Repartition into a single place in order to be able to adaptively choose aggregate mode/algorithm based on runtime statistics.

An alternative idea for improvement: if we can combine partial group + repartition + final group in one operation, we could probably avoid converting to row once again in the final group.

For aggregation, it may be used to perform the parallel merging of the partial aggregates in the final aggregation.
To my knowledge, DuckDB seems to use a partitioned hash table to perform a similar mechanism?


yjshen commented Sep 6, 2024

Does anyone know the rationale for having repartition + coalesce, and what kind of query benefits from it

The primary reason is scalability. Efficient aggregation requires multi-core CPUs to process data in parallel. To facilitate this and prevent contention from multiple threads altering a single hash table simultaneously (often managed with locks), a repartition phase is introduced.

This repartition allows each thread to perform aggregation independently. Furthermore, pre-aggregation is employed to conduct preliminary calculations to streamline repartitioning, significantly reducing the volume of data that needs repartitioning (in cases where either the cardinality is low or there are several hot keys).

The next thing is to find out when and how should I avoid EnforceDistribution and bypass SanityCheckPlan if needed. And ensure there is no other queries slow down.

This when-and-how problem is difficult for query engines because it requires foreknowledge of the data characteristics on grouping keys. It's even harder for DataFusion since we have very limited table metadata that could help us with this decision.

In an adaptive approach, the aggregate operator could internally start with one method and move to another without interacting with other components (the physical optimizer and other physical operators), making it more feasible.
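To make the scalability argument concrete, here is a small standalone sketch (plain Rust, not DataFusion code) of why hash repartitioning allows lock-free parallel aggregation: each key lands in exactly one partition, so each partition's hash table is owned by a single thread.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::thread;

fn partition_of(key: &str, partitions: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() as usize) % partitions
}

fn main() {
    let partitions = 4;
    let input = vec!["a", "b", "a", "c", "b", "a"];

    // Repartition: route each row to the partition that owns its key.
    let mut buckets: Vec<Vec<&str>> = vec![Vec::new(); partitions];
    for key in input {
        buckets[partition_of(key, partitions)].push(key);
    }

    // Each partition is aggregated independently; no shared hash table, no locks.
    let handles: Vec<_> = buckets
        .into_iter()
        .map(|bucket| {
            thread::spawn(move || {
                let mut counts: HashMap<&str, u64> = HashMap::new();
                for key in bucket {
                    *counts.entry(key).or_insert(0) += 1;
                }
                counts
            })
        })
        .collect();

    // The per-partition results are disjoint, so their union is the final answer.
    for handle in handles {
        println!("{:?}", handle.join().unwrap());
    }
}
```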


Rachelint commented Sep 8, 2024

The primary reason is scalability. Efficient aggregation requires multi-core CPUs to process data in parallel. To facilitate this and prevent contention from multiple threads altering a single hash table simultaneously (often managed with locks), a repartition phase is introduced.

This repartition allows each thread to perform aggregation independently. Furthermore, pre-aggregation is employed to conduct preliminary calculations to streamline repartitioning, significantly reducing the volume of data that needs repartitioning (in cases where either the cardinality is low or there are several hot keys).

Maybe reducing the cost of repartitioning is an alternative? I think the reason partial + final leads to a performance regression in some situations may be that the cost of splitting batches is greater than the improvement from parallel merging?
I am trying a POC that fuses the repartition and the partial aggregation.


alamb commented Sep 9, 2024

I am trying a POC that fuses the repartition and the partial aggregation.

That is an interesting idea (perhaps have the partial group by operator produce partitioned output somehow, as it already knows the hash values of each group 🤔).
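A small illustrative sketch of that idea (hypothetical, not an actual DataFusion operator): when the partial aggregation drains its hash table it already knows each group's hash, so it could emit its output pre-split into N partitions and forward the hash along, instead of leaving both to a separate RepartitionExec.

```rust
use std::collections::HashMap;

/// Toy partial-aggregation state: group key -> (hash value, partial count).
/// A real implementation would store arrow arrays, not Strings.
fn emit_partitioned(
    groups: HashMap<String, (u64, u64)>,
    num_partitions: usize,
) -> Vec<Vec<(String, u64, u64)>> {
    let mut outputs = vec![Vec::new(); num_partitions];
    for (key, (hash, count)) in groups {
        // Reuse the stored hash to pick the output partition; no rehashing needed,
        // and the hash can also be forwarded to the final aggregation.
        outputs[(hash as usize) % num_partitions].push((key, hash, count));
    }
    outputs
}

fn main() {
    let mut groups = HashMap::new();
    groups.insert("a".to_string(), (17_u64, 3_u64));
    groups.insert("b".to_string(), (42_u64, 1_u64));

    for (partition, rows) in emit_partitioned(groups, 4).into_iter().enumerate() {
        println!("partition {partition}: {rows:?}");
    }
}
```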

@Rachelint

Maybe reducing the cost of repartitioning is an alternative? I think the reason partial + final leads to a performance regression in some situations may be that the cost of splitting batches is greater than the improvement from parallel merging? I am trying a POC that fuses the repartition and the partial aggregation.

It is exciting that this idea seems promising!
I tried some related queries locally, and it seems faster.

  • Main
Q16: SELECT "UserID", "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10;
Query 16 iteration 0 took 5537.5 ms and returned 10 rows
Query 16 iteration 1 took 5308.1 ms and returned 10 rows
Query 16 iteration 2 took 5499.6 ms and returned 10 rows
Query 16 iteration 3 took 5561.5 ms and returned 10 rows
Query 16 iteration 4 took 5380.3 ms and returned 10 rows

Q17: SELECT "UserID", "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", "SearchPhrase" LIMIT 10;
Query 17 iteration 0 took 4961.5 ms and returned 10 rows
Query 17 iteration 1 took 4926.8 ms and returned 10 rows
Query 17 iteration 2 took 5000.1 ms and returned 10 rows
Query 17 iteration 3 took 4952.6 ms and returned 10 rows
Query 17 iteration 4 took 4922.3 ms and returned 10 rows
Q16: SELECT "UserID", "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10;
Query 16 iteration 0 took 5252.9 ms and returned 10 rows
Query 16 iteration 1 took 5092.9 ms and returned 10 rows
Query 16 iteration 2 took 5102.9 ms and returned 10 rows
Query 16 iteration 3 took 5181.0 ms and returned 10 rows
Query 16 iteration 4 took 5169.2 ms and returned 10 rows

Q17: SELECT "UserID", "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", "SearchPhrase" LIMIT 10;
Query 17 iteration 0 took 4660.8 ms and returned 10 rows
Query 17 iteration 1 took 4619.3 ms and returned 10 rows
Query 17 iteration 2 took 4588.3 ms and returned 10 rows
Query 17 iteration 3 took 4717.4 ms and returned 10 rows
Query 17 iteration 4 took 4633.0 ms and returned 10 rows

I will check it more carefully again, and if it actually works, I will submit a formal PR.
