@@ -396,6 +396,7 @@ private[spark] object LogKeys {
case object MAX_NUM_PARTITIONS extends LogKey
case object MAX_NUM_POSSIBLE_BINS extends LogKey
case object MAX_NUM_ROWS_IN_MEMORY_BUFFER extends LogKey
case object MAX_SEEN_VERSION extends LogKey
case object MAX_SERVICE_NAME_LENGTH extends LogKey
case object MAX_SIZE extends LogKey
case object MAX_SLOTS extends LogKey
@@ -420,9 +421,11 @@ private[spark] object LogKeys {
case object MIN_NUM_FREQUENT_PATTERN extends LogKey
case object MIN_POINT_PER_CLUSTER extends LogKey
case object MIN_RATE extends LogKey
case object MIN_SEEN_VERSION extends LogKey
case object MIN_SHARE extends LogKey
case object MIN_SIZE extends LogKey
case object MIN_TIME extends LogKey
case object MIN_VERSIONS_TO_DELETE extends LogKey
case object MIN_VERSION_NUM extends LogKey
case object MISSING_PARENT_STAGES extends LogKey
case object MODEL_WEIGHTS extends LogKey
@@ -849,6 +852,7 @@ private[spark] object LogKeys {
case object USER_NAME extends LogKey
case object UUID extends LogKey
case object VALUE extends LogKey
case object VERSIONS_TO_DELETE extends LogKey
case object VERSION_NUM extends LogKey
case object VIEW_ACLS extends LogKey
case object VIEW_ACLS_GROUPS extends LogKey
@@ -2129,6 +2129,17 @@ object SQLConf {
.intConf
.createWithDefault(100)

val RATIO_EXTRA_SPACE_ALLOWED_IN_CHECKPOINT =
buildConf("spark.sql.streaming.ratioExtraSpaceAllowedInCheckpoint")
.internal()
.doc("The ratio of extra space allowed for batch deletion of files when maintenance is" +
"invoked. When value > 0, it optimizes the cost of discovering and deleting old checkpoint " +
"versions. The minimum number of stale versions we retain in checkpoint location for batch " +
"deletion is calculated by minBatchesToRetain * ratioExtraSpaceAllowedInCheckpoint.")
.version("4.0.0")
.doubleConf
.createWithDefault(0.3)
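
// Illustrative usage (not part of this diff): the ratio can be tuned per session, e.g.
//   spark.conf.set("spark.sql.streaming.ratioExtraSpaceAllowedInCheckpoint", "0.5")
// Setting it to 0 disables the optimization and restores the original
// list-on-every-maintenance behaviour.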

val MAX_BATCHES_TO_RETAIN_IN_MEMORY = buildConf("spark.sql.streaming.maxBatchesToRetainInMemory")
.internal()
.doc("The maximum number of batches which will be retained in memory to avoid " +
@@ -5406,6 +5417,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)

def ratioExtraSpaceAllowedInCheckpoint: Double = getConf(RATIO_EXTRA_SPACE_ALLOWED_IN_CHECKPOINT)

def maxBatchesToRetainInMemory: Int = getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY)

def streamingMaintenanceInterval: Long = getConf(STREAMING_MAINTENANCE_INTERVAL)
@@ -185,6 +185,8 @@ class RocksDB(
val latestSnapshotVersion = fileManager.getLatestSnapshotVersion(version)
val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
loadedVersion = latestSnapshotVersion
// Initialize maxSeenVersion upon successful load from DFS
fileManager.setMaxSeenVersion(version)
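// Illustrative note (not part of this diff): e.g. a query restarting from
// checkpoint version 100 initializes maxSeenVersion to 100 here, so the
// maintenance thread can estimate stale versions before any new commit.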

// reset last snapshot version
if (lastSnapshotVersion > latestSnapshotVersion) {
@@ -554,6 +556,11 @@
}
}

// Set maxSeenVersion when checkpoint files are synced to DFS successfully.
// We need to handle this explicitly in RocksDB, as different changeLogWriter
// instances may be used within the fileManager instance when committing.
fileManager.setMaxSeenVersion(newVersion)

numKeysOnLoadedVersion = numKeysOnWritingVersion
loadedVersion = newVersion
commitLatencyMs ++= Map(
@@ -640,7 +647,7 @@
uploadSnapshot()
}
val cleanupTime = timeTakenMs {
- fileManager.deleteOldVersions(conf.minVersionsToRetain)
+ fileManager.deleteOldVersions(conf.minVersionsToRetain, conf.minVersionsToDelete)
}
logInfo(log"Cleaned old data, time taken: ${MDC(LogKeys.TIME_UNITS, cleanupTime)} ms")
}
@@ -896,6 +903,7 @@ class ByteArrayPair(var key: Array[Byte] = null, var value: Array[Byte] = null)
*/
case class RocksDBConf(
minVersionsToRetain: Int,
minVersionsToDelete: Long,
minDeltasForSnapshot: Int,
compactOnCommit: Boolean,
enableChangelogCheckpointing: Boolean,
@@ -1078,6 +1086,7 @@ object RocksDBConf {

RocksDBConf(
storeConf.minVersionsToRetain,
storeConf.minVersionsToDelete,
storeConf.minDeltasForSnapshot,
getBooleanConf(COMPACT_ON_COMMIT_CONF),
getBooleanConf(ENABLE_CHANGELOG_CHECKPOINTING_CONF),
@@ -146,6 +146,9 @@ class RocksDBFileManager(

private def codec = CompressionCodec.createCodec(sparkConf, codecName)

private var maxSeenVersion: Option[Long] = None
private var minSeenVersion = 1L

@volatile private var rootDirChecked: Boolean = false
@volatile private var fileMappings = RocksDBFileMappings(
new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]],
@@ -398,15 +401,63 @@
}
}

/**
* Set maxSeenVersion to the max of itself and the version being uploaded.
* This ensures accuracy in case the query has restarted from a particular version.
*/
def setMaxSeenVersion(version: Long): Unit = {
if (maxSeenVersion.isDefined) {
maxSeenVersion = Some(Math.max(maxSeenVersion.get, version))
} else {
maxSeenVersion = Some(version)
}
}
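
// Illustrative trace (not part of this diff), assuming a restart followed by
// a reload of an older version:
//   setMaxSeenVersion(10)  // load after restart   -> maxSeenVersion = Some(10)
//   setMaxSeenVersion(11)  // commit of version 11 -> maxSeenVersion = Some(11)
//   setMaxSeenVersion(5)   // reload of version 5  -> stays Some(11) via Math.max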

/**
* Determines whether batch deletion of stale version files should be skipped
* based on the following parameters and estimates of maximum and minimum
* versions present in the checkpoint directory.
*
* @param numVersionsToRetain Number of versions to retain for rollbacks.
* @param minVersionsToDelete Minimum number of stale versions required to trigger deletion.
* @return `true` if insufficient stale versions are present, otherwise `false`.
*/
private def shouldSkipDeletion(numVersionsToRetain: Int, minVersionsToDelete: Long): Boolean = {
// If minVersionsToDelete <= 0, we list files every time maintenance is invoked.
// This is the original behaviour, without the list API call optimization.
if (minVersionsToDelete > 0) {
// When maxSeenVersion is defined, we check whether the number of stale version files
// is at least minVersionsToDelete before batch deleting files.
// We still proceed with deletion when maxSeenVersion isn't set, so that the fallback
// is to clean up files if maxSeenVersion fails to be initialized.
if (maxSeenVersion.isDefined) {
logInfo(log"Estimated maximum version is " +
log"${MDC(LogKeys.MAX_SEEN_VERSION, maxSeenVersion.get)}" +
log" and minimum version is ${MDC(LogKeys.MIN_SEEN_VERSION, minSeenVersion)}")
val versionsToDelete = maxSeenVersion.get - minSeenVersion + 1 - numVersionsToRetain
if (versionsToDelete < minVersionsToDelete) {
logInfo(log"Skipping deleting files." +
log" Need at least ${MDC(LogKeys.MIN_VERSIONS_TO_DELETE, minVersionsToDelete)}" +
log" stale versions for batch deletion but found only" +
log" ${MDC(LogKeys.VERSIONS_TO_DELETE, versionsToDelete)}.")
return true
}
}
}
false
}
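
// Worked example (illustrative, mirroring the suite below): with
// numVersionsToRetain = 3 and minVersionsToDelete = 3, after committing
// versions 1..5: maxSeenVersion = Some(5), minSeenVersion = 1, so
//   versionsToDelete = 5 - 1 + 1 - 3 = 2 < 3  => deletion skipped.
// After committing version 6:
//   versionsToDelete = 6 - 1 + 1 - 3 = 3 >= 3 => deletion proceeds.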

/**
* Delete old versions by deleting the associated version and SST files.
- * At a high-level, this method finds which versions to delete, and which SST files that were
+ * At a high-level, when enough stale version files are present for batch deletion,
+ * this method finds which versions to delete, and which SST files were
* last used in those versions. It's safe to delete these SST files because a SST file can
* be reused only in successive versions. Therefore, if a SST file F was last used in version
* V, then it won't be used in version V+1 or later, and if version V can be deleted, then
* F can safely be deleted as well.
*
- * To find old files, it does the following.
+ * First, it checks whether enough stale version files are present for batch deletion.
+ * If true, it does the following to find old files.
* - List all the existing [version].zip files
* - Find the min version that needs to be retained based on the given `numVersionsToRetain`.
* - Accordingly decide which versions should be deleted.
@@ -426,7 +477,10 @@
* - SST files that were used in a version, but that version got overwritten with a different
* set of SST files.
*/
- def deleteOldVersions(numVersionsToRetain: Int): Unit = {
+ def deleteOldVersions(numVersionsToRetain: Int, minVersionsToDelete: Long = 0): Unit = {
// Check if enough stale version files are present
if (shouldSkipDeletion(numVersionsToRetain, minVersionsToDelete)) return

val path = new Path(dfsRootDir)
val allFiles = fm.list(path).map(_.getPath)
val snapshotFiles = allFiles.filter(file => onlyZipFiles.accept(file))
@@ -526,6 +580,10 @@
.map(_.getName.stripSuffix(".changelog")).map(_.toLong)
.filter(_ < minVersionToRetain)
deleteChangelogFiles(changelogVersionsToDelete)

// Always update minSeenVersion to keep the deletion frequency regular, even if deletion
// fails. This is safe because subsequent calls will retry deleting old version files.
minSeenVersion = minVersionToRetain
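
// Illustrative example (not part of this diff): if versions 1..6 exist and
// minVersionToRetain = 4, versions 1-3 are deleted (or retried on a later
// maintenance call) and minSeenVersion advances to 4 for the next check.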
}

/** Save immutable files to DFS directory */
@@ -41,6 +41,14 @@ class StateStoreConf(
/** Minimum versions a State Store implementation should retain to allow rollbacks */
val minVersionsToRetain: Int = sqlConf.minBatchesToRetain

/**
* Minimum number of stale checkpoint versions that need to be present in the DFS
* checkpoint directory for old state checkpoint version deletion to be invoked.
* This is to amortize the cost of discovering and deleting old checkpoint versions.
*/
val minVersionsToDelete: Long =
Math.round(sqlConf.ratioExtraSpaceAllowedInCheckpoint * sqlConf.minBatchesToRetain)
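
// Illustrative note (not part of this diff): with the default ratio of 0.3,
// minBatchesToRetain = 100 yields minVersionsToDelete = Math.round(0.3 * 100) = 30;
// minBatchesToRetain = 5 yields Math.round(1.5) = 2, since Math.round rounds half up.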

/** Maximum count of versions a State Store implementation should retain in memory */
val maxVersionsToRetainInMemory: Int = sqlConf.maxBatchesToRetainInMemory

@@ -230,12 +230,12 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
}

testWithColumnFamilies(
"RocksDB: purge changelog and snapshots",
"RocksDB: purge changelog and snapshots with minVersionsToDelete = 0",
TestWithChangelogCheckpointingEnabled) { colFamiliesEnabled =>
val remoteDir = Utils.createTempDir().toString
new File(remoteDir).delete() // to make sure that the directory gets created
val conf = dbConf.copy(enableChangelogCheckpointing = true,
- minVersionsToRetain = 3, minDeltasForSnapshot = 1)
+ minVersionsToRetain = 3, minDeltasForSnapshot = 1, minVersionsToDelete = 0)
withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) { db =>
db.load(0)
db.commit()
@@ -271,6 +271,51 @@
}
}

testWithColumnFamilies(
"RocksDB: purge version files with minVersionsToDelete > 0",
TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
val remoteDir = Utils.createTempDir().toString
new File(remoteDir).delete() // to make sure that the directory gets created
val conf = dbConf.copy(
minVersionsToRetain = 3, minDeltasForSnapshot = 1, minVersionsToDelete = 3)
withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) { db =>
// Commit 5 versions
// stale versions: (1, 2)
// keep versions: (3, 4, 5)
for (version <- 0 to 4) {
// Should upload latest snapshot but not delete any files
// since number of stale versions < minVersionsToDelete
db.load(version)
db.commit()
db.doMaintenance()
}

// Commit 1 more version
// stale versions: (1, 2, 3)
// keep versions: (4, 5, 6)
db.load(5)
db.commit()

// Checkpoint directory before maintenance
if (isChangelogCheckpointingEnabled) {
assert(snapshotVersionsPresent(remoteDir) == (1 to 5))
assert(changelogVersionsPresent(remoteDir) == (1 to 6))
} else {
assert(snapshotVersionsPresent(remoteDir) == (1 to 6))
}

// Should delete stale versions of zip files and changelog files
// since number of stale versions >= minVersionsToDelete
db.doMaintenance()

// Checkpoint directory after maintenance
assert(snapshotVersionsPresent(remoteDir) == Seq(4, 5, 6))
if (isChangelogCheckpointingEnabled) {
assert(changelogVersionsPresent(remoteDir) == Seq(4, 5, 6))
}
}
}

testWithColumnFamilies(
"RocksDB: minDeltasForSnapshot",
TestWithChangelogCheckpointingEnabled) { colFamiliesEnabled =>