Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
import org.apache.commons.io.{FilenameUtils, IOUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.fs.{FileStatus, Path, PathFilter}
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

Expand Down Expand Up @@ -218,6 +218,38 @@ class RocksDBFileManager(
}
}

/**
 * Find orphan files which are not tracked by zip files.
 * Both sst files and log files can be orphan files.
 * They are uploaded separately before the zip file of that version is uploaded.
 * When the zip file of a version gets overwritten, the sst and log files it referenced
 * become orphans.
 * Be careful here since sst and log files of the ongoing version
 * also appear to be orphans before their zip file is uploaded.
 *
 * A file is reported as orphan only when its modification time is strictly older than the
 * oldest modification time among the tracked files that are present in `allFiles`.
 * If no tracked file is present in `allFiles`, there is no reference timestamp to compare
 * against, and no file is reported.
 *
 * @param trackedFiles files tracked by metadata in versioned zip file
 * @param allFiles all sst or log files in the directory.
 * @return filenames of orphan files
 */
def findOrphanFiles(trackedFiles: Seq[String], allFiles: Seq[FileStatus]): Seq[String] = {
  val fileModificationTimes = allFiles.map(file =>
    file.getPath.getName -> file.getModificationTime).toMap
  val oldestTrackedFileModificationTime: Option[Long] =
    if (trackedFiles.nonEmpty && allFiles.size > trackedFiles.size) {
      // Some tracked files may not be in the directory when listing, possibly all of them.
      // reduceOption yields None in that case instead of throwing like `min` would.
      trackedFiles.flatMap(fileModificationTimes.get(_)).reduceOption((a, b) => math.min(a, b))
    } else {
      None
    }
  oldestTrackedFileModificationTime match {
    case Some(oldest) =>
      // If this immutable file is older than any tracked file,
      // then it can't belong to the ongoing version and it should be safe to clean it up.
      val orphanFiles = fileModificationTimes.filter(_._2 < oldest).keys.toSeq
      if (orphanFiles.nonEmpty) {
        logInfo(s"Found ${orphanFiles.size} orphan files: ${orphanFiles.take(20).mkString(", ")}" +
          "... (display at most 20 filenames) that should be deleted.")
      }
      orphanFiles
    case None =>
      Seq.empty
  }
}

