Skip to content

Commit cd2f30c

Browse files
author
Branden Smith
committed
Merge branch 'master' of github.com:apache/spark into json_emptyline_count
2 parents 3cae4da + ce7e7df commit cd2f30c

File tree

3 files changed

+67
-15
lines changed

3 files changed

+67
-15
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,26 @@ class CacheManager extends Logging {
180180
val it = cachedData.iterator()
181181
while (it.hasNext) {
182182
val cd = it.next()
183-
if (condition(cd.plan)) {
183+
// If `clearCache` is false (which means the recache request comes from a non-cascading
184+
// cache invalidation) and the cache buffer has already been loaded, we do not need to
185+
// re-compile a physical plan because the old plan will not be used any more by the
186+
// CacheManager although it still lives in compiled `Dataset`s and it could still work.
187+
// Otherwise, it means either `clearCache` is true, then we have to clear the cache buffer
188+
// and re-compile the physical plan; or it is a non-cascading cache invalidation and cache
189+
// buffer is still empty, then we could have a more efficient new plan by removing
190+
// dependency on the previously removed cache entries.
191+
// Note that the `CachedRDDBuilder`.`isCachedColumnBuffersLoaded` call is a non-locking
192+
// status test and may not return the most accurate cache buffer state. So the worst case
193+
// scenario can be:
194+
// 1) The buffer has been loaded, but `isCachedColumnBuffersLoaded` returns false, then we
195+
// will clear the buffer and build a new plan. It is inefficient but doesn't affect
196+
// correctness.
197+
// 2) The buffer has been cleared, but `isCachedColumnBuffersLoaded` returns true, then we
198+
// will keep it as it is. It means the physical plan has been re-compiled already in the
199+
// other thread.
200+
val buildNewPlan =
201+
clearCache || !cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
202+
if (condition(cd.plan) && buildNewPlan) {
184203
needToRecache += cd
185204
// Remove the cache entry before we create a new one, so that we can have a different
186205
// physical plan.
@@ -189,12 +208,11 @@ class CacheManager extends Logging {
189208
}
190209
}
191210
needToRecache.map { cd =>
192-
if (clearCache) {
193-
cd.cachedRepresentation.cacheBuilder.clearCache()
194-
}
211+
cd.cachedRepresentation.cacheBuilder.clearCache()
195212
val plan = spark.sessionState.executePlan(cd.plan).executedPlan
196213
val newCache = InMemoryRelation(
197-
cacheBuilder = cd.cachedRepresentation.cacheBuilder.withCachedPlan(plan),
214+
cacheBuilder = cd.cachedRepresentation
215+
.cacheBuilder.copy(cachedPlan = plan)(_cachedColumnBuffers = null),
198216
logicalPlan = cd.plan)
199217
val recomputedPlan = cd.copy(cachedRepresentation = newCache)
200218
writeLock {

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,8 @@ case class CachedRDDBuilder(
7575
}
7676
}
7777

78-
def withCachedPlan(cachedPlan: SparkPlan): CachedRDDBuilder = {
79-
new CachedRDDBuilder(
80-
useCompression,
81-
batchSize,
82-
storageLevel,
83-
cachedPlan = cachedPlan,
84-
tableName
85-
)(_cachedColumnBuffers)
78+
def isCachedColumnBuffersLoaded: Boolean = {
79+
_cachedColumnBuffers != null
8680
}
8781

8882
private def buildBuffers(): RDD[CachedBatch] = {

sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,9 +190,9 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
190190

191191
df1.unpersist(blocking = true)
192192

193-
// df1 un-cached; df2's cache plan re-compiled
193+
// df1 un-cached; df2's cache plan stays the same
194194
assert(df1.storageLevel == StorageLevel.NONE)
195-
assertCacheDependency(df1.groupBy('a).agg(sum('b)), 0)
195+
assertCacheDependency(df1.groupBy('a).agg(sum('b)))
196196

197197
val df4 = df1.groupBy('a).agg(sum('b)).agg(sum("sum(b)"))
198198
assertCached(df4)
@@ -206,4 +206,44 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
206206
// first time use, load cache
207207
checkDataset(df5, Row(10))
208208
}
209+
210+
test("SPARK-26708 Cache data and cached plan should stay consistent") {
211+
val df = spark.range(0, 5).toDF("a")
212+
val df1 = df.withColumn("b", 'a + 1)
213+
val df2 = df.filter('a > 1)
214+
215+
df.cache()
216+
// Add df1 to the CacheManager; the buffer is currently empty.
217+
df1.cache()
218+
// After calling collect(), df1's buffer has been loaded.
219+
df1.collect()
220+
// Add df2 to the CacheManager; the buffer is currently empty.
221+
df2.cache()
222+
223+
// Verify that df1 is an InMemoryRelation plan with dependency on another cached plan.
224+
assertCacheDependency(df1)
225+
val df1InnerPlan = df1.queryExecution.withCachedData
226+
.asInstanceOf[InMemoryRelation].cacheBuilder.cachedPlan
227+
// Verify that df2 is an InMemoryRelation plan with dependency on another cached plan.
228+
assertCacheDependency(df2)
229+
230+
df.unpersist(blocking = true)
231+
232+
// Verify that df1's cache has stayed the same, since df1's cache already has data
233+
// before df.unpersist().
234+
val df1Limit = df1.limit(2)
235+
val df1LimitInnerPlan = df1Limit.queryExecution.withCachedData.collectFirst {
236+
case i: InMemoryRelation => i.cacheBuilder.cachedPlan
237+
}
238+
assert(df1LimitInnerPlan.isDefined && df1LimitInnerPlan.get == df1InnerPlan)
239+
240+
// Verify that df2's cache has been re-cached, with a new physical plan free of the dependency
241+
// on df, since df2's cache had not been loaded before df.unpersist().
242+
val df2Limit = df2.limit(2)
243+
val df2LimitInnerPlan = df2Limit.queryExecution.withCachedData.collectFirst {
244+
case i: InMemoryRelation => i.cacheBuilder.cachedPlan
245+
}
246+
assert(df2LimitInnerPlan.isDefined &&
247+
df2LimitInnerPlan.get.find(_.isInstanceOf[InMemoryTableScanExec]).isEmpty)
248+
}
209249
}

0 commit comments

Comments
 (0)