diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 8d2b4d041aa0..4402ba113076 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1711,8 +1711,7 @@ def cache(self) -> "DataFrame":
         >>> df.explain()
         == Physical Plan ==
-        AdaptiveSparkPlan isFinalPlan=false
-        +- InMemoryTableScan ...
+        InMemoryTableScan ...
         """
         self.is_cached = True
         self._jdf.cache()
@@ -1754,8 +1753,7 @@ def persist(
         >>> df.explain()
         == Physical Plan ==
-        AdaptiveSparkPlan isFinalPlan=false
-        +- InMemoryTableScan ...
+        InMemoryTableScan ...

         Persists the data in the disk by specifying the storage level.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1bff0ff1a350..8f86a1c8a1f3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1561,11 +1561,11 @@ object SQLConf {
       .doc("Whether to forcibly enable some optimization rules that can change the output " +
         "partitioning of a cached query when executing it for caching. If it is set to true, " +
         "queries may need an extra shuffle to read the cached data. This configuration is " +
-        "enabled by default. The optimization rules enabled by this configuration " +
-        s"are ${ADAPTIVE_EXECUTION_ENABLED.key} and ${AUTO_BUCKETED_SCAN_ENABLED.key}.")
+        "disabled by default. The optimization rule enabled by this configuration " +
+        s"is ${ADAPTIVE_EXECUTION_APPLY_FINAL_STAGE_SHUFFLE_OPTIMIZATIONS.key}.")
       .version("3.2.0")
       .booleanConf
-      .createWithDefault(true)
+      .createWithDefault(false)

   val DEFAULT_CACHE_STORAGE_LEVEL = buildConf("spark.sql.defaultCacheStorageLevel")
     .doc("The default storage level of `dataset.cache()`, `catalog.cacheTable()` and " +
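Note: as a quick illustration of the new default from the user side, here is a minimal spark-shell sketch. It assumes `spark.sql.optimizer.canChangeCachedPlanOutputPartitioning` is the key behind `CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING` and an otherwise default-configured session:

    // With the flag left at its new default (false), reading the cache can reuse the
    // cached plan's output partitioning instead of adding a shuffle on top of it.
    spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "false")
    val df = spark.range(100).repartition(10).cache()
    df.count()   // materializes the cache
    df.explain() // top node is now InMemoryTableScan, matching the docstring change above
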
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 093599af1222..db6266fe1756 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -22,7 +22,6 @@ import scala.collection.immutable.IndexedSeq
 import org.apache.hadoop.fs.{FileSystem, Path}

 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
@@ -60,17 +59,6 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
   @transient @volatile
   private var cachedData = IndexedSeq[CachedData]()

-  /**
-   * Configurations needs to be turned off, to avoid regression for cached query, so that the
-   * outputPartitioning of the underlying cached query plan can be leveraged later.
-   * Configurations include:
-   * 1. AQE
-   * 2. Automatic bucketed table scan
-   */
-  private val forceDisableConfigs: Seq[ConfigEntry[Boolean]] = Seq(
-    SQLConf.ADAPTIVE_EXECUTION_ENABLED,
-    SQLConf.AUTO_BUCKETED_SCAN_ENABLED)
-
   /** Clears all cached tables. */
   def clearCache(): Unit = this.synchronized {
     cachedData.foreach(_.cachedRepresentation.cacheBuilder.clearCache())
@@ -395,19 +383,19 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {

   /**
    * If `CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING` is enabled, return the session with disabled
-   * `AUTO_BUCKETED_SCAN_ENABLED`.
-   * If `CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING` is disabled, return the session with disabled
-   * `AUTO_BUCKETED_SCAN_ENABLED` and `ADAPTIVE_EXECUTION_ENABLED`.
+   * `ADAPTIVE_EXECUTION_APPLY_FINAL_STAGE_SHUFFLE_OPTIMIZATIONS`.
+   * `AUTO_BUCKETED_SCAN_ENABLED` is always disabled.
    */
   private def getOrCloneSessionWithConfigsOff(session: SparkSession): SparkSession = {
-    if (session.conf.get(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING)) {
-      // Bucketed scan only has one time overhead but can have multi-times benefits in cache,
-      // so we always do bucketed scan in a cached plan.
-      SparkSession.getOrCloneSessionWithConfigsOff(session,
-        SQLConf.ADAPTIVE_EXECUTION_APPLY_FINAL_STAGE_SHUFFLE_OPTIMIZATIONS ::
-        SQLConf.AUTO_BUCKETED_SCAN_ENABLED :: Nil)
-    } else {
-      SparkSession.getOrCloneSessionWithConfigsOff(session, forceDisableConfigs)
+    // Bucketed scan only has one time overhead but can have multi-times benefits in cache,
+    // so we always do bucketed scan in a cached plan.
+    var disableConfigs = Seq(SQLConf.AUTO_BUCKETED_SCAN_ENABLED)
+    if (!session.conf.get(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING)) {
+      // Allowing changing cached plan output partitioning might lead to regression as it introduces
+      // extra shuffle
+      disableConfigs =
+        disableConfigs :+ SQLConf.ADAPTIVE_EXECUTION_APPLY_FINAL_STAGE_SHUFFLE_OPTIMIZATIONS
     }
+    SparkSession.getOrCloneSessionWithConfigsOff(session, disableConfigs)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
index 9986e5d47870..50f2b7c81453 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
@@ -89,8 +89,7 @@ case class InsertAdaptiveSparkPlan(
   // - The query may need to add exchanges. It's an overkill to run `EnsureRequirements` here, so
   //   we just check `SparkPlan.requiredChildDistribution` and see if it's possible that the
   //   the query needs to add exchanges later.
-  // - The query contains nested `AdaptiveSparkPlanExec`.
-  // - The query contains `InMemoryTableScanExec`.
+  // - The query contains `InMemoryTableScanExec` that is AQE-ed.
   // - The query contains sub-query.
   private def shouldApplyAQE(plan: SparkPlan, isSubquery: Boolean): Boolean = {
     conf.getConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY) || isSubquery || {
@@ -103,8 +102,6 @@ case class InsertAdaptiveSparkPlan(
         // update correctly.
         case i: InMemoryTableScanExec
           if i.relation.cachedPlan.isInstanceOf[AdaptiveSparkPlanExec] => true
-        case _: InMemoryTableScanExec
-          if conf.getConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING) => true
         case p => p.expressions.exists(_.exists {
           case _: SubqueryExpression => true
           case _ => false
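Note: for reviewers skimming `shouldApplyAQE`, the sketch below paraphrases the new decision in standalone form. It is illustrative only: `roughlyNeedsAqe` is a made-up helper, and the real rule additionally consults `SparkPlan.requiredChildDistribution` and `ADAPTIVE_EXECUTION_FORCE_APPLY`:

    import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
    import org.apache.spark.sql.execution.SparkPlan
    import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
    import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
    import org.apache.spark.sql.execution.exchange.Exchange

    // AQE is inserted when the plan contains an exchange, a cache whose cached
    // plan is itself adaptive, or a subquery expression. A cache scan alone no
    // longer qualifies: the cached plan has already made its own AQE decision.
    def roughlyNeedsAqe(plan: SparkPlan): Boolean = plan.exists {
      case _: Exchange => true
      case i: InMemoryTableScanExec =>
        i.relation.cachedPlan.isInstanceOf[AdaptiveSparkPlanExec]
      case p => p.expressions.exists(_.exists(_.isInstanceOf[SubqueryExpression]))
    }
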
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index eff7e9ad23a3..7865e7f1f864 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join, JoinStrategyHint, SHUFFLE_HASH}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants
-import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, RDDScanExec, SparkPlan}
+import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, RDDScanExec, SparkPlan, SparkPlanInfo}
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEPropagateEmptyRelation}
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
@@ -1630,20 +1630,37 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
       SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
-      var finalPlan = ""
+      var finalPlan: SparkPlanInfo = null
       val listener = new SparkListener {
         override def onOtherEvent(event: SparkListenerEvent): Unit = {
           event match {
-            case SparkListenerSQLAdaptiveExecutionUpdate(_, physicalPlanDesc, sparkPlanInfo) =>
+            case SparkListenerSQLAdaptiveExecutionUpdate(_, _, sparkPlanInfo) =>
               if (sparkPlanInfo.simpleString.startsWith(
                 "AdaptiveSparkPlan isFinalPlan=true")) {
-                finalPlan = physicalPlanDesc
+                finalPlan = sparkPlanInfo
               }
             case _ => // ignore other events
           }
         }
       }

+      def findNodeInSparkPlanInfo(root: SparkPlanInfo, cond: SparkPlanInfo => Boolean):
+          Option[SparkPlanInfo] = {
+        if (cond(root)) {
+          Some(root)
+        } else {
+          root.children.flatMap(findNodeInSparkPlanInfo(_, cond)).headOption
+        }
+      }
+
+      def cachedFinalStageCoalesced(sparkPlanInfo: SparkPlanInfo): Boolean = {
+        val inMemoryScanNode = findNodeInSparkPlanInfo(sparkPlanInfo,
+          _.nodeName.contains("TableCacheQueryStage"))
+        val aqeNode = findNodeInSparkPlanInfo(inMemoryScanNode.get,
+          _.nodeName.contains("AdaptiveSparkPlan"))
+        aqeNode.get.children.head.nodeName == "AQEShuffleRead"
+      }
+
       withTempView("t0", "t1", "t2") {
         try {
           spark.range(10).write.saveAsTable("t0")
@@ -1655,16 +1672,16 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
             "SELECT distinct (id+1) FROM t0)")
           assert(spark.table("t1").rdd.partitions.length == 2)
           spark.sparkContext.listenerBus.waitUntilEmpty()
-          assert(finalPlan.nonEmpty && !finalPlan.contains("coalesced"))
+          assert(finalPlan != null && !cachedFinalStageCoalesced(finalPlan))
         }

-        finalPlan = "" // reset finalPlan
+        finalPlan = null // reset finalPlan
         withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") {
           sql("CACHE TABLE t2 as SELECT /*+ REPARTITION */ * FROM (" +
             "SELECT distinct (id-1) FROM t0)")
-          assert(spark.table("t2").rdd.partitions.length == 2)
+          assert(spark.table("t2").rdd.partitions.length == 1)
           spark.sparkContext.listenerBus.waitUntilEmpty()
-          assert(finalPlan.nonEmpty && finalPlan.contains("coalesced"))
+          assert(finalPlan != null && cachedFinalStageCoalesced(finalPlan))
         }
       } finally {
         spark.sparkContext.removeSparkListener(listener)
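Note: the observable contract this test pins down, as a spark-shell sketch (assuming the same conf key as above; `t0` is the 10-row table created in the test, and the exact partition counts depend on that setup):

    // Flag on: AQE may coalesce the cached plan's final REPARTITION shuffle,
    // so the cache is read back as a single partition.
    spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "true")
    spark.sql("CACHE TABLE t2 AS SELECT /*+ REPARTITION */ * FROM (SELECT DISTINCT (id-1) FROM t0)")
    assert(spark.table("t2").rdd.partitions.length == 1)
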
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index a67b1b69f6be..d1d83f96c670 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -2825,12 +2825,17 @@ class AdaptiveQueryExecSuite
   test("SPARK-43026: Apply AQE with non-exchange table cache") {
-    withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") {
-      val df = spark.range(0).cache()
-      df.collect()
-      assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
-      assert(df.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
-        .executedPlan.isInstanceOf[LocalTableScanExec])
+    Seq(true, false).foreach { canChangeOP =>
+      withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> canChangeOP.toString) {
+        // No exchange, no need for AQE
+        val df = spark.range(0).cache()
+        df.collect()
+        assert(!df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
+        // Has exchange, apply AQE
+        val df2 = spark.range(0).repartition(1).cache()
+        df2.collect()
+        assert(df2.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
+      }
     }
   }
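Note: putting the two suites together, the end-to-end behavior can be reproduced in a spark-shell session roughly as follows (the `flat`/`shuffled` names are illustrative):

    import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec

    // A cached plan with no exchange now skips AQE entirely...
    val flat = spark.range(0).cache()
    flat.collect()
    assert(!flat.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])

    // ...while a cached plan that contains an exchange still runs under AQE.
    val shuffled = spark.range(0).repartition(1).cache()
    shuffled.collect()
    assert(shuffled.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])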