diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 40d2e1a3a8f46..6c9c0e1cda4e2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -659,7 +659,7 @@ case class AdaptiveSparkPlanExec( // node to prevent the loss of the `BroadcastExchangeExec` node in DPP subquery. // Here, we also need to avoid to insert the `BroadcastExchangeExec` node when the newPlan is // already the `BroadcastExchangeExec` plan after apply the `LogicalQueryStageStrategy` rule. - val finalPlan = currentPhysicalPlan match { + val finalPlan = inputPlan match { case b: BroadcastExchangeLike if (!newPlan.isInstanceOf[BroadcastExchangeLike]) => b.withNewChildren(Seq(newPlan)) case _ => newPlan diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala index cfdd2e08a79ea..d5498c469c541 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -1694,6 +1694,25 @@ class DynamicPartitionPruningV1SuiteAEOff extends DynamicPartitionPruningV1Suite class DynamicPartitionPruningV1SuiteAEOn extends DynamicPartitionPruningV1Suite with EnableAdaptiveExecutionSuite { + test("SPARK-39447: Avoid AssertionError in AdaptiveSparkPlanExec.doExecuteBroadcast") { + val df = sql( + """ + |WITH empty_result AS ( + | SELECT * FROM fact_stats WHERE product_id < 0 + |) + |SELECT * + |FROM (SELECT /*+ SHUFFLE_MERGE(fact_sk) */ empty_result.store_id + | FROM fact_sk + | JOIN empty_result + | ON fact_sk.product_id = empty_result.product_id) t2 + | JOIN empty_result + | ON t2.store_id = empty_result.store_id + """.stripMargin) + + checkPartitionPruningPredicate(df, false, false) + checkAnswer(df, Nil) + } + test("SPARK-37995: PlanAdaptiveDynamicPruningFilters should use prepareExecutedPlan " + "rather than createSparkPlan to re-plan subquery") { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",