
Conversation

@ulysses-you
Contributor

What changes were proposed in this pull request?

For SMJ (sort-merge join) with an inner join, the output partitioning simply wraps the left and right output partitionings into a `PartitioningCollection`, so it may not satisfy the required clustering of the parent operator.
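For illustration, here is a minimal sketch (not the exact Spark source) of the shape of the problem: the inner-join output partitioning only collects both children's partitionings, so a parent operator expecting a key-grouped partitioning may instead see a `PartitioningCollection`:

```scala
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection}

// Sketch: for an inner sort-merge join the output partitioning is just the
// collection of both children's partitionings; nothing here turns it back into
// a single KeyGroupedPartitioning, which is what the key-grouped compatibility
// check (checkKeyGroupCompatible in the trace below) expects to handle.
def innerJoinOutputPartitioning(left: Partitioning, right: Partitioning): Partitioning =
  PartitioningCollection(Seq(left, right))
```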

Why are the changes needed?

Fix an exception thrown when a query contains multiple bucketed inner joins:

```sql
SELECT * FROM testcat.ns.t1
JOIN testcat.ns.t2 ON t1.id = t2.id
JOIN testcat.ns.t3 ON t1.id = t3.id
```

```
Cause: java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:264)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.createKeyGroupedShuffleSpec(EnsureRequirements.scala:642)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.$anonfun$checkKeyGroupCompatible$1(EnsureRequirements.scala:385)
at scala.collection.immutable.List.map(List.scala:247)
at scala.collection.immutable.List.map(List.scala:79)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.checkKeyGroupCompatible(EnsureRequirements.scala:382)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.checkKeyGroupCompatible(EnsureRequirements.scala:364)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$ensureDistributionAndOrdering(EnsureRequirements.scala:166)
at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:714)
at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$1.applyOrElse(EnsureRequirements.scala:689)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$4(TreeNode.scala:528)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:84)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:528)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:497)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:689)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:51)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$.$anonfun$applyPhysicalRules$2(AdaptiveSparkPlanExec.scala:882)
```
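For context, this path assumes `testcat.ns.t1`/`t2`/`t3` are v2 tables bucketed on the join key so that storage-partitioned join applies. A rough, assumed setup sketch (the real test presumably uses the suite's own test catalog helpers; the bucket count and column names here are illustrative):

```scala
// Assumed, illustrative setup: three v2 tables bucketed on the join key `id`.
// Table names mirror the query above; the bucket count (4) is arbitrary.
Seq("t1", "t2", "t3").foreach { t =>
  spark.sql(
    s"""
       |CREATE TABLE testcat.ns.$t (id INT, data STRING)
       |PARTITIONED BY (bucket(4, id))
       |""".stripMargin)
}
```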

Does this PR introduce any user-facing change?

yes, it's a bug fix

How was this patch tested?

add test

Was this patch authored or co-authored using generative AI tooling?

no

@github-actions github-actions bot added the SQL label Aug 9, 2024
@ulysses-you
Contributor Author

cc @cloud-fan @yaooqinn thank you

@cloud-fan
Contributor

Can you get a review from the active contributors of SPJ?

@ulysses-you
Contributor Author

cc @huaxingao @szehon-ho, do you have time to take a look? Thank you.

@ulysses-you
Contributor Author

also cc @sunchao @viirya thank you

Member

@viirya viirya left a comment


Makes sense to me.

Comment on lines 383 to 389
val df = sql(
  """
    |SELECT * FROM testcat.ns.t1
    |JOIN testcat.ns.t2 ON t1.id = t2.id
    |JOIN testcat.ns.t3 ON t1.id = t3.id
    |""".stripMargin)
assert(collectShuffles(df.queryExecution.executedPlan).isEmpty)
Member


Can we also check the result?

Contributor Author


Added `checkAnswer`.
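For reference, a sketch of what the strengthened test assertion looks like, building on the snippet above; `expectedRows` is a placeholder, since the seeded test data is not shown in this thread:

```scala
// Sketch: besides checking that no shuffle is planned, also verify the join result.
// `expectedRows` stands in for a Seq[Row] derived from the inserted test data.
assert(collectShuffles(df.queryExecution.executedPlan).isEmpty)
checkAnswer(df, expectedRows)
```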

ulysses-you added a commit that referenced this pull request Aug 13, 2024
…rror

Closes #47683 from ulysses-you/SPARK-49179.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: youxiduo <youxiduo@corp.netease.com>
(cherry picked from commit 8133294)
Signed-off-by: youxiduo <youxiduo@corp.netease.com>
ulysses-you added a commit that referenced this pull request Aug 13, 2024
…rror

Closes #47683 from ulysses-you/SPARK-49179.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: youxiduo <youxiduo@corp.netease.com>
(cherry picked from commit 8133294)
Signed-off-by: youxiduo <youxiduo@corp.netease.com>
@ulysses-you
Contributor Author

Thank you all, merged to master/branch-3.5/branch-3.4.

@ulysses-you ulysses-you deleted the SPARK-49179 branch August 13, 2024 05:10
@dongjoon-hyun
Member

@ulysses-you
Contributor Author

Thank you @dongjoon-hyun, I will send a PR for each branch later.

@dongjoon-hyun
Member

Thanks. Sure, take your time.

For now, branch-3.5/3.4 have been recovered by reverting.

ulysses-you added a commit to ulysses-you/spark that referenced this pull request Aug 13, 2024
…rror

Closes apache#47683 from ulysses-you/SPARK-49179.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: youxiduo <youxiduo@corp.netease.com>
(cherry picked from commit 8133294)
Signed-off-by: youxiduo <youxiduo@corp.netease.com>
ulysses-you added a commit to ulysses-you/spark that referenced this pull request Aug 13, 2024
…rror

Closes apache#47683 from ulysses-you/SPARK-49179.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: youxiduo <youxiduo@corp.netease.com>
(cherry picked from commit 8133294)
Signed-off-by: youxiduo <youxiduo@corp.netease.com>
ulysses-you added a commit that referenced this pull request Aug 13, 2024
…tionError

backport #47683 to branch-3.4

Closes #47736 from ulysses-you/SPARK-49179-3.4.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: youxiduo <youxiduo@corp.netease.com>
ulysses-you added a commit that referenced this pull request Aug 13, 2024
…tionError

backport #47683 to branch-3.5

Closes #47735 from ulysses-you/SPARK-49179-3.5.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: youxiduo <youxiduo@corp.netease.com>
turboFei pushed a commit to turboFei/spark that referenced this pull request Nov 6, 2025
…tionError (apache#544)

backport apache#47683 to branch-3.5

Closes apache#47735 from ulysses-you/SPARK-49179-3.5.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>

Signed-off-by: youxiduo <youxiduo@corp.netease.com>
Co-authored-by: ulysses-you <ulyssesyou18@gmail.com>
turboFei pushed a commit to turboFei/spark that referenced this pull request Nov 6, 2025
…rror

Closes apache#47683 from ulysses-you/SPARK-49179.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: youxiduo <youxiduo@corp.netease.com>