diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index e8ce8e148709..d019b3e01153 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -104,7 +104,7 @@ class FileStreamSource(
   // Visible for testing and debugging in production.
   val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)
 
-  metadataLog.allFiles().foreach { entry =>
+  metadataLog.restore().foreach { entry =>
     seenFiles.add(entry.path, entry.timestamp)
   }
   seenFiles.purge()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
index 7b2ea9627a98..72c00317d986 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -36,6 +36,7 @@ class FileStreamSourceLog(
   extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, sparkSession, path) {
 
   import CompactibleFileStreamLog._
+  import FileStreamSourceLog._
 
   // Configurations about metadata compaction
   protected override val defaultCompactInterval: Int =
@@ -122,8 +123,34 @@ class FileStreamSourceLog(
     }
     batches
   }
+
+  def restore(): Array[FileEntry] = {
+    val files = allFiles()
+
+    // When restarting the query, it may start from a compaction batch whose source metadata
+    // file still needs to be read. One such case is when the previous run read the inputs
+    // successfully but did not finalize the batch for various reasons.
+    // The code below finds the latest compaction batch and puts its entries into the file
+    // entry cache, which avoids reading the compact batch file twice.
+    // This method doesn't know the offset / commit metadata in the checkpoint, so it can't
+    // tell exactly which batch the query will start from; in practice only a couple of the
+    // latest batches are candidates. We leverage that fact to skip the work when possible.
+    files.lastOption.foreach { lastEntry =>
+      val latestBatchId = lastEntry.batchId
+      val latestCompactedBatchId = getAllValidBatches(latestBatchId, compactInterval)(0)
+      if ((latestBatchId - latestCompactedBatchId) < PREV_NUM_BATCHES_TO_READ_IN_RESTORE) {
+        val logsForLatestCompactedBatch = files.filter { entry =>
+          entry.batchId == latestCompactedBatchId
+        }
+        fileEntryCache.put(latestCompactedBatchId, logsForLatestCompactedBatch)
+      }
+    }
+
+    files
+  }
 }
 
 object FileStreamSourceLog {
   val VERSION = 1
+  val PREV_NUM_BATCHES_TO_READ_IN_RESTORE = 2
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index fa320333143e..b89d1001cae3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1371,6 +1371,70 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  test("restore from file stream source log") {
+    def createEntries(batchId: Long, count: Int): Array[FileEntry] = {
+      (1 to count).map { idx =>
+        FileEntry(s"path_${batchId}_$idx", 10000 * batchId + count, batchId)
+      }.toArray
+    }
+
+    withSQLConf(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "5") {
+      def verifyBatchAvailabilityInCache(
+          fileEntryCache: java.util.LinkedHashMap[Long, Array[FileEntry]],
+          expectNotAvailable: Seq[Int],
+          expectAvailable: Seq[Int]): Unit = {
+        expectNotAvailable.foreach { batchId =>
+          assert(!fileEntryCache.containsKey(batchId.toLong))
+        }
+        expectAvailable.foreach { batchId =>
+          assert(fileEntryCache.containsKey(batchId.toLong))
+        }
+      }
+      withTempDir { chk =>
+        val _fileEntryCache = PrivateMethod[java.util.LinkedHashMap[Long, Array[FileEntry]]](
+          Symbol("fileEntryCache"))
+
+        val metadata = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark,
+          chk.getCanonicalPath)
+        val fileEntryCache = metadata invokePrivate _fileEntryCache()
+
+        (0 to 4).foreach { batchId =>
+          metadata.add(batchId, createEntries(batchId, 100))
+        }
+        val allFiles = metadata.allFiles()
+
+        // batch 4 is a compact batch whose logs would be cached in fileEntryCache
+        verifyBatchAvailabilityInCache(fileEntryCache, Seq(0, 1, 2, 3), Seq(4))
+
+        val metadata2 = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark,
+          chk.getCanonicalPath)
+        val fileEntryCache2 = metadata2 invokePrivate _fileEntryCache()
+
+        // allFiles() doesn't restore the logs for the latest compact batch into file entry cache
+        assert(metadata2.allFiles() === allFiles)
+        verifyBatchAvailabilityInCache(fileEntryCache2, Seq(0, 1, 2, 3, 4), Seq.empty)
+
+        // restore() will restore the logs for the latest compact batch into file entry cache
+        assert(metadata2.restore() === allFiles)
+        verifyBatchAvailabilityInCache(fileEntryCache2, Seq(0, 1, 2, 3), Seq(4))
+
+        (5 to 5 + FileStreamSourceLog.PREV_NUM_BATCHES_TO_READ_IN_RESTORE).foreach { batchId =>
+          metadata2.add(batchId, createEntries(batchId, 100))
+        }
+
+        val metadata3 = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark,
+          chk.getCanonicalPath)
+        val fileEntryCache3 = metadata3 invokePrivate _fileEntryCache()
+
+        // restore() will not restore the logs for the latest compact batch into file entry cache
+        // if the latest batch is too far from the latest compact batch, because it's unlikely
+        // Spark will request that batch as the start point.
+        assert(metadata3.restore() === metadata2.allFiles())
+        verifyBatchAvailabilityInCache(fileEntryCache3, Seq(0, 1, 2, 3, 4), Seq.empty)
+      }
+    }
+  }
+
   test("get arbitrary batch from FileStreamSource") {
     withTempDirs { case (src, tmp) =>
       withSQLConf(
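The cache pre-population decision in restore() comes down to a small piece of arithmetic. The standalone sketch below is not Spark code and does not reuse Spark's getAllValidBatches; it only mirrors the threshold logic under the assumption, consistent with the test above where batch 4 compacts at interval 5, that a batch id b is a compaction batch when (b + 1) % compactInterval == 0. The names RestoreCacheSketch, latestCompactionBatch, and wouldCacheCompactBatch are made up for illustration, and the "no compaction batch yet" case is handled here by returning -1, which may differ from Spark's internals.

object RestoreCacheSketch {
  // Mirrors PREV_NUM_BATCHES_TO_READ_IN_RESTORE from the patch above.
  val PrevNumBatchesToReadInRestore = 2

  // Latest compaction batch id at or before latestBatchId, or -1 if none exists yet,
  // assuming batch id b is a compaction batch when (b + 1) % interval == 0.
  def latestCompactionBatch(latestBatchId: Long, interval: Int): Long =
    ((latestBatchId + 1) / interval) * interval - 1

  // Would restore() pre-populate the file entry cache for this state of the log?
  def wouldCacheCompactBatch(latestBatchId: Long, interval: Int): Boolean = {
    val compacted = latestCompactionBatch(latestBatchId, interval)
    compacted >= 0 && (latestBatchId - compacted) < PrevNumBatchesToReadInRestore
  }

  def main(args: Array[String]): Unit = {
    // With compact interval 5, batch 4 is the latest compaction batch in all three cases.
    println(wouldCacheCompactBatch(latestBatchId = 4, interval = 5)) // true  (distance 0)
    println(wouldCacheCompactBatch(latestBatchId = 5, interval = 5)) // true  (distance 1)
    println(wouldCacheCompactBatch(latestBatchId = 7, interval = 5)) // false (distance 3)
  }
}

The last case matches the end of the test: after batches 5 through 7 are added, the latest batch is 3 batches past compact batch 4, so a fresh restore() skips the pre-population.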
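The suite reads the private fileEntryCache member through ScalaTest's PrivateMethodTester (PrivateMethod plus invokePrivate), which resolves the compiler-generated accessor reflectively. A minimal self-contained example of that pattern is sketched below; Counter, currentCount, and the suite name are made up for illustration and are not part of the patch.

import org.scalatest.PrivateMethodTester
import org.scalatest.funsuite.AnyFunSuite

// Toy example of the PrivateMethodTester pattern used in FileStreamSourceSuite above.
class PrivateAccessExampleSuite extends AnyFunSuite with PrivateMethodTester {
  class Counter {
    private def currentCount: Int = 42 // private member we want to inspect from a test
  }

  test("invokePrivate reads a private member") {
    // PrivateMethod is parameterized by the member's return type and named via a Symbol.
    val _currentCount = PrivateMethod[Int](Symbol("currentCount"))
    val value = new Counter() invokePrivate _currentCount()
    assert(value === 42)
  }
}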