/**
* Delete old versions by deleting the associated version and SST files.
* At a high-level, this method finds which versions to delete, and which SST files that were
Expand All @@ -231,9 +263,12 @@ class RocksDBFileManager(
* - Find the min version that needs to be retained based on the given `numVersionsToRetain`.
* - Accordingly decide which versions should be deleted.
* - Resolve all SSTs files of all the existing versions, if not already resolved.
* - Find what was the latest version in which each SST file was used.
* - Delete the files that were last used in the to-be-deleted versions as we will not
* - Find the files that were last used in the to-be-deleted versions as we will not
* need those files any more.
* - Find the orphan sst and log files whose zip files are not uploaded successfully
* or have been overwritten. To avoid deleting files of ongoing tasks, only delete orphan files
* that are older than all tracked files when there are at least 2 versions.
* - Delete files in both to-be-deleted versions and orphan files.
*
* Note that it only deletes files that it knows are safe to delete.
* It may not delete the following files.
Expand All @@ -260,7 +295,9 @@ class RocksDBFileManager(
math.max(minVersionPresent, maxVersionPresent - numVersionsToRetain + 1)
val versionsToDelete = sortedVersions.takeWhile(_ < minVersionToRetain).toSet[Long]

// Return if no version to delete
// When versionsToDelete is non-empty, there are at least 2 versions.
// We only delete orphan files when there are at least 2 versions,
// which avoids deleting files for running tasks.
if (versionsToDelete.isEmpty) return

logInfo(
Expand All @@ -269,29 +306,45 @@ class RocksDBFileManager(
s"$numVersionsToRetain versions")

// Resolve RocksDB files for all the versions and find the max version each file is used
val fileToMaxUsedVersion = new mutable.HashMap[RocksDBImmutableFile, Long]
val fileToMaxUsedVersion = new mutable.HashMap[String, Long]
sortedVersions.foreach { version =>
val files = Option(versionToRocksDBFiles.get(version)).getOrElse {
val newResolvedFiles = getImmutableFilesFromVersionZip(version)
versionToRocksDBFiles.put(version, newResolvedFiles)
newResolvedFiles
}
files.foreach(f => fileToMaxUsedVersion(f) = version)
files.foreach(f => fileToMaxUsedVersion(f.dfsFileName) =
math.max(version, fileToMaxUsedVersion.getOrElse(f.dfsFileName, version)))
}

// Best effort attempt to delete SST files that were last used in to-be-deleted versions
val filesToDelete = fileToMaxUsedVersion.filter { case (_, v) => versionsToDelete.contains(v) }

val sstDir = new Path(dfsRootDir, RocksDBImmutableFile.SST_FILES_DFS_SUBDIR)
val logDir = new Path(dfsRootDir, RocksDBImmutableFile.LOG_FILES_DFS_SUBDIR)
val allSstFiles = if (fm.exists(sstDir)) fm.list(sstDir).toSeq else Seq.empty
val allLogFiles = if (fm.exists(logDir)) fm.list(logDir).toSeq else Seq.empty
filesToDelete ++= findOrphanFiles(fileToMaxUsedVersion.keys.toSeq, allSstFiles ++ allLogFiles)
.map(_ -> -1L)
logInfo(s"Deleting ${filesToDelete.size} files not used in versions >= $minVersionToRetain")
var failedToDelete = 0
filesToDelete.foreach { case (file, maxUsedVersion) =>
filesToDelete.foreach { case (dfsFileName, maxUsedVersion) =>
try {
val dfsFile = dfsFilePath(file.dfsFileName)
val dfsFile = dfsFilePath(dfsFileName)
fm.delete(dfsFile)
logDebug(s"Deleted file $file that was last used in version $maxUsedVersion")
if (maxUsedVersion == -1) {
logDebug(s"Deleted orphan file $dfsFileName")
} else {
logDebug(s"Deleted file $dfsFileName that was last used in version $maxUsedVersion")
}
} catch {
case e: Exception =>
failedToDelete += 1
logWarning(s"Error deleting file $file, last used in version $maxUsedVersion", e)
if (maxUsedVersion == -1) {
logWarning(s"Error deleting orphan file $dfsFileName", e)
} else {
logWarning(s"Error deleting file $dfsFileName, last used in version $maxUsedVersion", e)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,139 @@ class RocksDBSuite extends SparkFunSuite {
}
}

// Checks that deleteOldVersions removes sst/log files left behind when a version's
// checkpoint is overwritten, in addition to the files of the deleted versions.
test("RocksDBFileManager: delete orphan files") {
withTempDir { dir =>
val dfsRootDir = dir.getAbsolutePath
// Use 2 file managers on the same DFS root (each with its own local temp dir)
// to emulate concurrent executions that checkpoint the same version of state.
val fileManager = new RocksDBFileManager(
dfsRootDir, Utils.createTempDir(), new Configuration)
val fileManager_ = new RocksDBFileManager(
dfsRootDir, Utils.createTempDir(), new Configuration)
val sstDir = s"$dfsRootDir/SSTs"
def numRemoteSSTFiles: Int = listFiles(sstDir).length
val logDir = s"$dfsRootDir/logs"
def numRemoteLogFiles: Int = listFiles(logDir).length

// Save a version of checkpoint files
val cpFiles1 = Seq(
"001.sst" -> 10,
"002.sst" -> 20,
"other-file1" -> 100,
"other-file2" -> 200,
"archive/00001.log" -> 1000,
"archive/00002.log" -> 2000
)
saveCheckpointFiles(fileManager, cpFiles1, version = 1, numKeys = 101)
assert(fileManager.getLatestVersion() === 1)
assert(numRemoteSSTFiles == 2) // 2 sst files copied
assert(numRemoteLogFiles == 2)


// Overwrite version 1 using the second file manager; the sst and log files uploaded
// by the first file manager become orphans.
val cpFiles1_ = Seq(
"001.sst" -> 10,
"002.sst" -> 20,
"other-file1" -> 100,
"other-file2" -> 200,
"archive/00002.log" -> 1000,
"archive/00003.log" -> 2000
)
saveCheckpointFiles(fileManager_, cpFiles1_, version = 1, numKeys = 101)
assert(fileManager_.getLatestVersion() === 1)
// The remote file counts double: the files from the first upload are still present.
assert(numRemoteSSTFiles == 4)
assert(numRemoteLogFiles == 4)

// For the orphan files cleanup test, add a sleep between 2 checkpoints.
// We use the file modification timestamp to find orphan files older than
// any tracked file. Some file systems have timestamps in second precision.
// Sleeping for 1.5s makes sure files from different versions have different timestamps.
Thread.sleep(1500)
// Save a version of checkpoint files
val cpFiles2 = Seq(
"003.sst" -> 10,
"004.sst" -> 20,
"other-file1" -> 100,
"other-file2" -> 200,
"archive/00004.log" -> 1000,
"archive/00005.log" -> 2000
)
saveCheckpointFiles(fileManager_, cpFiles2, version = 2, numKeys = 121)
fileManager_.deleteOldVersions(1)
// Only upper bounds are asserted at this step.
assert(numRemoteSSTFiles <= 4) // delete files recorded in 1.zip
assert(numRemoteLogFiles <= 5) // delete files recorded in 1.zip and orphan 00001.log

// Sleep again so that the files of version 3 get newer timestamps than all earlier files.
Thread.sleep(1500)
// Save a version of checkpoint files
val cpFiles3 = Seq(
"005.sst" -> 10,
"other-file1" -> 100,
"other-file2" -> 200,
"archive/00006.log" -> 1000,
"archive/00007.log" -> 2000
)
saveCheckpointFiles(fileManager_, cpFiles3, version = 3, numKeys = 131)
assert(fileManager_.getLatestVersion() === 3)
fileManager_.deleteOldVersions(1)
// Only the files of version 3 (1 sst file and 2 log files) are expected to remain.
assert(numRemoteSSTFiles == 1)
assert(numRemoteLogFiles == 2)
}
}

// Checks that pre-existing orphan files survive deleteOldVersions while only one version
// exists, and are removed once a second version has been checkpointed.
test("RocksDBFileManager: don't delete orphan files when there is only 1 version") {
withTempDir { dir =>
val dfsRootDir = dir.getAbsolutePath
val fileManager = new RocksDBFileManager(
dfsRootDir, Utils.createTempDir(), new Configuration)
// Create the remote sst and log directories by hand so orphan files can be planted
// before any checkpoint is saved.
(new File(dfsRootDir, "SSTs")).mkdir()
(new File(dfsRootDir, "logs")).mkdir()

val sstDir = s"$dfsRootDir/SSTs"
def numRemoteSSTFiles: Int = listFiles(sstDir).length

val logDir = s"$dfsRootDir/logs"
def numRemoteLogFiles: Int = listFiles(logDir).length

// Plant one untracked file in each directory.
new File(sstDir, "orphan.sst").createNewFile()
new File(logDir, "orphan.log").createNewFile()

// Sleep so that the planted files are older than the checkpoint files saved below.
Thread.sleep(1500)
// Save a version of checkpoint files
val cpFiles1 = Seq(
"001.sst" -> 10,
"002.sst" -> 20,
"other-file1" -> 100,
"other-file2" -> 200,
"archive/00001.log" -> 1000,
"archive/00002.log" -> 2000
)
saveCheckpointFiles(fileManager, cpFiles1, version = 1, numKeys = 101)
fileManager.deleteOldVersions(1)
// Should not delete orphan files even when they are older than all existing files
// when there is only 1 version.
assert(numRemoteSSTFiles == 3)
assert(numRemoteLogFiles == 3)

Thread.sleep(1500)
// Save a version of checkpoint files
val cpFiles2 = Seq(
"003.sst" -> 10,
"004.sst" -> 20,
"other-file1" -> 100,
"other-file2" -> 200,
"archive/00003.log" -> 1000,
"archive/00004.log" -> 2000
)
saveCheckpointFiles(fileManager, cpFiles2, version = 2, numKeys = 101)
// Before cleanup: 1 planted file plus 2 files per version in each directory.
assert(numRemoteSSTFiles == 5)
assert(numRemoteLogFiles == 5)
fileManager.deleteOldVersions(1)
// Orphan files should be deleted now; only 2 files remain in each directory.
assert(numRemoteSSTFiles == 2)
assert(numRemoteLogFiles == 2)
}
}

test("RocksDBFileManager: upload only new immutable files") {
withTempDir { dir =>
val dfsRootDir = dir.getAbsolutePath
Expand Down