[SPARK-39447][SQL] Avoid AssertionError in AdaptiveSparkPlanExec.doExecuteBroadcast

Change `currentPhysicalPlan` to `inputPlan` when we restore the broadcast exchange for DPP.

The `currentPhysicalPlan` can be wrapped in a broadcast query stage, so it is not safe to match against it. For example, the broadcast exchange added by DPP may run earlier than the normal broadcast exchange (e.g. one introduced by a join).
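The failure mode is easiest to see with a stripped-down sketch. The types below (`BroadcastExchange`, `BroadcastQueryStage`, `Scan`) and the `restoreBroadcast` helper are hypothetical stand-ins for Spark's plan nodes, not the real API: once AQE wraps the DPP broadcast exchange into a query stage, a pattern match against the wrapped `currentPhysicalPlan` no longer sees a broadcast exchange, so the exchange is silently dropped; matching against the unwrapped `inputPlan` preserves it.

```scala
// Minimal sketch with hypothetical, simplified stand-ins for Spark's plan nodes.
sealed trait Plan
case class Scan(name: String) extends Plan
case class BroadcastExchange(child: Plan) extends Plan
case class BroadcastQueryStage(plan: Plan) extends Plan // AQE wrapper around an exchange

object Demo extends App {
  // The DPP subquery plan as first planned: a broadcast exchange on top.
  val inputPlan: Plan = BroadcastExchange(Scan("dpp_subquery"))
  // After AQE materializes the exchange, the current plan is a wrapped query stage.
  val currentPhysicalPlan: Plan = BroadcastQueryStage(inputPlan)

  // Re-insert the broadcast exchange around the re-optimized plan when needed.
  def restoreBroadcast(planToMatch: Plan, newPlan: Plan): Plan = planToMatch match {
    case BroadcastExchange(_) => BroadcastExchange(newPlan)
    case _ => newPlan
  }

  // Matching the wrapped plan loses the broadcast exchange:
  assert(restoreBroadcast(currentPhysicalPlan, Scan("optimized")) == Scan("optimized"))
  // Matching inputPlan (the fix) keeps it:
  assert(restoreBroadcast(inputPlan, Scan("optimized")) ==
    BroadcastExchange(Scan("optimized")))
  println("ok")
}
```

Losing the exchange is what later triggers the `AssertionError` in `doExecuteBroadcast`, since the final plan is expected to be broadcast-able.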

Yes, this is a bug fix.

Added a test.

Closes apache#36974 from ulysses-you/inputplan.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
ulysses-you authored and lwz9103 committed May 8, 2024
1 parent 6bb3121 commit 5756947
Showing 2 changed files with 31 additions and 12 deletions.
@@ -670,18 +670,18 @@ case class AdaptiveSparkPlanExec(
       preprocessingRules ++ queryStagePreparationRules,
       Some((planChangeLogger, "AQE Replanning")))
 
-    // When both enabling AQE and DPP, `PlanAdaptiveDynamicPruningFilters` rule will
-    // add the `BroadcastExchangeExec` node manually in the DPP subquery,
-    // not through `EnsureRequirements` rule. Therefore, when the DPP subquery is complicated
-    // and need to be re-optimized, AQE also need to manually insert the `BroadcastExchangeExec`
-    // node to prevent the loss of the `BroadcastExchangeExec` node in DPP subquery.
-    // Here, we also need to avoid to insert the `BroadcastExchangeExec` node when the newPlan
-    // is already the `BroadcastExchangeExec` plan after apply the `LogicalQueryStageStrategy` rule.
-    val finalPlan = currentPhysicalPlan match {
-      case b: BroadcastExchangeLike
-        if (!newPlan.isInstanceOf[BroadcastExchangeLike]) => b.withNewChildren(Seq(newPlan))
-      case _ => newPlan
-    }
+    // When both enabling AQE and DPP, `PlanAdaptiveDynamicPruningFilters` rule will
+    // add the `BroadcastExchangeExec` node manually in the DPP subquery,
+    // not through `EnsureRequirements` rule. Therefore, when the DPP subquery is complicated
+    // and need to be re-optimized, AQE also need to manually insert the `BroadcastExchangeExec`
+    // node to prevent the loss of the `BroadcastExchangeExec` node in DPP subquery.
+    // Here, we also need to avoid to insert the `BroadcastExchangeExec` node when the newPlan is
+    // already the `BroadcastExchangeExec` plan after apply the `LogicalQueryStageStrategy` rule.
+    val finalPlan = inputPlan match {
+      case b: BroadcastExchangeLike
+        if (!newPlan.isInstanceOf[BroadcastExchangeLike]) => b.withNewChildren(Seq(newPlan))
+      case _ => newPlan
+    }
 
     (finalPlan, optimized)
   }
@@ -1694,6 +1694,25 @@ class DynamicPartitionPruningV1SuiteAEOff extends DynamicPartitionPruningV1Suite
 class DynamicPartitionPruningV1SuiteAEOn extends DynamicPartitionPruningV1Suite
   with EnableAdaptiveExecutionSuite {
 
+  test("SPARK-39447: Avoid AssertionError in AdaptiveSparkPlanExec.doExecuteBroadcast") {
+    val df = sql(
+      """
+        |WITH empty_result AS (
+        |  SELECT * FROM fact_stats WHERE product_id < 0
+        |)
+        |SELECT *
+        |FROM (SELECT /*+ SHUFFLE_MERGE(fact_sk) */ empty_result.store_id
+        |      FROM fact_sk
+        |      JOIN empty_result
+        |      ON fact_sk.product_id = empty_result.product_id) t2
+        |  JOIN empty_result
+        |  ON t2.store_id = empty_result.store_id
+      """.stripMargin)
+
+    checkPartitionPruningPredicate(df, false, false)
+    checkAnswer(df, Nil)
+  }
+
   test("SPARK-37995: PlanAdaptiveDynamicPruningFilters should use prepareExecutedPlan " +
     "rather than createSparkPlan to re-plan subquery") {
     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
