Avoid consecutive RepartitionExec #18341

@NGA-TRAN

Reproducer in sqllogictest: #18343

While experimenting with aggregation, I noticed cases where two RepartitionExec operators appear consecutively in the plan. This seems suboptimal and worth improving.

I’m using a dataset stored in both CSV and Parquet formats to test this behavior.

d_dkey,env,service,host
A,dev,log,ma
B,prod,log,ma
C,prod,log,vim
D,prod,trace,vim

The plan for the data in the CSV file looks reasonable:

EXPLAIN SELECT env, count(*) FROM dimension_csv GROUP BY env;
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                        |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: dimension_csv.env, count(Int64(1)) AS count(*)                                                                                                                                  |
|               |   Aggregate: groupBy=[[dimension_csv.env]], aggr=[[count(Int64(1))]]                                                                                                                        |
|               |     TableScan: dimension_csv projection=[env]                                                                                                                                               |
| physical_plan | ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]                                                                                                                          |
|               |   AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], aggr=[count(Int64(1))]                                                                                                          |
|               |     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                             |
|               |       RepartitionExec: partitioning=Hash([env@0], 16), input_partitions=16                                                                                                                  |
|               |         AggregateExec: mode=Partial, gby=[env@0 as env], aggr=[count(Int64(1))]                                                                                                             |
|               |           RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1                                                                                                             |
|               |             DataSourceExec: file_groups={1 group: [[Users/hoabinhnga.tran/datafusion-optimal-plans/testdata/dimension1/dimension_1.csv]]}, projection=[env], file_type=csv, has_header=true |
|               |                                                                                                                                                                                             |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

And this is its graphical plan

Image

However, the plan for the Parquet data doesn’t appear to push the round-robin repartition down far enough: it stays above the partial AggregateExec, so two RepartitionExec operators sit back-to-back and the partial aggregate runs on a single input partition. This seems suboptimal and likely worth improving.

EXPLAIN SELECT env, count(*) FROM dimension_parquet GROUP BY env;
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                               |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: dimension_parquet.env, count(Int64(1)) AS count(*)                                                                                                                     |
|               |   Aggregate: groupBy=[[dimension_parquet.env]], aggr=[[count(Int64(1))]]                                                                                                           |
|               |     TableScan: dimension_parquet projection=[env]                                                                                                                                  |
| physical_plan | ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]                                                                                                                 |
|               |   AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], aggr=[count(Int64(1))]                                                                                                 |
|               |     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                    |
|               |       RepartitionExec: partitioning=Hash([env@0], 16), input_partitions=16              -- Repartition Hash                                                                        |
|               |         RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1           -- Repartition Round Robin                                                                 |
|               |           AggregateExec: mode=Partial, gby=[env@0 as env], aggr=[count(Int64(1))]                                                                                                  |
|               |             DataSourceExec: file_groups={1 group: [[Users/hoabinhnga.tran/datafusion-optimal-plans/testdata/dimension1/dimension_1.parquet]]}, projection=[env], file_type=parquet |
|               |                                                                                                                                                                                    |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Image
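To make the pattern easy to spot in other plans, here is a minimal Python sketch that scans the physical plan text for a RepartitionExec whose direct child is another RepartitionExec. It is not part of DataFusion; the function name `find_consecutive_repartitions` is hypothetical, and it assumes the plan lines are indented two spaces per tree level as in the EXPLAIN output above. The DataSourceExec lines are shortened for readability.

```python
def find_consecutive_repartitions(plan_lines):
    """Return (parent, child) pairs where a RepartitionExec sits directly on another one.

    Assumes one operator per line and two spaces of indentation per tree level.
    """
    nodes = []
    for line in plan_lines:
        stripped = line.lstrip(" ")
        if not stripped:
            continue
        depth = (len(line) - len(stripped)) // 2
        operator = stripped.split(":", 1)[0]
        nodes.append((depth, operator, stripped))

    pairs = []
    # In pre-order output, the line after a node at depth d with depth d + 1 is its first child.
    for (d1, op1, text1), (d2, op2, text2) in zip(nodes, nodes[1:]):
        if op1 == op2 == "RepartitionExec" and d2 == d1 + 1:
            pairs.append((text1, text2))
    return pairs


# Physical plans from this issue (DataSourceExec details shortened).
PARQUET_PLAN = [
    "ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]",
    "  AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], aggr=[count(Int64(1))]",
    "    CoalesceBatchesExec: target_batch_size=8192",
    "      RepartitionExec: partitioning=Hash([env@0], 16), input_partitions=16",
    "        RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1",
    "          AggregateExec: mode=Partial, gby=[env@0 as env], aggr=[count(Int64(1))]",
    "            DataSourceExec: file_type=parquet",
]
CSV_PLAN = [
    "ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]",
    "  AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], aggr=[count(Int64(1))]",
    "    CoalesceBatchesExec: target_batch_size=8192",
    "      RepartitionExec: partitioning=Hash([env@0], 16), input_partitions=16",
    "        AggregateExec: mode=Partial, gby=[env@0 as env], aggr=[count(Int64(1))]",
    "          RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1",
    "            DataSourceExec: file_type=csv, has_header=true",
]

for parent, child in find_consecutive_repartitions(PARQUET_PLAN):
    print("consecutive repartitions:")
    print("  " + parent)
    print("  " + child)
```

Run against the two plans above, the check flags the Hash/RoundRobinBatch pair in the Parquet plan and nothing in the CSV plan.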

Fix Proposal

I suggest we fix this by either pushing the round-robin repartition further down (as in the plan for the CSV file above) or eliminating it entirely if it's unnecessary.
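The two options can be illustrated on a toy model of the plan. The sketch below is plain Python, not DataFusion's optimizer code: it treats the single-child plan as a list of operator labels from root to leaf, and the function names `push_round_robin_below_partial_agg` and `drop_round_robin_under_hash` are hypothetical. It ignores the `input_partitions` annotations and any validity checks a real optimizer rule would need.

```python
HASH = "RepartitionExec: partitioning=Hash"
ROUND_ROBIN = "RepartitionExec: partitioning=RoundRobinBatch"
PARTIAL_AGG = "AggregateExec: mode=Partial"


def push_round_robin_below_partial_agg(chain):
    """Option 1: rewrite Hash -> RoundRobin -> PartialAgg into Hash -> PartialAgg -> RoundRobin."""
    out = list(chain)
    for i in range(len(out) - 2):
        if (out[i].startswith(HASH)
                and out[i + 1].startswith(ROUND_ROBIN)
                and out[i + 2].startswith(PARTIAL_AGG)):
            # Swap the round-robin repartition with the partial aggregate below it.
            out[i + 1], out[i + 2] = out[i + 2], out[i + 1]
    return out


def drop_round_robin_under_hash(chain):
    """Option 2: remove a round-robin repartition that sits directly under a hash repartition."""
    out = []
    for op in chain:
        if op.startswith(ROUND_ROBIN) and out and out[-1].startswith(HASH):
            continue  # skip the redundant round-robin step
        out.append(op)
    return out


# Toy version of the Parquet plan from this issue, root first.
parquet_chain = [
    "AggregateExec: mode=FinalPartitioned",
    "CoalesceBatchesExec: target_batch_size=8192",
    "RepartitionExec: partitioning=Hash([env@0], 16)",
    "RepartitionExec: partitioning=RoundRobinBatch(16)",
    "AggregateExec: mode=Partial",
    "DataSourceExec: file_type=parquet",
]

for op in push_round_robin_below_partial_agg(parquet_chain):
    print(op)
```

Option 1 yields the same operator order as the CSV plan. The swap is safe for the aggregate itself because partial aggregation is computed per partition and merged by the final aggregate, so it can run over any split of the input. Option 2 leaves the partial aggregate on a single partition and only removes the extra shuffle.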

Advanced proposal

If we can determine that the input file is small, there's no need to repartition the data. In that case, we should use a single-step aggregate, as in the example below.

-- Option to keep single partition
set datafusion.execution.target_partitions = 1;

EXPLAIN SELECT env, count(*) FROM dimension_parquet GROUP BY env;
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                       |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Projection: dimension_parquet.env, count(Int64(1)) AS count(*)                                                                                                             |
|               |   Aggregate: groupBy=[[dimension_parquet.env]], aggr=[[count(Int64(1))]]                                                                                                   |
|               |     TableScan: dimension_parquet projection=[env]                                                                                                                          |
| physical_plan | ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]                                                                                                         |
|               |   AggregateExec: mode=Single, gby=[env@0 as env], aggr=[count(Int64(1))]                                                                                                   |
|               |     DataSourceExec: file_groups={1 group: [[Users/hoabinhnga.tran/datafusion-optimal-plans/testdata/dimension1/dimension_1.parquet]]}, projection=[env], file_type=parquet |
|               |                                                                                                                                                                            |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Image
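As a rough illustration of the advanced proposal, the decision could look like the Python sketch below. Everything here is an assumption made for illustration: the function name `choose_aggregate_plan`, the `small_input_rows` threshold (defaulted to 8192 only because that matches the batch size shown in the plans above), and the idea that a row-count estimate is available. It is not how DataFusion currently decides.

```python
def choose_aggregate_plan(estimated_rows, target_partitions, small_input_rows=8192):
    """Pick a toy aggregation strategy, returned as operator labels from root to leaf.

    estimated_rows: row-count estimate for the input, or None if unknown (hypothetical input).
    small_input_rows: hypothetical threshold below which repartitioning is skipped.
    """
    is_small = estimated_rows is not None and estimated_rows <= small_input_rows
    if target_partitions <= 1 or is_small:
        # Small input or no parallelism requested: aggregate in one step, no repartition.
        return ["AggregateExec: mode=Single"]
    # Otherwise keep the two-stage aggregate with a hash repartition in between.
    return [
        "AggregateExec: mode=FinalPartitioned",
        "RepartitionExec: partitioning=Hash",
        "AggregateExec: mode=Partial",
    ]


print(choose_aggregate_plan(estimated_rows=4, target_partitions=16))
```

With the four-row dataset from this issue, the sketch picks the single-step aggregate even when `target_partitions` is 16. When the estimate is unknown, it falls back to the two-stage plan.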