Skip to content

Conversation

wiedld
Copy link
Contributor

@wiedld wiedld commented Feb 27, 2025

Which issue does this PR close?

Rationale for this change

Demonstrate the bug.
Afterwards we can continue the process of figuring out the proper fix.

What changes are included in this PR?

Adds a single test case, and associated methods.

Are these changes tested?

Yes

Are there any user-facing changes?

No

@github-actions github-actions bot added the core Core DataFusion crate label Feb 27, 2025
@wiedld wiedld marked this pull request as ready for review February 27, 2025 18:09
@alamb alamb changed the title Demonstrate EnforceSorting can remove a needed coalesce Add tests for Demonstrate EnforceSorting can remove a needed coalesce Feb 28, 2025
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @wiedld

@alamb
Copy link
Contributor

alamb commented Feb 28, 2025

FYI @berkaysynnada

@alamb alamb merged commit 32224b4 into apache:main Feb 28, 2025
26 checks passed
@alamb alamb deleted the 14691/add-test-to-demonstrate-bug branch February 28, 2025 01:58
get_plan_string(&plan),
vec![
"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
" AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you intentionally put SinglePartitioned aggregate mode? The input is only 1 partition, so why don't you use AggregateMode::Single? I'm unsure if this is a valid case @wiedld @alamb

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I update the mode as Single, there seems no problem to me

Copy link
Contributor Author

@wiedld wiedld Feb 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for taking the time @berkaysynnada .

The test case merged here is a minimal reproducer. The mode=SinglePartitioned is coming from the actual plans we have running and failing in prod: influxdata#58 (comment)

A simplified view of it is:

"SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
"  AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
"    CoalescePartitionsExec",
"      ProjectionExec: expr=[a@0 as a, b@1 as value]",
"        UnionExec",
"          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
"          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet"

The coalesce gets removed for the mode=SinglePartitioned. Do you think that the bug is elsewhere?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    /// Applies the entire logical aggregation operation in a single operator,
    /// as opposed to Partial / Final modes which apply the logical aggregation using
    /// two operators.
    ///
    /// This mode requires that the input is partitioned by group key (like
    /// FinalPartitioned)
    SinglePartitioned,

It means that when an AggregateExec is SinglePartitioned, then its input should be multi-partitioned (I should say the wording is a bit confusing -- SinglePartitioned means Single layer of aggregation, but its input is Partitioned). However, in your example plan, AggregateExec has a single partition (as it is above CoalescePartitions). So, what I was trying to say that your initial plan should cannot ever exists in general conditions. I'd also like to remind that; EnforceSorting can handle invalid plans and make them valid in terms of ordering conditions (and expects valid distribution conditions), and EnforceDistribution does the same for distribution conditions (and again expects valid ordering conditions). However, in this reproducer, you are giving an invalid plan to the EnforceSorting in terms of distribution.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SinglePartitioned means Single layer of aggregation, but its input is Partitioned). However, in your example plan, AggregateExec has a single partition (as it is above CoalescePartitions).

I see -- so you are saying that if the input to the AggregateExec has only a single partitition, then it should never be SinglePartitioned.

So you are saying that the input plan is not valid and thus the error is elsewhere (whatever generated this plan).

I will make a PR to clarify the documentation as I agree it is quite confusing

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

@wiedld wiedld Feb 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, in this reproducer, you are giving an invalid plan to the EnforceSorting in terms of distribution.

However, in your example plan, AggregateExec has a single partition (as it is above CoalescePartitions)

I think this means that the insertion of the coalesce in the first place (during the enforce distribution) is the bug. Since the input into the enforce distribution optimizer is:

     ...nodes...
        AggregateExec: mode=FinalPartitioned, gby=[time@0 as time], aggr=[sum(Value)]
          AggregateExec: mode=Partial, gby=[date_bin_wallclock(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 10000000000 }, time@0, 0) as time], aggr=[sum(Value)]
            SortExec: expr=[time@0 ASC NULLS LAST], preserve_partitioning=[false]
              ProjectionExec: expr=[time@1 as time, f@0 as Value]
                UnionExec. **multi-partitions**
                  ...multiple nodes...

Our Union outputs multiple partitions, and the AggregateExec;mode=Partial can take multiple partitions. Therefore the enforce distribution should not have inserted the coalesce?

I'll start by updating the reproducer test cases. Thank you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a previous reproducer, I showed how the enforce distribution was inserting the coalesce.
See here: influxdata#58 (comment)

On the latest main, we no longer have the coalesce being inserted.
See here: #14949 (comment)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to revert this PR, with this test case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our Union outputs multiple partitions, and the AggregateExec;mode=Partial can take multiple partitions. Therefore the enforce distribution should not have inserted the coalesce?

It might insert a coalesce, but then it has to update AggregateExec: mode=FinalPartitioned to AggregateExec: mode=Final

wiedld added a commit to influxdata/arrow-datafusion that referenced this pull request Feb 28, 2025
alamb pushed a commit that referenced this pull request Mar 1, 2025
@alamb
Copy link
Contributor

alamb commented Mar 1, 2025

Reverted in

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants