Support no distinct aggregate sum/min/max in single_distinct_to_group_by rule #8124

Closed
wants to merge 9 commits into from


haohuaijin
Contributor

@haohuaijin haohuaijin commented Nov 10, 2023

Which issue does this PR close?

Closes #8123

Rationale for this change

With this PR:

❯ SELECT "RegionID", SUM("AdvEngineID"), COUNT(DISTINCT "UserID") FROM '../benchmarks/data/hits.parquet' GROUP BY "RegionID" order by "RegionID" limit 10;
+----------+--------------------------------------------------+--------------------------------------------------------+
| RegionID | SUM(../benchmarks/data/hits.parquet.AdvEngineID) | COUNT(DISTINCT ../benchmarks/data/hits.parquet.UserID) |
+----------+--------------------------------------------------+--------------------------------------------------------+
| 0        | 0                                                | 8                                                      |
| 1        | 147946                                           | 239380                                                 |
| 2        | 441662                                           | 1081016                                                |
| 3        | 39724                                            | 131195                                                 |
| 4        | 34557                                            | 79500                                                  |
| 5        | 13502                                            | 40914                                                  |
| 6        | 24338                                            | 55768                                                  |
| 7        | 28417                                            | 64989                                                  |
| 8        | 34483                                            | 65472                                                  |
| 9        | 38047                                            | 91576                                                  |
+----------+--------------------------------------------------+--------------------------------------------------------+
10 rows in set. Query took 0.945 seconds.

❯ explain SELECT "RegionID", SUM("AdvEngineID"), COUNT(DISTINCT "UserID") FROM '../benchmarks/data/hits.parquet' GROUP BY "RegionID" order by "RegionID" limit 10;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Limit: skip=0, fetch=10                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|               |   Sort: ../benchmarks/data/hits.parquet.RegionID ASC NULLS LAST, fetch=10                                                                                                                                                                                                                                                                                                                                                                                                                 |
|               |     Aggregate: groupBy=[[../benchmarks/data/hits.parquet.RegionID]], aggr=[[SUM(alias2) AS SUM(../benchmarks/data/hits.parquet.AdvEngineID), COUNT(alias1) AS COUNT(DISTINCT ../benchmarks/data/hits.parquet.UserID)]]                                                                                                                                                                                                                                                                    |
|               |       Aggregate: groupBy=[[../benchmarks/data/hits.parquet.RegionID, ../benchmarks/data/hits.parquet.UserID AS alias1]], aggr=[[SUM(CAST(../benchmarks/data/hits.parquet.AdvEngineID AS Int64)) AS alias2]]                                                                                                                                                                                                                                                                               |
|               |         TableScan: ../benchmarks/data/hits.parquet projection=[RegionID, UserID, AdvEngineID]                                                                                                                                                                                                                                                                                                                                                                                             |
| physical_plan | GlobalLimitExec: skip=0, fetch=10                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|               |   SortPreservingMergeExec: [RegionID@0 ASC NULLS LAST], fetch=10                                                                                                                                                                                                                                                                                                                                                                                                                          |
|               |     SortExec: TopK(fetch=10), expr=[RegionID@0 ASC NULLS LAST]                                                                                                                                                                                                                                                                                                                                                                                                                            |
|               |       AggregateExec: mode=FinalPartitioned, gby=[RegionID@0 as RegionID], aggr=[SUM(../benchmarks/data/hits.parquet.AdvEngineID), COUNT(DISTINCT ../benchmarks/data/hits.parquet.UserID)]                                                                                                                                                                                                                                                                                                 |
|               |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|               |           RepartitionExec: partitioning=Hash([RegionID@0], 24), input_partitions=24                                                                                                                                                                                                                                                                                                                                                                                                       |
|               |             AggregateExec: mode=Partial, gby=[RegionID@0 as RegionID], aggr=[SUM(../benchmarks/data/hits.parquet.AdvEngineID), COUNT(DISTINCT ../benchmarks/data/hits.parquet.UserID)]                                                                                                                                                                                                                                                                                                    |
|               |               AggregateExec: mode=FinalPartitioned, gby=[RegionID@0 as RegionID, alias1@1 as alias1], aggr=[alias2]                                                                                                                                                                                                                                                                                                                                                                       |
|               |                 CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                               |
|               |                   RepartitionExec: partitioning=Hash([RegionID@0, alias1@1], 24), input_partitions=24                                                                                                                                                                                                                                                                                                                                                                                     |
|               |                     AggregateExec: mode=Partial, gby=[RegionID@0 as RegionID, UserID@1 as alias1], aggr=[alias2]                                                                                                                                                                                                                                                                                                                                                                          |
|               |                       ParquetExec: file_groups={24 groups: [[home/hhj/datafusion/benchmarks/data/hits.parquet:0..615832352], [home/hhj/datafusion/benchmarks/data/hits.parquet:615832352..1231664704], [home/hhj/datafusion/benchmarks/data/hits.parquet:1231664704..1847497056], [home/hhj/datafusion/benchmarks/data/hits.parquet:1847497056..2463329408], [home/hhj/datafusion/benchmarks/data/hits.parquet:2463329408..3079161760], ...]}, projection=[RegionID, UserID, AdvEngineID] |
|               |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 rows in set. Query took 0.043 seconds.

On main (43cc870):

❯ SELECT "RegionID", SUM("AdvEngineID"), COUNT(DISTINCT "UserID") FROM '../benchmarks/data/hits.parquet' GROUP BY "RegionID" order by "RegionID" limit 10;
+----------+--------------------------------------------------+--------------------------------------------------------+
| RegionID | SUM(../benchmarks/data/hits.parquet.AdvEngineID) | COUNT(DISTINCT ../benchmarks/data/hits.parquet.UserID) |
+----------+--------------------------------------------------+--------------------------------------------------------+
| 0        | 0                                                | 8                                                      |
| 1        | 147946                                           | 239380                                                 |
| 2        | 441662                                           | 1081016                                                |
| 3        | 39724                                            | 131195                                                 |
| 4        | 34557                                            | 79500                                                  |
| 5        | 13502                                            | 40914                                                  |
| 6        | 24338                                            | 55768                                                  |
| 7        | 28417                                            | 64989                                                  |
| 8        | 34483                                            | 65472                                                  |
| 9        | 38047                                            | 91576                                                  |
+----------+--------------------------------------------------+--------------------------------------------------------+
10 rows in set. Query took 1.343 seconds.

❯ explain SELECT "RegionID", SUM("AdvEngineID"), COUNT(DISTINCT "UserID") FROM '../benchmarks/data/hits.parquet' GROUP BY "RegionID" order by "RegionID" limit 10;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Limit: skip=0, fetch=10                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
|               |   Sort: ../benchmarks/data/hits.parquet.RegionID ASC NULLS LAST, fetch=10                                                                                                                                                                                                                                                                                                                                                                                                         |
|               |     Aggregate: groupBy=[[../benchmarks/data/hits.parquet.RegionID]], aggr=[[SUM(CAST(../benchmarks/data/hits.parquet.AdvEngineID AS Int64)), COUNT(DISTINCT ../benchmarks/data/hits.parquet.UserID)]]                                                                                                                                                                                                                                                                             |
|               |       TableScan: ../benchmarks/data/hits.parquet projection=[RegionID, UserID, AdvEngineID]                                                                                                                                                                                                                                                                                                                                                                                       |
| physical_plan | GlobalLimitExec: skip=0, fetch=10                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
|               |   SortPreservingMergeExec: [RegionID@0 ASC NULLS LAST], fetch=10                                                                                                                                                                                                                                                                                                                                                                                                                  |
|               |     SortExec: TopK(fetch=10), expr=[RegionID@0 ASC NULLS LAST]                                                                                                                                                                                                                                                                                                                                                                                                                    |
|               |       AggregateExec: mode=FinalPartitioned, gby=[RegionID@0 as RegionID], aggr=[SUM(../benchmarks/data/hits.parquet.AdvEngineID), COUNT(DISTINCT ../benchmarks/data/hits.parquet.UserID)]                                                                                                                                                                                                                                                                                         |
|               |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                               |
|               |           RepartitionExec: partitioning=Hash([RegionID@0], 24), input_partitions=24                                                                                                                                                                                                                                                                                                                                                                                               |
|               |             AggregateExec: mode=Partial, gby=[RegionID@0 as RegionID], aggr=[SUM(../benchmarks/data/hits.parquet.AdvEngineID), COUNT(DISTINCT ../benchmarks/data/hits.parquet.UserID)]                                                                                                                                                                                                                                                                                            |
|               |               ParquetExec: file_groups={24 groups: [[home/hhj/datafusion/benchmarks/data/hits.parquet:0..615832352], [home/hhj/datafusion/benchmarks/data/hits.parquet:615832352..1231664704], [home/hhj/datafusion/benchmarks/data/hits.parquet:1231664704..1847497056], [home/hhj/datafusion/benchmarks/data/hits.parquet:1847497056..2463329408], [home/hhj/datafusion/benchmarks/data/hits.parquet:2463329408..3079161760], ...]}, projection=[RegionID, UserID, AdvEngineID] |
|               |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 rows in set. Query took 0.054 seconds.

What changes are included in this PR?

Add support for non-distinct sum/min/max aggregates in the single_distinct_to_group_by rule.
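
To illustrate why the rewrite in the plans above preserves results, here is a minimal sketch in plain Rust (standard library only, not DataFusion code). It assumes non-null integer columns; the names `Row`, `direct`, and `rewritten` are made up for this example. `direct` evaluates `SUM(x), COUNT(DISTINCT y) ... GROUP BY g` in one pass, while `rewritten` mirrors the two-level plan: an inner aggregate grouped by `(g, y AS alias1)` computing `SUM(x) AS alias2`, then an outer aggregate grouped by `g` computing `SUM(alias2)` and `COUNT(alias1)`.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// One input row: (group key g, value x to sum, value y to count distinct).
type Row = (i64, i64, i64);

/// Direct evaluation: SELECT g, SUM(x), COUNT(DISTINCT y) ... GROUP BY g.
fn direct(rows: &[Row]) -> BTreeMap<i64, (i64, usize)> {
    let mut acc: BTreeMap<i64, (i64, BTreeSet<i64>)> = BTreeMap::new();
    for &(g, x, y) in rows {
        let entry = acc.entry(g).or_insert_with(|| (0, BTreeSet::new()));
        entry.0 += x; // running SUM(x)
        entry.1.insert(y); // distinct y values seen in this group
    }
    acc.into_iter()
        .map(|(g, (sum, seen))| (g, (sum, seen.len())))
        .collect()
}

/// Two-level evaluation that mirrors the rewritten logical plan.
fn rewritten(rows: &[Row]) -> BTreeMap<i64, (i64, usize)> {
    // Inner aggregate: GROUP BY (g, y AS alias1), SUM(x) AS alias2.
    let mut inner: BTreeMap<(i64, i64), i64> = BTreeMap::new();
    for &(g, x, y) in rows {
        *inner.entry((g, y)).or_insert(0) += x;
    }
    // Outer aggregate: GROUP BY g, SUM(alias2), COUNT(alias1).
    let mut outer: BTreeMap<i64, (i64, usize)> = BTreeMap::new();
    for ((g, _alias1), alias2) in inner {
        let entry = outer.entry(g).or_insert((0, 0));
        entry.0 += alias2; // sum of partial sums
        entry.1 += 1; // each inner group is one distinct y
    }
    outer
}
```

The same decomposition works for min and max (a min of partial mins, a max of partial maxes). Count is different: it would have to be recombined with an outer sum, which changes the result for empty input.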

Are these changes tested?

Yes, some tests are added.

Are there any user-facing changes?

@github-actions github-actions bot added the optimizer (Optimizer rules), sqllogictest (SQL Logic Tests (.slt)), and substrait labels on Nov 10, 2023
for e in args {
fields_set.insert(e.canonical_name());
match filter {
Some(_) => return Ok(false),
Contributor Author

@haohuaijin haohuaijin Nov 11, 2023


Before this PR, the single_distinct_to_group_by rule did not support filters either, but we forgot to check for them.
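
As a rough illustration of the kind of eligibility check described here, the following sketch uses a simplified, hypothetical `AggCall` struct (it is not DataFusion's `Expr` type, and `rewrite_applies` is a made-up name). The conditions are assumptions drawn from this thread: any `FILTER` clause disables the rewrite, exactly one distinct argument is required, and non-distinct aggregates are limited to sum/min/max.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified aggregate call; not DataFusion's `Expr`.
struct AggCall {
    func: &'static str,           // e.g. "count", "sum", "min", "max"
    arg: &'static str,            // name of the argument column
    distinct: bool,               // true for e.g. COUNT(DISTINCT arg)
    filter: Option<&'static str>, // FILTER (WHERE ...) clause, if any
}

/// Returns true if the rewrite could apply under the assumptions above.
fn rewrite_applies(aggs: &[AggCall]) -> bool {
    let mut distinct_args = HashSet::new();
    for agg in aggs {
        // Any FILTER clause makes the aggregate ineligible.
        if agg.filter.is_some() {
            return false;
        }
        if agg.distinct {
            distinct_args.insert(agg.arg);
        } else if !matches!(agg.func, "sum" | "min" | "max") {
            // Non-distinct aggregates other than sum/min/max are skipped.
            return false;
        }
    }
    // All distinct aggregates must share a single argument.
    distinct_args.len() == 1
}
```

The early `return false` on a filter corresponds to the `Some(_) => return Ok(false)` arm in the diff context above.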

@jonahgao
Member

jonahgao commented Nov 12, 2023

The behaviors of sum and count are different when dealing with empty inputs.
Re-writing count as count+sum could lead to incorrect results.

I tested on this branch:

DataFusion CLI v33.0.0
❯ create table t(a int, b int);
0 rows in set. Query took 0.016 seconds.

❯ select count(distinct a), count(b) from t;
+---------------------+------------+
| COUNT(DISTINCT t.a) | COUNT(t.b) |
+---------------------+------------+
| 0                   |            |
+---------------------+------------+
1 row in set. Query took 0.029 seconds.

COUNT(t.b) should be 0, not NULL.

Maybe we need a special sum function like SqlSumEmptyIsZero in Calcite, or we could just skip the optimization for count.
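
The mismatch can be sketched in plain Rust, modelling SQL NULL as `None` and assuming non-null inputs. The function names are made up for this example. With an empty table the inner grouped aggregate produces no groups, so an outer sum over the per-group counts has nothing to add up and returns NULL, whereas a plain count returns 0.

```rust
/// SQL SUM over non-null values: NULL (None) on empty input.
fn sql_sum(values: &[i64]) -> Option<i64> {
    if values.is_empty() {
        None
    } else {
        Some(values.iter().sum())
    }
}

/// SQL COUNT: always a number, 0 on empty input.
fn sql_count(values: &[i64]) -> i64 {
    values.len() as i64
}

/// COUNT(b) rewritten as an outer SUM over the inner per-group counts.
/// An empty table yields no inner groups, so the slice is empty.
fn count_via_sum(inner_counts: &[i64]) -> Option<i64> {
    sql_sum(inner_counts)
}
```

Here `sql_count(&[])` is `0` but `count_via_sum(&[])` is `None`, which matches the NULL shown for `COUNT(t.b)` in the output above.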

@haohuaijin
Contributor Author

haohuaijin commented Nov 12, 2023

@jonahgao thanks for providing this case. I had not read the Calcite source code carefully.

I think implementing SqlSumEmptyIsZero in DataFusion is easy: we can add a flag to Sum that initializes SumAccumulator to 0, add an AggregateFunction for SqlSumEmptyIsZero, and modify create_groups_accumulator accordingly. Here is a link about Sum0 in Calcite.
Implementing sum0 requires more consideration, so I removed the count rewrite for now.

I have no preference between the two methods. What do you think, @alamb?
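
A minimal sketch of the flag idea, using a hypothetical `SumAcc` type in plain Rust (this is not DataFusion's `SumAccumulator`, and the `empty_is_zero` parameter name is an assumption for illustration): the flag only changes the initial state, so an accumulator that never sees a value evaluates to 0 instead of NULL.

```rust
/// Hypothetical accumulator for illustration; not DataFusion's `SumAccumulator`.
struct SumAcc {
    total: Option<i64>, // None models SQL NULL
}

impl SumAcc {
    /// `empty_is_zero = true` gives SUM0-like behaviour: start at 0, not NULL.
    fn new(empty_is_zero: bool) -> Self {
        SumAcc {
            total: if empty_is_zero { Some(0) } else { None },
        }
    }

    /// Fold in one input value; NULL inputs (None) are skipped, as in SQL.
    fn update(&mut self, value: Option<i64>) {
        if let Some(v) = value {
            self.total = Some(self.total.unwrap_or(0) + v);
        }
    }

    /// Final result: NULL only if nothing was summed and the flag was off.
    fn evaluate(&self) -> Option<i64> {
        self.total
    }
}
```

With such a variant, an outer sum0 over inner counts would return 0 for an empty table, matching plain count.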

@alamb
Contributor

alamb commented Nov 13, 2023

I hope to review this PR later today

@haohuaijin haohuaijin changed the title from "Support no distinct aggregate count/sum/min/max in single_distinct_to_group_by rule" to "Support no distinct aggregate sum/min/max in single_distinct_to_group_by rule" on Nov 14, 2023
@alamb
Contributor

alamb commented Nov 14, 2023

I am sorry -- I did not have a chance to get to this one. Note that this PR seems to have non-trivial conflicts now (perhaps due to #8176).

@haohuaijin
Contributor Author

Because of #8176, I am closing this PR. I will reorganize the code and submit a new PR.
