Streaming Aggregate operator not being used in deduplication of pre-sorted Parquet files #16919

@zheniasigayev

Describe the bug

See discussion: Best practices for memory-efficient deduplication of pre-sorted Parquet files #16776

After investigating how best to deduplicate multiple pre-sorted Parquet files, I believe the following queries should use the streaming aggregate operator rather than buffering the entire dataset in a HashAggregateStream. That buffering is the likely cause of the out-of-memory error described in the discussion.
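To illustrate why sorted input should allow low-memory deduplication, here is a minimal conceptual sketch in Python. It is not DataFusion's implementation; the function name `dedup_sorted_prefix` and the tuple row layout are hypothetical. It assumes rows arrive sorted on the first two columns (like col_1, col_2), so only the groups belonging to the current (col_1, col_2) run need to be held in memory at any time, and it takes the first row seen per key as a stand-in for first_value().

```python
from itertools import groupby
from typing import Dict, Iterable, Iterator, Tuple

Row = Tuple  # hypothetical layout: (col_1, col_2, ..., col_8)


def dedup_sorted_prefix(
    rows: Iterable[Row], prefix_len: int = 2, key_len: int = 6
) -> Iterator[Row]:
    """Yield the first row seen for each grouping key (first key_len columns).

    Assumes rows are sorted on the first prefix_len columns, so every row
    sharing a grouping key falls inside the same contiguous run.
    """
    # groupby only merges adjacent rows, which is exactly what sorted input gives us.
    for _, run in groupby(rows, key=lambda r: r[:prefix_len]):
        first_seen: Dict[Tuple, Row] = {}  # state for the current run only
        for row in run:
            # Keep the first row per full grouping key, ignore later duplicates.
            first_seen.setdefault(row[:key_len], row)
        # The run is finished: emit its groups and drop the state.
        yield from first_seen.values()


rows = [
    ("a", 1, "x", "p", "q", "k", "v1", 1.0),
    ("a", 1, "x", "p", "q", "k", "v2", 2.0),  # duplicate key, dropped
    ("a", 1, "y", "p", "q", "k", "v3", 3.0),
    ("b", 2, "x", "p", "q", "k", "v4", 4.0),
]
deduplicated = list(dedup_sorted_prefix(rows))
```

In this sketch peak memory is bounded by the number of distinct keys within one (col_1, col_2) run, whereas a hash aggregate over unsorted input has to keep every distinct key until the input is exhausted.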

To Reproduce

Step 1) Generate Parquet files

I wrote a Python script that generates equivalent synthetic Parquet files: https://gist.github.com/zheniasigayev/2e5e471c9070cfa685d938bced47aa7f

Running the two queries below against this synthetic data produces the same query plans and memory consumers as the original data.

Step 2) Run Reproducible Queries

Run: datafusion-cli -m 8G -d 50G --top-memory-consumers 25 with default settings.

2.1) Original Deduplication Query:

CREATE EXTERNAL TABLE example (
    col_1 VARCHAR(50) NOT NULL,
    col_2 BIGINT NOT NULL,
    col_3 VARCHAR(50),
    col_4 VARCHAR(50),
    col_5 VARCHAR(50),
    col_6 VARCHAR(100) NOT NULL,
    col_7 VARCHAR(50),
    col_8 DOUBLE
) 
WITH ORDER (col_1 ASC, col_2 ASC) 
STORED AS PARQUET 
LOCATION '/tmp/redacted/*.parquet';

EXPLAIN COPY (
    SELECT 
        col_1,
        col_2,
        col_3,
        col_4,
        col_5,
        col_6,
        first_value(col_7) AS col_7,
        first_value(col_8) AS col_8
    FROM 
        example 
    GROUP BY 
        col_1, col_2, col_3, col_4, col_5, col_6 
    ORDER BY 
        col_1 ASC, col_2 ASC
) 
TO '/tmp/result.parquet' 
STORED AS PARQUET 
OPTIONS (compression 'zstd(1)');

2.2) Removing first_value() Aggregate and associated columns:

Removing only the first_value() aggregates from the above query is not possible, because col_7 and col_8 would then be selected without appearing in the GROUP BY clause. Doing so fails with:

Error during planning: Column in SELECT must be in GROUP BY or an aggregate function: While expanding wildcard, column "example.col_7" must appear in the GROUP BY clause or must be part of an aggregate function, currently only "example.col_1, example.col_2, example.col_3, example.col_4, example.col_5, example.col_6" appears in the SELECT clause satisfies this requirement

Instead, I removed col_7 and col_8 (the columns to which the first_value() aggregate is applied). This is the resulting query:

CREATE EXTERNAL TABLE example (
    col_1 VARCHAR(50) NOT NULL,
    col_2 BIGINT NOT NULL,
    col_3 VARCHAR(50),
    col_4 VARCHAR(50),
    col_5 VARCHAR(50),
    col_6 VARCHAR(100) NOT NULL,
    col_7 VARCHAR(50),
    col_8 DOUBLE
) 
WITH ORDER (col_1 ASC, col_2 ASC) 
STORED AS PARQUET 
LOCATION '/tmp/redacted/*.parquet';

COPY (
    SELECT 
        col_1,
        col_2,
        col_3,
        col_4,
        col_5,
        col_6
    FROM 
        example 
    GROUP BY 
        col_1, col_2, col_3, col_4, col_5, col_6
    ORDER BY 
        col_1 ASC, col_2 ASC
) 
TO '/tmp/result_part2.parquet' 
STORED AS PARQUET 
OPTIONS (compression 'zstd(1)');

Expected behavior

  • Given that the data is sorted by col_1 and col_2, the original query is expected to use the streaming aggregate operator, which should need very little memory (see the discussion).

Additional context

Detailed Query Plans

Generated using EXPLAIN FORMAT INDENT ...

Original Deduplication Query:

+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | CopyTo: format=parquet output_url=/tmp/result.parquet options: (format.compression zstd(1))                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
|               |   Sort: example.col_1 ASC NULLS LAST, example.col_2 ASC NULLS LAST                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
|               |     Projection: example.col_1, example.col_2, example.col_3, example.col_4, example.col_5, example.col_6, first_value(example.col_7) AS col_7, first_value(example.col_8) AS col_8                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
|               |       Aggregate: groupBy=[[example.col_1, example.col_2, example.col_3, example.col_4, example.col_5, example.col_6]], aggr=[[first_value(example.col_7), first_value(example.col_8)]]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
|               |         TableScan: example projection=[col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
| physical_plan | DataSinkExec: sink=ParquetSink(file_groups=[])                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
|               |   SortPreservingMergeExec: [col_1@0 ASC NULLS LAST, col_2@1 ASC NULLS LAST]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
|               |     SortExec: expr=[col_1@0 ASC NULLS LAST, col_2@1 ASC NULLS LAST], preserve_partitioning=[true]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|               |       ProjectionExec: expr=[col_1@0 as col_1, col_2@1 as col_2, col_3@2 as col_3, col_4@3 as col_4, col_5@4 as col_5, col_6@5 as col_6, first_value(example.col_7)@6 as col_7, first_value(example.col_8)@7 as col_8]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|               |         AggregateExec: mode=FinalPartitioned, gby=[col_1@0 as col_1, col_2@1 as col_2, col_3@2 as col_3, col_4@3 as col_4, col_5@4 as col_5, col_6@5 as col_6], aggr=[first_value(example.col_7), first_value(example.col_8)]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
|               |           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|               |             RepartitionExec: partitioning=Hash([col_1@0, col_2@1, col_3@2, col_4@3, col_5@4, col_6@5], 10), input_partitions=10                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|               |               AggregateExec: mode=Partial, gby=[col_1@0 as col_1, col_2@1 as col_2, col_3@2 as col_3, col_4@3 as col_4, col_5@4 as col_5, col_6@5 as col_6], aggr=[first_value(example.col_7), first_value(example.col_8)]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
|               |                 DataSourceExec: file_groups={10 groups: [[tmp/redacted/reproducible_data_0.parquet:0..12537303, tmp/redacted/reproducible_data_1.parquet:0..6245726], [tmp/redacted/reproducible_data_1.parquet:6245726..12518047, tmp/redacted/reproducible_data_10.parquet:0..12510708], [tmp/redacted/reproducible_data_10.parquet:12510708..12530931, tmp/redacted/reproducible_data_11.parquet:0..12518206, tmp/redacted/reproducible_data_12.parquet:0..6244600], [tmp/redacted/reproducible_data_12.parquet:6244600..12522074, tmp/redacted/reproducible_data_13.parquet:0..12505555], [tmp/redacted/reproducible_data_13.parquet:12505555..12523871, tmp/redacted/reproducible_data_14.parquet:0..12515635, tmp/redacted/reproducible_data_2.parquet:0..6249078], ...]}, projection=[col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8], file_type=parquet |
|               |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Removing first_value() Aggregate and associated columns:

+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | CopyTo: format=parquet output_url=/tmp/result_part2.parquet options: (format.compression zstd(1))                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|               |   Sort: example.col_1 ASC NULLS LAST, example.col_2 ASC NULLS LAST                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
|               |     Aggregate: groupBy=[[example.col_1, example.col_2, example.col_3, example.col_4, example.col_5, example.col_6]], aggr=[[]]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
|               |       TableScan: example projection=[col_1, col_2, col_3, col_4, col_5, col_6]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
| physical_plan | DataSinkExec: sink=ParquetSink(file_groups=[])                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
|               |   SortPreservingMergeExec: [col_1@0 ASC NULLS LAST, col_2@1 ASC NULLS LAST]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
|               |     SortExec: expr=[col_1@0 ASC NULLS LAST, col_2@1 ASC NULLS LAST], preserve_partitioning=[true]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|               |       AggregateExec: mode=FinalPartitioned, gby=[col_1@0 as col_1, col_2@1 as col_2, col_3@2 as col_3, col_4@3 as col_4, col_5@4 as col_5, col_6@5 as col_6], aggr=[]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|               |         CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|               |           RepartitionExec: partitioning=Hash([col_1@0, col_2@1, col_3@2, col_4@3, col_5@4, col_6@5], 10), input_partitions=10                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
|               |             AggregateExec: mode=Partial, gby=[col_1@0 as col_1, col_2@1 as col_2, col_3@2 as col_3, col_4@3 as col_4, col_5@4 as col_5, col_6@5 as col_6], aggr=[]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
|               |               DataSourceExec: file_groups={10 groups: [[tmp/redacted/reproducible_data_0.parquet:0..12537303, tmp/redacted/reproducible_data_1.parquet:0..6245726], [tmp/redacted/reproducible_data_1.parquet:6245726..12518047, tmp/redacted/reproducible_data_10.parquet:0..12510708], [tmp/redacted/reproducible_data_10.parquet:12510708..12530931, tmp/redacted/reproducible_data_11.parquet:0..12518206, tmp/redacted/reproducible_data_12.parquet:0..6244600], [tmp/redacted/reproducible_data_12.parquet:6244600..12522074, tmp/redacted/reproducible_data_13.parquet:0..12505555], [tmp/redacted/reproducible_data_13.parquet:12505555..12523871, tmp/redacted/reproducible_data_14.parquet:0..12515635, tmp/redacted/reproducible_data_2.parquet:0..6249078], ...]}, projection=[col_1, col_2, col_3, col_4, col_5, col_6], file_type=parquet |
|               |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
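One way to check plans like the two above for streaming aggregation is to look at the AggregateExec lines. As far as I know, DataFusion appends an `ordering_mode=...` annotation (for example `Sorted` or `PartiallySorted(...)`) to an AggregateExec line when it exploits the input ordering, and omits it otherwise; treat that as an assumption to verify against your version. Neither plan above carries the annotation. The helper below is a hypothetical convenience for scanning EXPLAIN output, and the `"Linear"` label for the unannotated case is my own choice.

```python
import re
from typing import List, Tuple


def aggregate_modes(plan_text: str) -> List[Tuple[str, str]]:
    """Return (mode, ordering_mode) for every AggregateExec line in plan_text.

    ordering_mode is reported as "Linear" when the line has no
    ordering_mode annotation (assumed to mean plain hash aggregation).
    """
    results = []
    for line in plan_text.splitlines():
        mode = re.search(r"AggregateExec: mode=(\w+)", line)
        if mode is None:
            continue  # not an aggregate operator line
        ordering = re.search(r"ordering_mode=(\w+)", line)
        results.append((mode.group(1), ordering.group(1) if ordering else "Linear"))
    return results


# Abbreviated operator lines in the shape of the physical plans above.
plan = """
AggregateExec: mode=FinalPartitioned, gby=[col_1@0 as col_1, col_2@1 as col_2], aggr=[]
  RepartitionExec: partitioning=Hash([col_1@0, col_2@1], 10), input_partitions=10
    AggregateExec: mode=Partial, gby=[col_1@0 as col_1, col_2@1 as col_2], aggr=[]
"""
modes = aggregate_modes(plan)
```

If every entry reports "Linear", as it does for the plans in this issue, none of the aggregates is running in a streaming mode.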
