Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions python/pyspark/sql/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1711,8 +1711,7 @@ def cache(self) -> "DataFrame":

>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- InMemoryTableScan ...
InMemoryTableScan ...
"""
self.is_cached = True
self._jdf.cache()
Expand Down Expand Up @@ -1754,8 +1753,7 @@ def persist(

>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- InMemoryTableScan ...
InMemoryTableScan ...

Persists the data in the disk by specifying the storage level.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1561,11 +1561,11 @@ object SQLConf {
.doc("Whether to forcibly enable some optimization rules that can change the output " +
"partitioning of a cached query when executing it for caching. If it is set to true, " +
"queries may need an extra shuffle to read the cached data. This configuration is " +
"enabled by default. The optimization rules enabled by this configuration " +
s"are ${ADAPTIVE_EXECUTION_ENABLED.key} and ${AUTO_BUCKETED_SCAN_ENABLED.key}.")
"disabled by default. The optimization rule enabled by this configuration " +
s"is ${ADAPTIVE_EXECUTION_APPLY_FINAL_STAGE_SHUFFLE_OPTIMIZATIONS.key}.")
.version("3.2.0")
.booleanConf
.createWithDefault(true)
.createWithDefault(false)

val DEFAULT_CACHE_STORAGE_LEVEL = buildConf("spark.sql.defaultCacheStorageLevel")
.doc("The default storage level of `dataset.cache()`, `catalog.cacheTable()` and " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import scala.collection.immutable.IndexedSeq
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
Expand Down Expand Up @@ -60,17 +59,6 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
@transient @volatile
private var cachedData = IndexedSeq[CachedData]()

/**
* Configurations needs to be turned off, to avoid regression for cached query, so that the
* outputPartitioning of the underlying cached query plan can be leveraged later.
* Configurations include:
* 1. AQE
* 2. Automatic bucketed table scan
*/
private val forceDisableConfigs: Seq[ConfigEntry[Boolean]] = Seq(
SQLConf.ADAPTIVE_EXECUTION_ENABLED,
SQLConf.AUTO_BUCKETED_SCAN_ENABLED)

/** Clears all cached tables. */
def clearCache(): Unit = this.synchronized {
cachedData.foreach(_.cachedRepresentation.cacheBuilder.clearCache())
Expand Down Expand Up @@ -395,19 +383,19 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {

/**
* If `CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING` is enabled, return the session with disabled
* `AUTO_BUCKETED_SCAN_ENABLED`.
* If `CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING` is disabled, return the session with disabled
* `AUTO_BUCKETED_SCAN_ENABLED` and `ADAPTIVE_EXECUTION_ENABLED`.
* `ADAPTIVE_EXECUTION_APPLY_FINAL_STAGE_SHUFFLE_OPTIMIZATIONS`.
* `AUTO_BUCKETED_SCAN_ENABLED` is always disabled.
*/
private def getOrCloneSessionWithConfigsOff(session: SparkSession): SparkSession = {
if (session.conf.get(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING)) {
// Bucketed scan only has one time overhead but can have multi-times benefits in cache,
// so we always do bucketed scan in a cached plan.
SparkSession.getOrCloneSessionWithConfigsOff(session,
SQLConf.ADAPTIVE_EXECUTION_APPLY_FINAL_STAGE_SHUFFLE_OPTIMIZATIONS ::
SQLConf.AUTO_BUCKETED_SCAN_ENABLED :: Nil)
} else {
SparkSession.getOrCloneSessionWithConfigsOff(session, forceDisableConfigs)
// Bucketed scan only has one time overhead but can have multi-times benefits in cache,
// so we always do bucketed scan in a cached plan.
var disableConfigs = Seq(SQLConf.AUTO_BUCKETED_SCAN_ENABLED)
if (!session.conf.get(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING)) {
// Allowing changing cached plan output partitioning might lead to regression as it introduces
// extra shuffle
disableConfigs =
disableConfigs :+ SQLConf.ADAPTIVE_EXECUTION_APPLY_FINAL_STAGE_SHUFFLE_OPTIMIZATIONS
}
SparkSession.getOrCloneSessionWithConfigsOff(session, disableConfigs)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ case class InsertAdaptiveSparkPlan(
// - The query may need to add exchanges. It's an overkill to run `EnsureRequirements` here, so
// we just check `SparkPlan.requiredChildDistribution` and see if it's possible that the
// the query needs to add exchanges later.
// - The query contains nested `AdaptiveSparkPlanExec`.
// - The query contains `InMemoryTableScanExec`.
// - The query contains `InMemoryTableScanExec` that is AQE-ed.
// - The query contains sub-query.
private def shouldApplyAQE(plan: SparkPlan, isSubquery: Boolean): Boolean = {
conf.getConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY) || isSubquery || {
Expand All @@ -103,8 +102,6 @@ case class InsertAdaptiveSparkPlan(
// update correctly.
case i: InMemoryTableScanExec
if i.relation.cachedPlan.isInstanceOf[AdaptiveSparkPlanExec] => true
case _: InMemoryTableScanExec
if conf.getConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING) => true
case p => p.expressions.exists(_.exists {
case _: SubqueryExpression => true
case _ => false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join, JoinStrategyHint, SHUFFLE_HASH}
import org.apache.spark.sql.catalyst.util.DateTimeConstants
import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, RDDScanExec, SparkPlan}
import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, RDDScanExec, SparkPlan, SparkPlanInfo}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEPropagateEmptyRelation}
import org.apache.spark.sql.execution.columnar._
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
Expand Down Expand Up @@ -1630,20 +1630,37 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {

var finalPlan = ""
var finalPlan: SparkPlanInfo = null
val listener = new SparkListener {
override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
case SparkListenerSQLAdaptiveExecutionUpdate(_, physicalPlanDesc, sparkPlanInfo) =>
case SparkListenerSQLAdaptiveExecutionUpdate(_, _, sparkPlanInfo) =>
if (sparkPlanInfo.simpleString.startsWith(
"AdaptiveSparkPlan isFinalPlan=true")) {
finalPlan = physicalPlanDesc
finalPlan = sparkPlanInfo
}
case _ => // ignore other events
}
}
}

def findNodeInSparkPlanInfo(root: SparkPlanInfo, cond: SparkPlanInfo => Boolean):
Option[SparkPlanInfo] = {
if (cond(root)) {
Some(root)
} else {
root.children.flatMap(findNodeInSparkPlanInfo(_, cond)).headOption
}
}

def cachedFinalStageCoalesced(sparkPlanInfo: SparkPlanInfo): Boolean = {
val inMemoryScanNode = findNodeInSparkPlanInfo(sparkPlanInfo,
_.nodeName.contains("TableCacheQueryStage"))
val aqeNode = findNodeInSparkPlanInfo(inMemoryScanNode.get,
_.nodeName.contains("AdaptiveSparkPlan"))
aqeNode.get.children.head.nodeName == "AQEShuffleRead"
}

withTempView("t0", "t1", "t2") {
try {
spark.range(10).write.saveAsTable("t0")
Expand All @@ -1655,16 +1672,16 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
"SELECT distinct (id+1) FROM t0)")
assert(spark.table("t1").rdd.partitions.length == 2)
spark.sparkContext.listenerBus.waitUntilEmpty()
assert(finalPlan.nonEmpty && !finalPlan.contains("coalesced"))
assert(finalPlan != null && !cachedFinalStageCoalesced(finalPlan))
}

finalPlan = "" // reset finalPlan
finalPlan = null // reset finalPlan
withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") {
sql("CACHE TABLE t2 as SELECT /*+ REPARTITION */ * FROM (" +
"SELECT distinct (id-1) FROM t0)")
assert(spark.table("t2").rdd.partitions.length == 2)
assert(spark.table("t2").rdd.partitions.length == 1)
spark.sparkContext.listenerBus.waitUntilEmpty()
assert(finalPlan.nonEmpty && finalPlan.contains("coalesced"))
assert(finalPlan != null && cachedFinalStageCoalesced(finalPlan))
}
} finally {
spark.sparkContext.removeSparkListener(listener)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2825,12 +2825,17 @@ class AdaptiveQueryExecSuite
}

test("SPARK-43026: Apply AQE with non-exchange table cache") {
withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") {
val df = spark.range(0).cache()
df.collect()
assert(df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
assert(df.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec]
.executedPlan.isInstanceOf[LocalTableScanExec])
Seq(true, false).foreach { canChangeOP =>
withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> canChangeOP.toString) {
// No exchange, no need for AQE
val df = spark.range(0).cache()
df.collect()
assert(!df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
// Has exchange, apply AQE
val df2 = spark.range(0).repartition(1).cache()
df2.collect()
assert(df2.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec])
}
}
}

Expand Down