Skip to content

Commit 4e20a2a

Browse files
riyaverm-dbHeartSaVioR
authored andcommitted
[SPARK-48931][SS] Reduce Cloud Store List API cost for state store maintenance task
### What changes were proposed in this pull request? Currently, during the state store maintenance process, we find which old version files of the **RocksDB** state store to delete by listing all existing snapshotted version files in the checkpoint directory every 1 minute by default. The frequent list calls in the cloud can result in high costs. To address this concern and reduce the cost associated with state store maintenance, we should aim to minimize the frequency of listing object stores inside the maintenance task. To minimize the frequency, we will try to accumulate versions to delete and only call list when the number of versions to delete reaches a configured threshold. The changes include: 1. Adding new configuration variable `ratioExtraVersionsAllowedInCheckpoint` in **SQLConf**. This determines the ratio of extra versions files we want to retain in the checkpoint directory compared to number of versions to retain for rollbacks (`minBatchesToRetain`). 2. Using this config and `minBatchesToRetain`, set `minVersionsToDelete` config inside **StateStoreConf** to `minVersionsToDelete = ratioExtraVersionsAllowedInCheckpoint * minBatchesToRetain.` 3. Using `minSeenVersion` and `maxSeenVersion` variables in **RocksDBFileManager** to estimate min/max version present in directory and control deletion frequency. This is done by ensuring number of stale versions to delete is at least `minVersionsToDelete` ### Why are the changes needed? Currently, maintenance operations like snapshotting, purging, deletion, and file management is done asynchronously for each data partition. We want to shift away periodic deletion and instead rely on the estimated number of files in the checkpoint directory to reduce list calls and introduce batch deletion. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added unit tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #47393 from riyaverm-db/reduce-cloud-store-list-api-cost-in-maintenance. Authored-by: Riya Verma <riya.verma@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
1 parent f828146 commit 4e20a2a

File tree

6 files changed

+143
-6
lines changed

6 files changed

+143
-6
lines changed

