Commit 6468f96

Emil Ejbyfeldt authored and Hyukjin Kwon committed
[SPARK-45386][SQL][3.5] Fix correctness issue with persist using StorageLevel.NONE on Dataset
### What changes were proposed in this pull request?

Support for InMemoryTableScanExec in AQE was added in #39624, but that patch contained a bug when a Dataset is persisted using `StorageLevel.NONE`. Before that patch a query like:

```
import org.apache.spark.storage.StorageLevel
spark.createDataset(Seq(1, 2)).persist(StorageLevel.NONE).count()
```

would correctly return 2, but after that patch it incorrectly returns 0. This is because AQE incorrectly concludes, based on the runtime statistics collected here:

https://github.com/apache/spark/blob/eac5a8c7e6da94bb27e926fc9a681aed6582f7d3/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala#L294

that the input is empty. The problem is that the action that should ensure the statistics are collected here:

https://github.com/apache/spark/blob/eac5a8c7e6da94bb27e926fc9a681aed6582f7d3/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L285-L291

never uses the iterator, and with `StorageLevel.NONE` the persisting does not use the iterator either, so the correct statistics are never gathered. The fix proposed in this patch simply makes calling persist with `StorageLevel.NONE` a no-op. Changing the action so that it always drains the iterator would also work, but that would be unnecessary work in many normal circumstances.

### Why are the changes needed?

The current code has a correctness issue.

### Does this PR introduce _any_ user-facing change?

Yes, it fixes the correctness issue.

### How was this patch tested?

New and existing unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43213 from eejbyfeldt/SPARK-45386-branch-3.5.

Authored-by: Emil Ejbyfeldt <eejbyfeldt@liveintent.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
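The root cause described above can be illustrated without Spark: Scala's `Iterator.map` is lazy, so a counter updated as a side effect of mapping only advances when something actually pulls elements through the iterator. The names below are hypothetical, purely for illustration, and not Spark APIs; a minimal sketch:

```scala
// Demonstrates why statistics gathered as a side effect of an iterator
// are wrong when nothing consumes the iterator (as with StorageLevel.NONE).
object IteratorStatsDemo {
  // Wraps the input iterator so each element pulled increments a row count,
  // mimicking how runtime statistics are collected while rows are cached.
  def countViaIterator[A](it: Iterator[A], consume: Boolean): Long = {
    var rows = 0L
    val counting = it.map { a => rows += 1; a } // lazy: counts only on pull
    if (consume) counting.foreach(_ => ())      // a real cache drains the rows
    rows // if nothing consumed the iterator, this is still 0
  }

  def main(args: Array[String]): Unit = {
    println(countViaIterator(Iterator(1, 2), consume = true))  // 2
    println(countViaIterator(Iterator(1, 2), consume = false)) // 0, stats never updated
  }
}
```

With `consume = false` the count stays at 0 even though two rows exist, which mirrors how AQE saw an "empty" input when neither the action nor the `StorageLevel.NONE` persist pulled the iterator.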
1 parent 522af69 commit 6468f96

File tree

2 files changed: 9 additions, 1 deletion

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 3 additions & 1 deletion

```diff
@@ -113,7 +113,9 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
       planToCache: LogicalPlan,
       tableName: Option[String],
       storageLevel: StorageLevel): Unit = {
-    if (lookupCachedData(planToCache).nonEmpty) {
+    if (storageLevel == StorageLevel.NONE) {
+      // Do nothing for StorageLevel.NONE since it will not actually cache any data.
+    } else if (lookupCachedData(planToCache).nonEmpty) {
       logWarning("Asked to cache already cached data.")
     } else {
       val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
```

sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala

Lines changed: 6 additions & 0 deletions

```diff
@@ -45,6 +45,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
+import org.apache.spark.storage.StorageLevel

 case class TestDataPoint(x: Int, y: Double, s: String, t: TestDataPoint2)
 case class TestDataPoint2(x: Int, s: String)
@@ -2535,6 +2536,11 @@ class DatasetSuite extends QueryTest

     checkDataset(ds.filter(f(col("_1"))), Tuple1(ValueClass(2)))
   }
+
+  test("SPARK-45386: persist with StorageLevel.NONE should give correct count") {
+    val ds = Seq(1, 2).toDS().persist(StorageLevel.NONE)
+    assert(ds.count() == 2)
+  }
 }

 class DatasetLargeResultCollectingSuite extends QueryTest
```
