From d5a629f60732f02bd516e6c7b6791153d6d7697b Mon Sep 17 00:00:00 2001
From: Nga Tran
Date: Tue, 28 Oct 2025 15:28:13 -0400
Subject: [PATCH] Add reproducer for consecutive RepartitionExec

---
 .../test_files/aggregate_repartition.slt      | 136 ++++++++++++++++++
 1 file changed, 136 insertions(+)
 create mode 100644 datafusion/sqllogictest/test_files/aggregate_repartition.slt

diff --git a/datafusion/sqllogictest/test_files/aggregate_repartition.slt b/datafusion/sqllogictest/test_files/aggregate_repartition.slt
new file mode 100644
index 000000000000..27602b61e424
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/aggregate_repartition.slt
@@ -0,0 +1,136 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Reproducer for https://github.com/apache/datafusion/issues/18341
+# Tests for aggregate repartition behavior
+# Comparing CSV vs Parquet execution plans for GROUP BY queries
+
+# Create CSV version of the dimension data
+query I
+COPY (
+    SELECT * FROM (VALUES
+        ('prod', 100, 'A'),
+        ('dev', 200, 'B'),
+        ('test', 150, 'A'),
+        ('prod', 300, 'C'),
+        ('dev', 250, 'B')
+    ) AS t(env, value, category)
+)
+TO 'test_files/scratch/aggregate_repartition/dim.csv'
+STORED AS CSV
+OPTIONS ('format.has_header' 'true');
+----
+5
+
+# Create Parquet version of the dimension data
+query I
+COPY (
+    SELECT * FROM (VALUES
+        ('prod', 100, 'A'),
+        ('dev', 200, 'B'),
+        ('test', 150, 'A'),
+        ('prod', 300, 'C'),
+        ('dev', 250, 'B')
+    ) AS t(env, value, category)
+)
+TO 'test_files/scratch/aggregate_repartition/dim.parquet'
+STORED AS PARQUET;
+----
+5
+
+# Create external table for CSV
+statement ok
+CREATE EXTERNAL TABLE dim_csv
+STORED AS CSV
+LOCATION 'test_files/scratch/aggregate_repartition/dim.csv'
+OPTIONS ('format.has_header' 'true');
+
+# Create external table for Parquet
+statement ok
+CREATE EXTERNAL TABLE dim_parquet
+STORED AS PARQUET
+LOCATION 'test_files/scratch/aggregate_repartition/dim.parquet';
+
+# Test 1: EXPLAIN query for CSV table with GROUP BY
+# This plan looks reasonable
+query TT
+EXPLAIN SELECT env, count(*) FROM dim_csv GROUP BY env;
+----
+logical_plan
+01)Projection: dim_csv.env, count(Int64(1)) AS count(*)
+02)--Aggregate: groupBy=[[dim_csv.env]], aggr=[[count(Int64(1))]]
+03)----TableScan: dim_csv projection=[env]
+physical_plan
+01)ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]
+02)--AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], aggr=[count(Int64(1))]
+03)----CoalesceBatchesExec: target_batch_size=8192
+04)------RepartitionExec: partitioning=Hash([env@0], 4), input_partitions=4
+05)--------AggregateExec: mode=Partial, gby=[env@0 as env], aggr=[count(Int64(1))]
+06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.csv]]}, projection=[env], file_type=csv, has_header=true
+
+# Test 2: EXPLAIN query for Parquet table with GROUP BY
+# This plan differs from the one above and includes two consecutive repartitions — one round-robin and one hash —
+# which seems unnecessary. We may want to align it with the previous plan (push the round robin down or remove the round robin), or, if the input file is small,
+# avoid repartitioning altogether. A single partition should suffice for a single-step aggregate as the plan after this.
+
+query TT
+EXPLAIN SELECT env, count(*) FROM dim_parquet GROUP BY env;
+----
+logical_plan
+01)Projection: dim_parquet.env, count(Int64(1)) AS count(*)
+02)--Aggregate: groupBy=[[dim_parquet.env]], aggr=[[count(Int64(1))]]
+03)----TableScan: dim_parquet projection=[env]
+physical_plan
+01)ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]
+02)--AggregateExec: mode=FinalPartitioned, gby=[env@0 as env], aggr=[count(Int64(1))]
+03)----CoalesceBatchesExec: target_batch_size=8192
+04)------RepartitionExec: partitioning=Hash([env@0], 4), input_partitions=4
+05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+06)----------AggregateExec: mode=Partial, gby=[env@0 as env], aggr=[count(Int64(1))]
+07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.parquet]]}, projection=[env], file_type=parquet
+
+# Verify the queries actually work and return the same results
+query TI rowsort
+SELECT env, count(*) FROM dim_csv GROUP BY env;
+----
+dev 2
+prod 2
+test 1
+
+query TI rowsort
+SELECT env, count(*) FROM dim_parquet GROUP BY env;
+----
+dev 2
+prod 2
+test 1
+
+# Test 3: Change target partitions to 1 to have single-aggregate plan
+statement ok
+SET datafusion.execution.target_partitions = 1;
+
+query TT
+EXPLAIN SELECT env, count(*) FROM dim_parquet GROUP BY env;
+----
+logical_plan
+01)Projection: dim_parquet.env, count(Int64(1)) AS count(*)
+02)--Aggregate: groupBy=[[dim_parquet.env]], aggr=[[count(Int64(1))]]
+03)----TableScan: dim_parquet projection=[env]
+physical_plan
+01)ProjectionExec: expr=[env@0 as env, count(Int64(1))@1 as count(*)]
+02)--AggregateExec: mode=Single, gby=[env@0 as env], aggr=[count(Int64(1))]
+03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/aggregate_repartition/dim.parquet]]}, projection=[env], file_type=parquet