diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 62a75e753455..4bedbfb81deb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -144,6 +144,9 @@ case class AdaptiveSparkPlanExec( @transient private val collapseCodegenStagesRule: Rule[SparkPlan] = CollapseCodegenStages() + private val isAQECachedDataFrameSupportEnabled = context.session.sessionState.conf.getConf( + SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING) + // A list of physical optimizer rules to be applied right after a new stage is created. The input // plan to these rules has exchange as its root node. private def postStageCreationRules(outputsColumnar: Boolean) = Seq( @@ -345,6 +348,16 @@ case class AdaptiveSparkPlanExec( if (!isSubquery && currentPhysicalPlan.exists(_.subqueries.nonEmpty)) { getExecutionId.foreach(onUpdatePlan(_, Seq.empty)) } + + // Need to post final subPlan changes under InMemoryRelation(IMR) by supporting AQE under IMR + if (isAQECachedDataFrameSupportEnabled) { + val executionId = getExecutionId.orElse { + Option(context.session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)) + .map(_.toLong) + } + executionId.foreach(onUpdatePlan(_, Seq.empty)) + } + logOnLevel(s"Final plan:\n$currentPhysicalPlan") }