common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,7 @@ private[spark] object LogKeys {
396396
case object MAX_NUM_PARTITIONS extends LogKey
397397
case object MAX_NUM_POSSIBLE_BINS extends LogKey
398398
case object MAX_NUM_ROWS_IN_MEMORY_BUFFER extends LogKey
399+
case object MAX_SEEN_VERSION extends LogKey
399400
case object MAX_SERVICE_NAME_LENGTH extends LogKey
400401
case object MAX_SIZE extends LogKey
401402
case object MAX_SLOTS extends LogKey
@@ -420,9 +421,11 @@ private[spark] object LogKeys {
420421
case object MIN_NUM_FREQUENT_PATTERN extends LogKey
421422
case object MIN_POINT_PER_CLUSTER extends LogKey
422423
case object MIN_RATE extends LogKey
424+
case object MIN_SEEN_VERSION extends LogKey
423425
case object MIN_SHARE extends LogKey
424426
case object MIN_SIZE extends LogKey
425427
case object MIN_TIME extends LogKey
428+
case object MIN_VERSIONS_TO_DELETE extends LogKey
426429
case object MIN_VERSION_NUM extends LogKey
427430
case object MISSING_PARENT_STAGES extends LogKey
428431
case object MODEL_WEIGHTS extends LogKey
@@ -850,6 +853,7 @@ private[spark] object LogKeys {
850853
case object USER_NAME extends LogKey
851854
case object UUID extends LogKey
852855
case object VALUE extends LogKey
856+
case object VERSIONS_TO_DELETE extends LogKey
853857
case object VERSION_NUM extends LogKey
854858
case object VIEW_ACLS extends LogKey
855859
case object VIEW_ACLS_GROUPS extends LogKey

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2129,6 +2129,17 @@ object SQLConf {
21292129
.intConf
21302130
.createWithDefault(100)
21312131

2132+
val RATIO_EXTRA_SPACE_ALLOWED_IN_CHECKPOINT =
2133+
buildConf("spark.sql.streaming.ratioExtraSpaceAllowedInCheckpoint")
2134+
.internal()
2135+
.doc("The ratio of extra space allowed for batch deletion of files when maintenance is" +
2136+
"invoked. When value > 0, it optimizes the cost of discovering and deleting old checkpoint " +
2137+
"versions. The minimum number of stale versions we retain in checkpoint location for batch " +
2138+
"deletion is calculated by minBatchesToRetain * ratioExtraSpaceAllowedInCheckpoint.")
2139+
.version("4.0.0")
2140+
.doubleConf
2141+
.createWithDefault(0.3)
2142+
21322143
val MAX_BATCHES_TO_RETAIN_IN_MEMORY = buildConf("spark.sql.streaming.maxBatchesToRetainInMemory")
21332144
.internal()
21342145
.doc("The maximum number of batches which will be retained in memory to avoid " +
@@ -5405,6 +5416,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
54055416

54065417
def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
54075418

5419+
def ratioExtraSpaceAllowedInCheckpoint: Double = getConf(RATIO_EXTRA_SPACE_ALLOWED_IN_CHECKPOINT)
5420+
54085421
def maxBatchesToRetainInMemory: Int = getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY)
54095422

54105423
def streamingMaintenanceInterval: Long = getConf(STREAMING_MAINTENANCE_INTERVAL)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,8 @@ class RocksDB(
185185
val latestSnapshotVersion = fileManager.getLatestSnapshotVersion(version)
186186
val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
187187
loadedVersion = latestSnapshotVersion
188+
// Initialize maxVersion upon successful load from DFS
189+
fileManager.setMaxSeenVersion(version)
188190

189191
// reset last snapshot version
190192
if (lastSnapshotVersion > latestSnapshotVersion) {
@@ -554,6 +556,11 @@ class RocksDB(
554556
}
555557
}
556558

559+
// Set maxVersion when checkpoint files are synced to DFS successfully
560+
// We need to handle this explicitly in RocksDB as we could use different
561+
// changeLogWriter instances in fileManager instance when committing
562+
fileManager.setMaxSeenVersion(newVersion)
563+
557564
numKeysOnLoadedVersion = numKeysOnWritingVersion
558565
loadedVersion = newVersion
559566
commitLatencyMs ++= Map(
@@ -640,7 +647,7 @@ class RocksDB(
640647
uploadSnapshot()
641648
}
642649
val cleanupTime = timeTakenMs {
643-
fileManager.deleteOldVersions(conf.minVersionsToRetain)
650+
fileManager.deleteOldVersions(conf.minVersionsToRetain, conf.minVersionsToDelete)
644651
}
645652
logInfo(log"Cleaned old data, time taken: ${MDC(LogKeys.TIME_UNITS, cleanupTime)} ms")
646653
}
@@ -896,6 +903,7 @@ class ByteArrayPair(var key: Array[Byte] = null, var value: Array[Byte] = null)
896903
*/
897904
case class RocksDBConf(
898905
minVersionsToRetain: Int,
906+
minVersionsToDelete: Long,
899907
minDeltasForSnapshot: Int,
900908
compactOnCommit: Boolean,
901909
enableChangelogCheckpointing: Boolean,
@@ -1078,6 +1086,7 @@ object RocksDBConf {
10781086

10791087
RocksDBConf(
10801088
storeConf.minVersionsToRetain,
1089+
storeConf.minVersionsToDelete,
10811090
storeConf.minDeltasForSnapshot,
10821091
getBooleanConf(COMPACT_ON_COMMIT_CONF),
10831092
getBooleanConf(ENABLE_CHANGELOG_CHECKPOINTING_CONF),

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,9 @@ class RocksDBFileManager(
146146

147147
private def codec = CompressionCodec.createCodec(sparkConf, codecName)
148148

149+
private var maxSeenVersion: Option[Long] = None
150+
private var minSeenVersion = 1L
151+
149152
@volatile private var rootDirChecked: Boolean = false
150153
@volatile private var fileMappings = RocksDBFileMappings(
151154
new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]],
@@ -398,15 +401,63 @@ class RocksDBFileManager(
398401
}
399402
}
400403

404+
/**
405+
* Set maxSeenVersion to max of itself and version we are uploading.
406+
* This is to ensure accuracy in the case the query has restarted from a particular version.
407+
*/
408+
def setMaxSeenVersion(version: Long): Unit = {
409+
if (maxSeenVersion.isDefined) {
410+
maxSeenVersion = Some(Math.max(maxSeenVersion.get, version))
411+
} else {
412+
maxSeenVersion = Some(version)
413+
}
414+
}
415+
416+
/**
417+
* Determines whether batch deletion of stale version files should be skipped
418+
* based on the following parameters and estimates of maximum and minimum
419+
* versions present in the checkpoint directory.
420+
*
421+
* @param numVersionsToRetain Number of versions to retain for rollbacks.
422+
* @param minVersionsToDelete Minimum number of stale versions required to trigger deletion.
423+
* @return `true` if insufficient stale versions present, otherwise `false`.
424+
*/
425+
private def shouldSkipDeletion(numVersionsToRetain: Int, minVersionsToDelete: Long): Boolean = {
426+
// If minVersionsToDelete <= 0, we call list every time maintenance is invoked
427+
// This is the original behaviour without list api call optimization
428+
if (minVersionsToDelete > 0) {
429+
// When maxSeenVersion is defined, we check the if number of stale version files
430+
// are at least the value of minVersionsToDelete for batch deletion of files
431+
// We still proceed with deletion if maxSeenVersion isn't set to ensure the fallback
432+
// is to clean up files if maxSeenVersion fails to be initialized
433+
if (maxSeenVersion.isDefined) {
434+
logInfo(log"Estimated maximum version is " +
435+
log"${MDC(LogKeys.MAX_SEEN_VERSION, maxSeenVersion.get)}" +
436+
log" and minimum version is ${MDC(LogKeys.MIN_SEEN_VERSION, minSeenVersion)}")
437+
val versionsToDelete = maxSeenVersion.get - minSeenVersion + 1 - numVersionsToRetain
438+
if (versionsToDelete < minVersionsToDelete) {
439+
logInfo(log"Skipping deleting files." +
440+
log" Need at least ${MDC(LogKeys.MIN_VERSIONS_TO_DELETE, minVersionsToDelete)}" +
441+
log" stale versions for batch deletion but found only" +
442+
log" ${MDC(LogKeys.VERSIONS_TO_DELETE, versionsToDelete)}.")
443+
return true
444+
}
445+
}
446+
}
447+
false
448+
}
449+
401450
/**
402451
* Delete old versions by deleting the associated version and SST files.
403-
* At a high-level, this method finds which versions to delete, and which SST files that were
452+
* At a high-level, when enough stale version files are present for batch deletion,
453+
* this method finds which versions to delete, and which SST files that were
404454
* last used in those versions. It's safe to delete these SST files because a SST file can
405455
* be reused only in successive versions. Therefore, if a SST file F was last used in version
406456
* V, then it won't be used in version V+1 or later, and if version V can be deleted, then
407457
* F can safely be deleted as well.
408458
*
409-
* To find old files, it does the following.
459+
* First, it checks whether enough stale version files are present for batch deletion.
460+
* If true, it does the following to find old files.
410461
* - List all the existing [version].zip files
411462
* - Find the min version that needs to be retained based on the given `numVersionsToRetain`.
412463
* - Accordingly decide which versions should be deleted.
@@ -426,7 +477,10 @@ class RocksDBFileManager(
426477
* - SST files that were used in a version, but that version got overwritten with a different
427478
* set of SST files.
428479
*/
429-
def deleteOldVersions(numVersionsToRetain: Int): Unit = {
480+
def deleteOldVersions(numVersionsToRetain: Int, minVersionsToDelete: Long = 0): Unit = {
481+
// Check if enough stale version files present
482+
if (shouldSkipDeletion(numVersionsToRetain, minVersionsToDelete)) return
483+
430484
val path = new Path(dfsRootDir)
431485
val allFiles = fm.list(path).map(_.getPath)
432486
val snapshotFiles = allFiles.filter(file => onlyZipFiles.accept(file))
@@ -526,6 +580,10 @@ class RocksDBFileManager(
526580
.map(_.getName.stripSuffix(".changelog")).map(_.toLong)
527581
.filter(_ < minVersionToRetain)
528582
deleteChangelogFiles(changelogVersionsToDelete)
583+
584+
// Always set minSeenVersion for regular deletion frequency even if deletion fails.
585+
// This is safe because subsequent calls retry deleting old version files
586+
minSeenVersion = minVersionToRetain
529587
}
530588

531589
/** Save immutable files to DFS directory */

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,14 @@ class StateStoreConf(
4141
/** Minimum versions a State Store implementation should retain to allow rollbacks */
4242
val minVersionsToRetain: Int = sqlConf.minBatchesToRetain
4343

44+
/**
45+
* Minimum number of stale checkpoint versions that need to be present in the DFS
46+
* checkpoint directory for old state checkpoint version deletion to be invoked.
47+
* This is to amortize the cost of discovering and deleting old checkpoint versions.
48+
*/
49+
val minVersionsToDelete: Long =
50+
Math.round(sqlConf.ratioExtraSpaceAllowedInCheckpoint * sqlConf.minBatchesToRetain)
51+
4452
/** Maximum count of versions a State Store implementation should retain in memory */
4553
val maxVersionsToRetainInMemory: Int = sqlConf.maxBatchesToRetainInMemory
4654

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,12 +230,12 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
230230
}
231231

232232
testWithColumnFamilies(
233-
"RocksDB: purge changelog and snapshots",
233+
"RocksDB: purge changelog and snapshots with minVersionsToDelete = 0",
234234
TestWithChangelogCheckpointingEnabled) { colFamiliesEnabled =>
235235
val remoteDir = Utils.createTempDir().toString
236236
new File(remoteDir).delete() // to make sure that the directory gets created
237237
val conf = dbConf.copy(enableChangelogCheckpointing = true,
238-
minVersionsToRetain = 3, minDeltasForSnapshot = 1)
238+
minVersionsToRetain = 3, minDeltasForSnapshot = 1, minVersionsToDelete = 0)
239239
withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) { db =>
240240
db.load(0)
241241
db.commit()
@@ -271,6 +271,51 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
271271
}
272272
}
273273

274+
testWithColumnFamilies(
275+
"RocksDB: purge version files with minVersionsToDelete > 0",
276+
TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
277+
val remoteDir = Utils.createTempDir().toString
278+
new File(remoteDir).delete() // to make sure that the directory gets created
279+
val conf = dbConf.copy(
280+
minVersionsToRetain = 3, minDeltasForSnapshot = 1, minVersionsToDelete = 3)
281+
withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) { db =>
282+
// Commit 5 versions
283+
// stale versions: (1, 2)
284+
// keep versions: (3, 4, 5)
285+
for (version <- 0 to 4) {
286+
// Should upload latest snapshot but not delete any files
287+
// since number of stale versions < minVersionsToDelete
288+
db.load(version)
289+
db.commit()
290+
db.doMaintenance()
291+
}
292+
293+
// Commit 1 more version
294+
// stale versions: (1, 2, 3)
295+
// keep versions: (4, 5, 6)
296+
db.load(5)
297+
db.commit()
298+
299+
// Checkpoint directory before maintenance
300+
if (isChangelogCheckpointingEnabled) {
301+
assert(snapshotVersionsPresent(remoteDir) == (1 to 5))
302+
assert(changelogVersionsPresent(remoteDir) == (1 to 6))
303+
} else {
304+
assert(snapshotVersionsPresent(remoteDir) == (1 to 6))
305+
}
306+
307+
// Should delete stale versions for zip files and change log files
308+
// since number of stale versions >= minVersionsToDelete
309+
db.doMaintenance()
310+
311+
// Checkpoint directory after maintenance
312+
assert(snapshotVersionsPresent(remoteDir) == Seq(4, 5, 6))
313+
if (isChangelogCheckpointingEnabled) {
314+
assert(changelogVersionsPresent(remoteDir) == Seq(4, 5, 6))
315+
}
316+
}
317+
}
318+
274319
testWithColumnFamilies(
275320
"RocksDB: minDeltasForSnapshot",
276321
TestWithChangelogCheckpointingEnabled) { colFamiliesEnabled =>

0 commit comments

Comments
 (0)