Commit becc04a

liuzqt authored and cloud-fan committed
[SPARK-46995][SQL] Allow AQE coalesce final stage in SQL cached plan
### What changes were proposed in this pull request?

#43435 and #43760 fix a correctness issue that is triggered when AQE is applied to a cached query plan, specifically when AQE coalesces the final result stage of the cached plan.

The current semantics of `spark.sql.optimizer.canChangeCachedPlanOutputPartitioning` ([source code](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala#L403-L411)):

- when true, we enable AQE, but disable coalescing the final stage (default)
- when false, we disable AQE

But let's revisit the semantics of this config: for the caller, the only thing that matters is whether we change the output partitioning of the cached plan. And we should apply AQE whenever possible. Thus we want to modify the semantics of `spark.sql.optimizer.canChangeCachedPlanOutputPartitioning`:

- when true, we enable AQE and allow coalescing the final stage: this might lead to a perf regression, because it introduces an extra shuffle
- when false, we enable AQE, but disable coalescing the final stage (this is actually the `true` semantics of the old behavior)

Also, to keep the default behavior unchanged, we flip the default value of `spark.sql.optimizer.canChangeCachedPlanOutputPartitioning` to `false`.

### Why are the changes needed?

To allow AQE to coalesce the final stage in a SQL cached plan. Also to make the semantics of `spark.sql.optimizer.canChangeCachedPlanOutputPartitioning` more reasonable.

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Updated UTs.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #45054 from liuzqt/SPARK-46995.

Authored-by: Ziqi Liu <ziqi.liu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
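The before/after semantics described in the commit message can be summarized in a small model. This is a minimal sketch and not Spark code: the function name `cached_plan_aqe_behavior` and the dictionary keys are hypothetical, and the function only encodes what the commit message states about the flag.

```python
def cached_plan_aqe_behavior(can_change_output_partitioning: bool,
                             after_commit: bool) -> dict:
    """Model how the flag affects AQE while a cached plan is built.

    Hypothetical helper for illustration only; it mirrors the commit
    message, not the actual Spark implementation.
    """
    if after_commit:
        # New semantics: AQE is never force-disabled for the cached plan;
        # the flag only gates coalescing of the final stage.
        return {
            "aqe_allowed": True,
            "coalesce_final_stage": can_change_output_partitioning,
        }
    # Old semantics: true allowed AQE without final-stage coalescing,
    # false disabled AQE for the cached plan entirely.
    return {
        "aqe_allowed": can_change_output_partitioning,
        "coalesce_final_stage": False,
    }


# The old default (true) and the new default (false) describe the same behavior.
OLD_DEFAULT = cached_plan_aqe_behavior(True, after_commit=False)
NEW_DEFAULT = cached_plan_aqe_behavior(False, after_commit=True)
```

Comparing `OLD_DEFAULT` and `NEW_DEFAULT` shows why flipping the default keeps the out-of-the-box behavior unchanged: only an explicit `true` after this commit opts in to final-stage coalescing.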
1 parent dc73a8d commit becc04a

6 files changed: +53 -48 lines changed

python/pyspark/sql/dataframe.py

Lines changed: 2 additions & 4 deletions
@@ -1711,8 +1711,7 @@ def cache(self) -> "DataFrame":
 
         >>> df.explain()
         == Physical Plan ==
-        AdaptiveSparkPlan isFinalPlan=false
-        +- InMemoryTableScan ...
+        InMemoryTableScan ...
         """
         self.is_cached = True
         self._jdf.cache()
@@ -1754,8 +1753,7 @@ def persist(
 
         >>> df.explain()
         == Physical Plan ==
-        AdaptiveSparkPlan isFinalPlan=false
-        +- InMemoryTableScan ...
+        InMemoryTableScan ...
 
         Persists the data in the disk by specifying the storage level.
 

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 3 additions & 3 deletions
@@ -1561,11 +1561,11 @@ object SQLConf {
       .doc("Whether to forcibly enable some optimization rules that can change the output " +
         "partitioning of a cached query when executing it for caching. If it is set to true, " +
         "queries may need an extra shuffle to read the cached data. This configuration is " +
-        "enabled by default. The optimization rules enabled by this configuration " +
-        s"are ${ADAPTIVE_EXECUTION_ENABLED.key} and ${AUTO_BUCKETED_SCAN_ENABLED.key}.")
+        "disabled by default. The optimization rule enabled by this configuration " +
+        s"is ${ADAPTIVE_EXECUTION_APPLY_FINAL_STAGE_SHUFFLE_OPTIMIZATIONS.key}.")
       .version("3.2.0")
       .booleanConf
-      .createWithDefault(true)
+      .createWithDefault(false)
 
   val DEFAULT_CACHE_STORAGE_LEVEL = buildConf("spark.sql.defaultCacheStorageLevel")
     .doc("The default storage level of `dataset.cache()`, `catalog.cacheTable()` and " +

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 11 additions & 23 deletions
@@ -22,7 +22,6 @@ import scala.collection.immutable.IndexedSeq
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
@@ -60,17 +59,6 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
   @transient @volatile
   private var cachedData = IndexedSeq[CachedData]()
 
-  /**
-   * Configurations needs to be turned off, to avoid regression for cached query, so that the
-   * outputPartitioning of the underlying cached query plan can be leveraged later.
-   * Configurations include:
-   * 1. AQE
-   * 2. Automatic bucketed table scan
-   */
-  private val forceDisableConfigs: Seq[ConfigEntry[Boolean]] = Seq(
-    SQLConf.ADAPTIVE_EXECUTION_ENABLED,
-    SQLConf.AUTO_BUCKETED_SCAN_ENABLED)
-
   /** Clears all cached tables. */
   def clearCache(): Unit = this.synchronized {
     cachedData.foreach(_.cachedRepresentation.cacheBuilder.clearCache())
@@ -395,19 +383,19 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
 
   /**
    * If `CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING` is enabled, return the session with disabled
-   * `AUTO_BUCKETED_SCAN_ENABLED`.
-   * If `CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING` is disabled, return the session with disabled
-   * `AUTO_BUCKETED_SCAN_ENABLED` and `ADAPTIVE_EXECUTION_ENABLED`.
+   * `ADAPTIVE_EXECUTION_APPLY_FINAL_STAGE_SHUFFLE_OPTIMIZATIONS`.
+   * `AUTO_BUCKETED_SCAN_ENABLED` is always disabled.
    */
   private def getOrCloneSessionWithConfigsOff(session: SparkSession): SparkSession = {
-    if (session.conf.get(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING)) {
-      // Bucketed scan only has one time overhead but can have multi-times benefits in cache,
-      // so we always do bucketed scan in a cached plan.
-      SparkSession.getOrCloneSessionWithConfigsOff(session,
-        SQLConf.ADAPTIVE_EXECUTION_APPLY_FINAL_STAGE_SHUFFLE_OPTIMIZATIONS ::
-          SQLConf.AUTO_BUCKETED_SCAN_ENABLED :: Nil)
-    } else {
-      SparkSession.getOrCloneSessionWithConfigsOff(session, forceDisableConfigs)
+    // Bucketed scan only has one time overhead but can have multi-times benefits in cache,
+    // so we always do bucketed scan in a cached plan.
+    var disableConfigs = Seq(SQLConf.AUTO_BUCKETED_SCAN_ENABLED)
+    if (!session.conf.get(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING)) {
+      // Allowing changing cached plan output partitioning might lead to regression as it introduces
+      // extra shuffle
+      disableConfigs =
+        disableConfigs :+ SQLConf.ADAPTIVE_EXECUTION_APPLY_FINAL_STAGE_SHUFFLE_OPTIMIZATIONS
     }
+    SparkSession.getOrCloneSessionWithConfigsOff(session, disableConfigs)
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala

Lines changed: 1 addition & 4 deletions
@@ -89,8 +89,7 @@ case class InsertAdaptiveSparkPlan(
   // - The query may need to add exchanges. It's an overkill to run `EnsureRequirements` here, so
   //   we just check `SparkPlan.requiredChildDistribution` and see if it's possible that the
   //   the query needs to add exchanges later.
-  // - The query contains nested `AdaptiveSparkPlanExec`.
-  // - The query contains `InMemoryTableScanExec`.
+  // - The query contains `InMemoryTableScanExec` that is AQE-ed.
   // - The query contains sub-query.
   private def shouldApplyAQE(plan: SparkPlan, isSubquery: Boolean): Boolean = {
     conf.getConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY) || isSubquery || {
@@ -103,8 +102,6 @@ case class InsertAdaptiveSparkPlan(
         // update correctly.
         case i: InMemoryTableScanExec
           if i.relation.cachedPlan.isInstanceOf[AdaptiveSparkPlanExec] => true
-        case _: InMemoryTableScanExec
-          if conf.getConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING) => true
         case p => p.expressions.exists(_.exists {
           case _: SubqueryExpression => true
           case _ => false
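
The effect of dropping the second `InMemoryTableScanExec` case can be illustrated with a small model. This is a sketch under stated assumptions rather than the Spark rule itself: `PlanNode`, `scan_triggers_aqe`, and the node names used as strings are hypothetical stand-ins, and only the two in-memory-scan cases visible in the hunk above are modeled.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class PlanNode:
    """Hypothetical stand-in for a physical plan node."""
    name: str
    children: List["PlanNode"] = field(default_factory=list)
    # Only meaningful for in-memory scans: the plan that produced the cache.
    cached_plan: Optional["PlanNode"] = None


def scan_triggers_aqe(node: PlanNode, can_change_flag: bool,
                      after_commit: bool) -> bool:
    """Return True if this node alone makes the query eligible for AQE."""
    if node.name != "InMemoryTableScan":
        return False
    # Kept by the commit: the cached plan was itself planned adaptively.
    if node.cached_plan is not None and node.cached_plan.name == "AdaptiveSparkPlan":
        return True
    # Removed by the commit: any in-memory scan qualified when the flag was on.
    return (not after_commit) and can_change_flag


plain_scan = PlanNode("InMemoryTableScan", cached_plan=PlanNode("Range"))
aqe_scan = PlanNode("InMemoryTableScan", cached_plan=PlanNode("AdaptiveSparkPlan"))
```

Under this model, a scan over a non-adaptive cached plan no longer triggers AQE after the commit regardless of the flag, which is consistent with the updated `explain()` output in `dataframe.py` and the updated SPARK-43026 test.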

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 25 additions & 8 deletions
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join, JoinStrategyHint, SHUFFLE_HASH}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants
-import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, RDDScanExec, SparkPlan}
+import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, RDDScanExec, SparkPlan, SparkPlanInfo}
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEPropagateEmptyRelation}
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
@@ -1630,20 +1630,37 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
       SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
 
-      var finalPlan = ""
+      var finalPlan: SparkPlanInfo = null
       val listener = new SparkListener {
         override def onOtherEvent(event: SparkListenerEvent): Unit = {
           event match {
-            case SparkListenerSQLAdaptiveExecutionUpdate(_, physicalPlanDesc, sparkPlanInfo) =>
+            case SparkListenerSQLAdaptiveExecutionUpdate(_, _, sparkPlanInfo) =>
               if (sparkPlanInfo.simpleString.startsWith(
                 "AdaptiveSparkPlan isFinalPlan=true")) {
-                finalPlan = physicalPlanDesc
+                finalPlan = sparkPlanInfo
               }
             case _ => // ignore other events
           }
         }
       }
 
+      def findNodeInSparkPlanInfo(root: SparkPlanInfo, cond: SparkPlanInfo => Boolean):
+      Option[SparkPlanInfo] = {
+        if (cond(root)) {
+          Some(root)
+        } else {
+          root.children.flatMap(findNodeInSparkPlanInfo(_, cond)).headOption
+        }
+      }
+
+      def cachedFinalStageCoalesced(sparkPlanInfo: SparkPlanInfo): Boolean = {
+        val inMemoryScanNode = findNodeInSparkPlanInfo(sparkPlanInfo,
+          _.nodeName.contains("TableCacheQueryStage"))
+        val aqeNode = findNodeInSparkPlanInfo(inMemoryScanNode.get,
+          _.nodeName.contains("AdaptiveSparkPlan"))
+        aqeNode.get.children.head.nodeName == "AQEShuffleRead"
+      }
+
       withTempView("t0", "t1", "t2") {
         try {
           spark.range(10).write.saveAsTable("t0")
@@ -1655,16 +1672,16 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
               "SELECT distinct (id+1) FROM t0)")
             assert(spark.table("t1").rdd.partitions.length == 2)
             spark.sparkContext.listenerBus.waitUntilEmpty()
-            assert(finalPlan.nonEmpty && !finalPlan.contains("coalesced"))
+            assert(finalPlan != null && !cachedFinalStageCoalesced(finalPlan))
           }
 
-          finalPlan = "" // reset finalPlan
+          finalPlan = null // reset finalPlan
           withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") {
             sql("CACHE TABLE t2 as SELECT /*+ REPARTITION */ * FROM (" +
               "SELECT distinct (id-1) FROM t0)")
-            assert(spark.table("t2").rdd.partitions.length == 2)
+            assert(spark.table("t2").rdd.partitions.length == 1)
             spark.sparkContext.listenerBus.waitUntilEmpty()
-            assert(finalPlan.nonEmpty && finalPlan.contains("coalesced"))
+            assert(finalPlan != null && cachedFinalStageCoalesced(finalPlan))
           }
         } finally {
           spark.sparkContext.removeSparkListener(listener)
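
The two test helpers added in this file perform a pre-order search over the plan-info tree and then check the first child of the adaptive node inside the table cache stage. A rough Python equivalent, as a sketch only: `PlanInfo`, `find_node`, and `cached_final_stage_coalesced` are hypothetical names, and the example trees are invented for illustration rather than captured from Spark. Unlike the Scala helper, which calls `.get` and would throw on a missing node, this version returns `False`.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class PlanInfo:
    """Hypothetical stand-in for a plan-info node: a name plus children."""
    node_name: str
    children: List["PlanInfo"] = field(default_factory=list)


def find_node(root: PlanInfo,
              cond: Callable[[PlanInfo], bool]) -> Optional[PlanInfo]:
    """Return the first node in pre-order that satisfies cond, or None."""
    if cond(root):
        return root
    for child in root.children:
        found = find_node(child, cond)
        if found is not None:
            return found
    return None


def cached_final_stage_coalesced(plan: PlanInfo) -> bool:
    """True if the cached plan's final stage is read through AQEShuffleRead."""
    stage = find_node(plan, lambda n: "TableCacheQueryStage" in n.node_name)
    if stage is None:
        return False
    aqe = find_node(stage, lambda n: "AdaptiveSparkPlan" in n.node_name)
    if aqe is None or not aqe.children:
        return False
    return aqe.children[0].node_name == "AQEShuffleRead"


def make_plan(final_stage_child: str) -> PlanInfo:
    """Build an invented plan tree whose cached final stage starts with the given node."""
    cached = PlanInfo("AdaptiveSparkPlan",
                      [PlanInfo(final_stage_child, [PlanInfo("ShuffleQueryStage")])])
    return PlanInfo("AdaptiveSparkPlan",
                    [PlanInfo("TableCacheQueryStage",
                              [PlanInfo("InMemoryTableScan", [cached])])])


coalesced = cached_final_stage_coalesced(make_plan("AQEShuffleRead"))
not_coalesced = cached_final_stage_coalesced(make_plan("HashAggregate"))
```

Inspecting the tree structure instead of searching the plan description for the substring "coalesced" ties the assertion to the cached plan's own final stage, so a coalesced read elsewhere in the outer query cannot satisfy it by accident.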

sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala

Lines changed: 11 additions & 6 deletions
@@ -2825,12 +2825,17 @@ class AdaptiveQueryExecSuite
   }
 
   test("SPARK-43026: Apply AQE with non-exchange table cache") {
-    withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") {
-      val df = spark.range(0).cache()
-      df.collect()
-      assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
-      assert(df.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
-        .executedPlan.isInstanceOf[LocalTableScanExec])
+    Seq(true, false).foreach { canChangeOP =>
+      withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> canChangeOP.toString) {
+        // No exchange, no need for AQE
+        val df = spark.range(0).cache()
+        df.collect()
+        assert(!df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
+        // Has exchange, apply AQE
+        val df2 = spark.range(0).repartition(1).cache()
+        df2.collect()
+        assert(df2.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
+      }
     }
   }
 