Skip to content

Conversation

@EnricoMi
Copy link
Contributor

@EnricoMi EnricoMi commented Jan 5, 2023

What changes were proposed in this pull request?

Backport #39131 to branch-3.1.

Rule PushDownLeftSemiAntiJoin should not push an anti-join below an Aggregate when the join condition references an attribute that exists in its right plan and its left plan's child. This usually happens when the anti-join / semi-join is a self-join while DeduplicateRelations cannot deduplicate those attributes (in this example due to the projection of value to id).

This behaviour already exists for Project and Union, but Aggregate lacks this safety guard.

Why are the changes needed?

Without this change, the optimizer creates an incorrect plan.

This example fails with distinct() (an aggregation), and succeeds without distinct(), but both queries are identical:

val ids = Seq(1, 2, 3).toDF("id").distinct()
val result = ids.withColumn("id", $"id" + 1).join(ids, Seq("id"), "left_anti").collect()
assert(result.length == 1)

With distinct(), rule PushDownLeftSemiAntiJoin creates a join condition (value#907 + 1) = value#907, which can never be true. This effectively removes the anti-join.

Before this PR:
The anti-join is fully removed from the plan.

== Physical Plan ==
AdaptiveSparkPlan (16)
+- == Final Plan ==
   LocalTableScan (1)

(16) AdaptiveSparkPlan
Output [1]: [id#900]
Arguments: isFinalPlan=true

This is caused by PushDownLeftSemiAntiJoin adding join condition (value#907 + 1) = value#907, which is wrong as because id#910 in (id#910 + 1) AS id#912 exists in the right child of the join as well as in the left grandchild:

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin ===
!Join LeftAnti, (id#912 = id#910)                  Aggregate [id#910], [(id#910 + 1) AS id#912]
!:- Aggregate [id#910], [(id#910 + 1) AS id#912]   +- Project [value#907 AS id#910]
!:  +- Project [value#907 AS id#910]                  +- Join LeftAnti, ((value#907 + 1) = value#907)
!:     +- LocalRelation [value#907]                      :- LocalRelation [value#907]
!+- Aggregate [id#910], [id#910]                         +- Aggregate [id#910], [id#910]
!   +- Project [value#914 AS id#910]                        +- Project [value#914 AS id#910]
!      +- LocalRelation [value#914]                            +- LocalRelation [value#914]

The right child of the join and in the left grandchild would become the children of the pushed-down join, which creates an invalid join condition.

After this PR:
Join condition (id#910 + 1) AS id#912 is understood to become ambiguous as both sides of the prospect join contain id#910. Hence, the join is not pushed down. The rule is then not applied any more.

The final plan contains the anti-join:

== Physical Plan ==
AdaptiveSparkPlan (24)
+- == Final Plan ==
   * BroadcastHashJoin LeftSemi BuildRight (14)
   :- * HashAggregate (7)
   :  +- AQEShuffleRead (6)
   :     +- ShuffleQueryStage (5), Statistics(sizeInBytes=48.0 B, rowCount=3)
   :        +- Exchange (4)
   :           +- * HashAggregate (3)
   :              +- * Project (2)
   :                 +- * LocalTableScan (1)
   +- BroadcastQueryStage (13), Statistics(sizeInBytes=1024.0 KiB, rowCount=3)
      +- BroadcastExchange (12)
         +- * HashAggregate (11)
            +- AQEShuffleRead (10)
               +- ShuffleQueryStage (9), Statistics(sizeInBytes=48.0 B, rowCount=3)
                  +- ReusedExchange (8)

(8) ReusedExchange [Reuses operator id: 4]
Output [1]: [id#898]

(24) AdaptiveSparkPlan
Output [1]: [id#900]
Arguments: isFinalPlan=true

Does this PR introduce any user-facing change?

It fixes correctness.

How was this patch tested?

Unit tests in DataFrameJoinSuite and LeftSemiAntiJoinPushDownSuite.

@github-actions github-actions bot added the SQL label Jan 5, 2023
…tions

Rule `PushDownLeftSemiAntiJoin` should not push an anti-join below an `Aggregate` when the join condition references an attribute that exists in its right plan and its left plan's child. This usually happens when the anti-join / semi-join is a self-join while `DeduplicateRelations` cannot deduplicate those attributes (in this example due to the projection of `value` to `id`).

This behaviour already exists for `Project` and `Union`, but `Aggregate` lacks this safety guard.

Without this change, the optimizer creates an incorrect plan.

This example fails with `distinct()` (an aggregation), and succeeds without `distinct()`, but both queries are identical:
```scala
val ids = Seq(1, 2, 3).toDF("id").distinct()
val result = ids.withColumn("id", $"id" + 1).join(ids, Seq("id"), "left_anti").collect()
assert(result.length == 1)
```
With `distinct()`, rule `PushDownLeftSemiAntiJoin` creates a join condition `(value#907 + 1) = value#907`, which can never be true. This effectively removes the anti-join.

**Before this PR:**
The anti-join is fully removed from the plan.
```
== Physical Plan ==
AdaptiveSparkPlan (16)
+- == Final Plan ==
   LocalTableScan (1)

(16) AdaptiveSparkPlan
Output [1]: [id#900]
Arguments: isFinalPlan=true
```

This is caused by `PushDownLeftSemiAntiJoin` adding join condition `(value#907 + 1) = value#907`, which is wrong as because `id#910` in `(id#910 + 1) AS id#912` exists in the right child of the join as well as in the left grandchild:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin ===
!Join LeftAnti, (id#912 = id#910)                  Aggregate [id#910], [(id#910 + 1) AS id#912]
!:- Aggregate [id#910], [(id#910 + 1) AS id#912]   +- Project [value#907 AS id#910]
!:  +- Project [value#907 AS id#910]                  +- Join LeftAnti, ((value#907 + 1) = value#907)
!:     +- LocalRelation [value#907]                      :- LocalRelation [value#907]
!+- Aggregate [id#910], [id#910]                         +- Aggregate [id#910], [id#910]
!   +- Project [value#914 AS id#910]                        +- Project [value#914 AS id#910]
!      +- LocalRelation [value#914]                            +- LocalRelation [value#914]
```

The right child of the join and in the left grandchild would become the children of the pushed-down join, which creates an invalid join condition.

**After this PR:**
Join condition `(id#910 + 1) AS id#912` is understood to become ambiguous as both sides of the prospect join contain `id#910`. Hence, the join is not pushed down. The rule is then not applied any more.

The final plan contains the anti-join:
```
== Physical Plan ==
AdaptiveSparkPlan (24)
+- == Final Plan ==
   * BroadcastHashJoin LeftSemi BuildRight (14)
   :- * HashAggregate (7)
   :  +- AQEShuffleRead (6)
   :     +- ShuffleQueryStage (5), Statistics(sizeInBytes=48.0 B, rowCount=3)
   :        +- Exchange (4)
   :           +- * HashAggregate (3)
   :              +- * Project (2)
   :                 +- * LocalTableScan (1)
   +- BroadcastQueryStage (13), Statistics(sizeInBytes=1024.0 KiB, rowCount=3)
      +- BroadcastExchange (12)
         +- * HashAggregate (11)
            +- AQEShuffleRead (10)
               +- ShuffleQueryStage (9), Statistics(sizeInBytes=48.0 B, rowCount=3)
                  +- ReusedExchange (8)

(8) ReusedExchange [Reuses operator id: 4]
Output [1]: [id#898]

(24) AdaptiveSparkPlan
Output [1]: [id#900]
Arguments: isFinalPlan=true
```

It fixes correctness.

Unit tests in `DataFrameJoinSuite` and `LeftSemiAntiJoinPushDownSuite`.

Closes apache#39131 from EnricoMi/branch-antijoin-selfjoin-fix.

Authored-by: Enrico Minack <github@enrico.minack.dev>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@EnricoMi EnricoMi force-pushed the branch-antijoin-selfjoin-fix-3.1 branch from 0da0ce4 to 5e62e3a Compare January 5, 2023 16:08
Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for making a PR for branch-3.1, but branch-3.1 has been EOL status since September 2022 according to the Apache Spark community versioning policy.

Feature release branches will, generally, be maintained with bug fix releases for a period of 18 months.

Inevitably, I close this PR on branch-3.1. @EnricoMi .

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants