diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index 51ef112a677d..5ed1a6f9cb5f 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -396,6 +396,7 @@ private[spark] object LogKeys {
   case object MAX_NUM_PARTITIONS extends LogKey
   case object MAX_NUM_POSSIBLE_BINS extends LogKey
   case object MAX_NUM_ROWS_IN_MEMORY_BUFFER extends LogKey
+  case object MAX_SEEN_VERSION extends LogKey
   case object MAX_SERVICE_NAME_LENGTH extends LogKey
   case object MAX_SIZE extends LogKey
   case object MAX_SLOTS extends LogKey
@@ -420,9 +421,11 @@ private[spark] object LogKeys {
   case object MIN_NUM_FREQUENT_PATTERN extends LogKey
   case object MIN_POINT_PER_CLUSTER extends LogKey
   case object MIN_RATE extends LogKey
+  case object MIN_SEEN_VERSION extends LogKey
   case object MIN_SHARE extends LogKey
   case object MIN_SIZE extends LogKey
   case object MIN_TIME extends LogKey
+  case object MIN_VERSIONS_TO_DELETE extends LogKey
   case object MIN_VERSION_NUM extends LogKey
   case object MISSING_PARENT_STAGES extends LogKey
   case object MODEL_WEIGHTS extends LogKey
@@ -849,6 +852,7 @@ private[spark] object LogKeys {
   case object USER_NAME extends LogKey
   case object UUID extends LogKey
   case object VALUE extends LogKey
+  case object VERSIONS_TO_DELETE extends LogKey
   case object VERSION_NUM extends LogKey
   case object VIEW_ACLS extends LogKey
   case object VIEW_ACLS_GROUPS extends LogKey
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f50eb9b12158..8603dc240053 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2129,6 +2129,17 @@ object SQLConf {
     .intConf
     .createWithDefault(100)
 
+  val RATIO_EXTRA_SPACE_ALLOWED_IN_CHECKPOINT =
+    buildConf("spark.sql.streaming.ratioExtraSpaceAllowedInCheckpoint")
+      .internal()
+      .doc("The ratio of extra space allowed for batch deletion of files when maintenance is " +
+        "invoked. When the value is > 0, it amortizes the cost of discovering and deleting old " +
+        "checkpoint versions. The minimum number of stale versions that must accumulate in the " +
+        "checkpoint location before batch deletion is triggered is calculated as " +
+        "minBatchesToRetain * ratioExtraSpaceAllowedInCheckpoint.")
+      .version("4.0.0")
+      .doubleConf
+      .createWithDefault(0.3)
+
   val MAX_BATCHES_TO_RETAIN_IN_MEMORY = buildConf("spark.sql.streaming.maxBatchesToRetainInMemory")
     .internal()
     .doc("The maximum number of batches which will be retained in memory to avoid " +
@@ -5406,6 +5417,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
 
+  def ratioExtraSpaceAllowedInCheckpoint: Double = getConf(RATIO_EXTRA_SPACE_ALLOWED_IN_CHECKPOINT)
+
   def maxBatchesToRetainInMemory: Int = getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY)
 
   def streamingMaintenanceInterval: Long = getConf(STREAMING_MAINTENANCE_INTERVAL)
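Note on tuning (not part of the patch): the new ratio combines with the existing spark.sql.streaming.minBatchesToRetain setting to decide how many stale versions must accumulate before maintenance lists and deletes files. A minimal sketch, assuming the stock minBatchesToRetain default of 100; the conf names are the real ones, the values are illustrative:

  // Illustrative only: raising the ratio on a local session.
  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().master("local[2]").getOrCreate()
  spark.conf.set("spark.sql.streaming.ratioExtraSpaceAllowedInCheckpoint", "0.5")
  // With minBatchesToRetain = 100, maintenance now waits for
  // Math.round(0.5 * 100) = 50 stale versions before listing and deleting old files.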
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 2f3f5a57261f..b454e0ba5c93 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -185,6 +185,8 @@ class RocksDB(
       val latestSnapshotVersion = fileManager.getLatestSnapshotVersion(version)
       val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
       loadedVersion = latestSnapshotVersion
+      // Initialize maxSeenVersion upon successful load from DFS
+      fileManager.setMaxSeenVersion(version)
 
       // reset last snapshot version
       if (lastSnapshotVersion > latestSnapshotVersion) {
@@ -554,6 +556,11 @@ class RocksDB(
        }
      }
 
+    // Set maxSeenVersion when checkpoint files are synced to DFS successfully.
+    // We need to handle this explicitly in RocksDB because different changelogWriter
+    // instances may be used in the fileManager instance when committing.
+    fileManager.setMaxSeenVersion(newVersion)
+
     numKeysOnLoadedVersion = numKeysOnWritingVersion
     loadedVersion = newVersion
     commitLatencyMs ++= Map(
@@ -640,7 +647,7 @@ class RocksDB(
         uploadSnapshot()
       }
       val cleanupTime = timeTakenMs {
-        fileManager.deleteOldVersions(conf.minVersionsToRetain)
+        fileManager.deleteOldVersions(conf.minVersionsToRetain, conf.minVersionsToDelete)
       }
       logInfo(log"Cleaned old data, time taken: ${MDC(LogKeys.TIME_UNITS, cleanupTime)} ms")
     }
@@ -896,6 +903,7 @@ class ByteArrayPair(var key: Array[Byte] = null, var value: Array[Byte] = null)
  */
 case class RocksDBConf(
     minVersionsToRetain: Int,
+    minVersionsToDelete: Long,
     minDeltasForSnapshot: Int,
     compactOnCommit: Boolean,
     enableChangelogCheckpointing: Boolean,
@@ -1078,6 +1086,7 @@ object RocksDBConf {
 
     RocksDBConf(
       storeConf.minVersionsToRetain,
+      storeConf.minVersionsToDelete,
      storeConf.minDeltasForSnapshot,
      getBooleanConf(COMPACT_ON_COMMIT_CONF),
      getBooleanConf(ENABLE_CHANGELOG_CHECKPOINTING_CONF),
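For context, maxSeenVersion is the file manager's running estimate of the highest version that has reached DFS: the two call sites above seed it on load and advance it on every successful commit. A minimal standalone sketch mirroring that logic, with hypothetical version numbers (not the RocksDBFileManager class itself):

  // Illustrative only: the max-version estimate is monotonic across load() and commit().
  object VersionTrackingSketch extends App {
    var maxSeenVersion: Option[Long] = None

    def setMaxSeenVersion(v: Long): Unit =
      maxSeenVersion = Some(maxSeenVersion.fold(v)(math.max(_, v)))

    setMaxSeenVersion(10L)  // query restarted and loaded version 10
    setMaxSeenVersion(11L)  // commit synced version 11 to DFS
    setMaxSeenVersion(9L)   // a stale caller cannot move the estimate backwards
    assert(maxSeenVersion.contains(11L))
  }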
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 6c8db12635fd..7e570bcfea7e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -146,6 +146,9 @@ class RocksDBFileManager(
 
   private def codec = CompressionCodec.createCodec(sparkConf, codecName)
 
+  private var maxSeenVersion: Option[Long] = None
+  private var minSeenVersion = 1L
+
   @volatile private var rootDirChecked: Boolean = false
   @volatile private var fileMappings = RocksDBFileMappings(
     new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]],
@@ -398,15 +401,63 @@ class RocksDBFileManager(
     }
   }
 
+  /**
+   * Set maxSeenVersion to the max of itself and the version being uploaded.
+   * This ensures accuracy in case the query has restarted from a particular version.
+   */
+  def setMaxSeenVersion(version: Long): Unit = {
+    if (maxSeenVersion.isDefined) {
+      maxSeenVersion = Some(Math.max(maxSeenVersion.get, version))
+    } else {
+      maxSeenVersion = Some(version)
+    }
+  }
+
+  /**
+   * Determines whether batch deletion of stale version files should be skipped,
+   * based on the following parameters and on estimates of the maximum and minimum
+   * versions present in the checkpoint directory.
+   *
+   * @param numVersionsToRetain Number of versions to retain for rollbacks.
+   * @param minVersionsToDelete Minimum number of stale versions required to trigger deletion.
+   * @return `true` if insufficient stale versions are present, otherwise `false`.
+   */
+  private def shouldSkipDeletion(numVersionsToRetain: Int, minVersionsToDelete: Long): Boolean = {
+    // If minVersionsToDelete <= 0, we call list every time maintenance is invoked.
+    // This is the original behaviour, without the list API call optimization.
+    if (minVersionsToDelete > 0) {
+      // When maxSeenVersion is defined, we check whether the number of stale version files
+      // is at least minVersionsToDelete before triggering batch deletion of files.
+      // We still proceed with deletion if maxSeenVersion isn't set, so that the fallback
+      // is to clean up files if maxSeenVersion fails to be initialized.
+      if (maxSeenVersion.isDefined) {
+        logInfo(log"Estimated maximum version is " +
+          log"${MDC(LogKeys.MAX_SEEN_VERSION, maxSeenVersion.get)}" +
+          log" and minimum version is ${MDC(LogKeys.MIN_SEEN_VERSION, minSeenVersion)}")
+        val versionsToDelete = maxSeenVersion.get - minSeenVersion + 1 - numVersionsToRetain
+        if (versionsToDelete < minVersionsToDelete) {
+          logInfo(log"Skipping deleting files." +
+            log" Need at least ${MDC(LogKeys.MIN_VERSIONS_TO_DELETE, minVersionsToDelete)}" +
+            log" stale versions for batch deletion but found only" +
+            log" ${MDC(LogKeys.VERSIONS_TO_DELETE, versionsToDelete)}.")
+          return true
+        }
+      }
+    }
+    false
+  }
+
   /**
    * Delete old versions by deleting the associated version and SST files.
-   * At a high-level, this method finds which versions to delete, and which SST files that were
+   * At a high level, when enough stale version files are present for batch deletion,
+   * this method finds which versions to delete, and which SST files were
    * last used in those versions. It's safe to delete these SST files because a SST file can
    * be reused only in successive versions. Therefore, if a SST file F was last used in version
    * V, then it won't be used in version V+1 or later, and if version V can be deleted, then
    * F can safely be deleted as well.
    *
-   * To find old files, it does the following.
+   * First, it checks whether enough stale version files are present for batch deletion.
+   * If so, it does the following to find old files.
    * - List all the existing [version].zip files
    * - Find the min version that needs to be retained based on the given `numVersionsToRetain`.
    * - Accordingly decide which versions should be deleted.
@@ -426,7 +477,10 @@ class RocksDBFileManager(
    * - SST files that were used in a version, but that version got overwritten with a different
    *   set of SST files.
    */
-  def deleteOldVersions(numVersionsToRetain: Int): Unit = {
+  def deleteOldVersions(numVersionsToRetain: Int, minVersionsToDelete: Long = 0): Unit = {
+    // Skip deletion unless enough stale version files are present
+    if (shouldSkipDeletion(numVersionsToRetain, minVersionsToDelete)) return
+
     val path = new Path(dfsRootDir)
     val allFiles = fm.list(path).map(_.getPath)
     val snapshotFiles = allFiles.filter(file => onlyZipFiles.accept(file))
@@ -526,6 +580,10 @@ class RocksDBFileManager(
       .map(_.getName.stripSuffix(".changelog")).map(_.toLong)
       .filter(_ < minVersionToRetain)
     deleteChangelogFiles(changelogVersionsToDelete)
+
+    // Always update minSeenVersion so the deletion cadence stays regular, even if deletion fails.
+    // This is safe because subsequent calls retry deleting old version files.
+    minSeenVersion = minVersionToRetain
   }
 
   /** Save immutable files to DFS directory */
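To make the gating concrete, here is the arithmetic shouldSkipDeletion performs, worked with the same numbers the new suite test below uses (six committed versions, retain 3, threshold 3); the bindings are illustrative, not code from the patch:

  // Illustrative only: versionsToDelete = maxSeenVersion - minSeenVersion + 1 - numVersionsToRetain
  val maxSeen = 6L             // highest version synced to DFS
  val minSeen = 1L             // lowest version assumed still present
  val numVersionsToRetain = 3  // versions kept for rollback
  val minVersionsToDelete = 3L // batch-deletion threshold
  val versionsToDelete = maxSeen - minSeen + 1 - numVersionsToRetain  // 6 - 1 + 1 - 3 = 3
  val skip = versionsToDelete < minVersionsToDelete                   // 3 < 3 is false, so deletion proceeds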
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
index c7004524097a..e199e1a4765e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
@@ -41,6 +41,14 @@ class StateStoreConf(
   /** Minimum versions a State Store implementation should retain to allow rollbacks */
   val minVersionsToRetain: Int = sqlConf.minBatchesToRetain
 
+  /**
+   * Minimum number of stale checkpoint versions that need to be present in the DFS
+   * checkpoint directory for old state checkpoint version deletion to be invoked.
+   * This is to amortize the cost of discovering and deleting old checkpoint versions.
+   */
+  val minVersionsToDelete: Long =
+    Math.round(sqlConf.ratioExtraSpaceAllowedInCheckpoint * sqlConf.minBatchesToRetain)
+
   /** Maximum count of versions a State Store implementation should retain in memory */
   val maxVersionsToRetainInMemory: Int = sqlConf.maxBatchesToRetainInMemory
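The Long threshold above is derived directly from the two SQL confs; Math.round on the Double product is what yields a Long. A minimal sketch of the derivation using the defaults shipped in this patch (0.3) and in minBatchesToRetain (100):

  // Illustrative only: how StateStoreConf turns the ratio into a version-count threshold.
  val minBatchesToRetain = 100                // spark.sql.streaming.minBatchesToRetain default
  val ratio = 0.3                             // ratioExtraSpaceAllowedInCheckpoint default
  val minVersionsToDelete: Long = Math.round(ratio * minBatchesToRetain)  // 30
  // A ratio of 0 (or less) yields a threshold of 0, which disables the skip logic and
  // falls back to listing the checkpoint directory on every maintenance run.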
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 90331b8a098f..f39f321dd8ce 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -230,12 +230,12 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
   }
 
   testWithColumnFamilies(
-    "RocksDB: purge changelog and snapshots",
+    "RocksDB: purge changelog and snapshots with minVersionsToDelete = 0",
     TestWithChangelogCheckpointingEnabled) { colFamiliesEnabled =>
     val remoteDir = Utils.createTempDir().toString
     new File(remoteDir).delete()  // to make sure that the directory gets created
     val conf = dbConf.copy(enableChangelogCheckpointing = true,
-      minVersionsToRetain = 3, minDeltasForSnapshot = 1)
+      minVersionsToRetain = 3, minDeltasForSnapshot = 1, minVersionsToDelete = 0)
     withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) { db =>
       db.load(0)
       db.commit()
@@ -271,6 +271,51 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  testWithColumnFamilies(
+    "RocksDB: purge version files with minVersionsToDelete > 0",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete()  // to make sure that the directory gets created
+    val conf = dbConf.copy(
+      minVersionsToRetain = 3, minDeltasForSnapshot = 1, minVersionsToDelete = 3)
+    withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) { db =>
+      // Commit 5 versions
+      // stale versions: (1, 2)
+      // keep versions: (3, 4, 5)
+      for (version <- 0 to 4) {
+        // Should upload the latest snapshot but not delete any files,
+        // since the number of stale versions < minVersionsToDelete
+        db.load(version)
+        db.commit()
+        db.doMaintenance()
+      }
+
+      // Commit 1 more version
+      // stale versions: (1, 2, 3)
+      // keep versions: (4, 5, 6)
+      db.load(5)
+      db.commit()
+
+      // Checkpoint directory before maintenance
+      if (isChangelogCheckpointingEnabled) {
+        assert(snapshotVersionsPresent(remoteDir) == (1 to 5))
+        assert(changelogVersionsPresent(remoteDir) == (1 to 6))
+      } else {
+        assert(snapshotVersionsPresent(remoteDir) == (1 to 6))
+      }
+
+      // Should delete stale versions of zip files and changelog files,
+      // since the number of stale versions >= minVersionsToDelete
+      db.doMaintenance()
+
+      // Checkpoint directory after maintenance
+      assert(snapshotVersionsPresent(remoteDir) == Seq(4, 5, 6))
+      if (isChangelogCheckpointingEnabled) {
+        assert(changelogVersionsPresent(remoteDir) == Seq(4, 5, 6))
+      }
+    }
+  }
+
   testWithColumnFamilies(
     "RocksDB: minDeltasForSnapshot",
     TestWithChangelogCheckpointingEnabled) { colFamiliesEnabled =>