From a5c41205a2262e14d0b8a284033b56debd24a90e Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Thu, 20 Feb 2020 23:09:22 +0900
Subject: [PATCH 1/4] [SPARK-30900][SS] FileStreamSource: Avoid reading compact metadata log twice if the query restarts from compact batch

---
 .../streaming/FileStreamSource.scala          |  2 +-
 .../streaming/FileStreamSourceLog.scala       | 28 ++++++++++
 .../sql/streaming/FileStreamSourceSuite.scala | 54 +++++++++++++++++++
 3 files changed, 83 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index e8ce8e148709..d019b3e01153 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -104,7 +104,7 @@ class FileStreamSource(
   // Visible for testing and debugging in production.
   val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)

-  metadataLog.allFiles().foreach { entry =>
+  metadataLog.restore().foreach { entry =>
     seenFiles.add(entry.path, entry.timestamp)
   }
   seenFiles.purge()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
index 7b2ea9627a98..8dd794e9df50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -36,6 +36,7 @@ class FileStreamSourceLog(
   extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, sparkSession, path) {

   import CompactibleFileStreamLog._
+  import FileStreamSourceLog._

   // Configurations about metadata compaction
   protected override val defaultCompactInterval: Int =
@@ -122,8 +123,35 @@ class FileStreamSourceLog(
     }
     batches
   }
+
+  def restore(): Array[FileEntry] = {
+    val files = allFiles()
+
+    // When restarting the query, there is a case where the query starts from a compaction
+    // batch, and that batch has a source metadata file to read. One such case is that the
+    // previous run successfully read the inputs but did not finalize the batch.
+    // The code below finds the latest compaction batch and puts the entries for that batch
+    // into the file entry cache, which avoids reading the compact batch file twice.
+    // This method doesn't know the offset / commit metadata in the checkpoint, so it doesn't
+    // know exactly which batch the query will restart from; in practice only a couple of the
+    // latest batches are candidates. We leverage that fact to skip the work when possible.
+
+    files.lastOption.foreach { lastEntry =>
+      val latestBatchId = lastEntry.batchId
+      val latestCompactedBatchId = getAllValidBatches(latestBatchId, compactInterval)(0)
+      if (latestCompactedBatchId > 0 &&
+          (latestBatchId - latestCompactedBatchId) < PREV_NUM_BATCHES_TO_READ_IN_RESTORE) {
+        val logsForLatestCompactedBatch = files.filter { entry =>
+          entry.batchId == latestCompactedBatchId
+        }
+        fileEntryCache.put(latestCompactedBatchId, logsForLatestCompactedBatch)
+      }
+    }
+
+    files
+  }
 }

 object FileStreamSourceLog {
   val VERSION = 1
+  val PREV_NUM_BATCHES_TO_READ_IN_RESTORE = 2
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index fa320333143e..80129a26888d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1371,6 +1371,60 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }

+  test("restore from file stream source log") {
+    def createEntries(batchId: Long, count: Int): Array[FileEntry] = {
+      (1 to count).map { idx =>
+        FileEntry(s"path_${batchId}_$idx", 10000 * batchId + count, batchId)
+      }.toArray
+    }
+
+    withSQLConf(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "5") {
+      withTempDir { chk =>
+        val _fileEntryCache = PrivateMethod[java.util.LinkedHashMap[Long, Array[FileEntry]]](
+          Symbol("fileEntryCache"))
+
+        val metadata = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark,
+          chk.getCanonicalPath)
+        val fileEntryCache = metadata invokePrivate _fileEntryCache()
+
+        (0 to 4).foreach { batchId =>
+          metadata.add(batchId, createEntries(batchId, 100))
+        }
+        val allFiles = metadata.allFiles()
+
+        // batch 4 is a compact batch which logs would be cached in fileEntryCache
+        fileEntryCache.containsKey(4)
+
+        val metadata2 = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark,
+          chk.getCanonicalPath)
+        val fileEntryCache2 = metadata2 invokePrivate _fileEntryCache()
+
+        // allFiles() doesn't restore the logs for the latest compact batch into file entry cache
+        assert(metadata2.allFiles() === allFiles)
+        assert(!fileEntryCache2.containsKey(4L))
+
+        // restore() will restore the logs for the latest compact batch into file entry cache
+        assert(metadata2.restore() === allFiles)
+        assert(fileEntryCache2.containsKey(4L))
+
+        (5 to 5 + FileStreamSourceLog.PREV_NUM_BATCHES_TO_READ_IN_RESTORE).foreach { batchId =>
+          metadata2.add(batchId, createEntries(batchId, 100))
+        }
+        val allFiles2 = metadata2.allFiles()
+
+        val metadata3 = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark,
+          chk.getCanonicalPath)
+        val fileEntryCache3 = metadata3 invokePrivate _fileEntryCache()
+
+        // restore() will not restore the logs for the latest compact batch into file entry cache
+        // if the latest batch is too far from latest compact batch, because it's unlikely Spark
+        // will request the batch for the start point.
+        assert(metadata2.restore() === allFiles2)
+        assert(!fileEntryCache3.containsKey(4L))
+      }
+    }
+  }
+
   test("get arbitrary batch from FileStreamSource") {
     withTempDirs { case (src, tmp) =>
       withSQLConf(

From 4e3046c2e3f94abf848f3cca53894e9ec3b61b4e Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Thu, 26 Mar 2020 16:21:33 +0900
Subject: [PATCH 2/4] Fix silly mistake

---
 .../org/apache/spark/sql/streaming/FileStreamSourceSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 80129a26888d..c3fcb37fb0a4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1393,7 +1393,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         val allFiles = metadata.allFiles()

         // batch 4 is a compact batch which logs would be cached in fileEntryCache
-        fileEntryCache.containsKey(4)
+        assert(fileEntryCache.containsKey(4L))

         val metadata2 = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark,
           chk.getCanonicalPath)

From 175ba90669662e620282b68932f53421e95e3874 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Fri, 27 Mar 2020 16:11:53 +0900
Subject: [PATCH 3/4] Reflect review comments

---
 .../streaming/FileStreamSourceLog.scala       |  3 +--
 .../sql/streaming/FileStreamSourceSuite.scala | 19 +++++++++++++++----
 2 files changed, 16 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
index 8dd794e9df50..72c00317d986 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -138,8 +138,7 @@ class FileStreamSourceLog(
     files.lastOption.foreach { lastEntry =>
       val latestBatchId = lastEntry.batchId
       val latestCompactedBatchId = getAllValidBatches(latestBatchId, compactInterval)(0)
-      if (latestCompactedBatchId > 0 &&
-          (latestBatchId - latestCompactedBatchId) < PREV_NUM_BATCHES_TO_READ_IN_RESTORE) {
+      if ((latestBatchId - latestCompactedBatchId) < PREV_NUM_BATCHES_TO_READ_IN_RESTORE) {
         val logsForLatestCompactedBatch = files.filter { entry =>
           entry.batchId == latestCompactedBatchId
         }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index c3fcb37fb0a4..32757105b22b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1379,6 +1379,17 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }

     withSQLConf(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "5") {
+      def verifyBatchAvailabilityInCache(
+          fileEntryCache: java.util.LinkedHashMap[Long, Array[FileEntry]],
+          expectNotAvailable: Seq[Int],
+          expectAvailable: Seq[Int]): Unit = {
+        expectNotAvailable.foreach { batchId =>
+          assert(!fileEntryCache.containsKey(batchId.toLong))
+        }
+        expectAvailable.foreach { batchId =>
+          assert(fileEntryCache.containsKey(batchId.toLong))
+        }
+      }
       withTempDir { chk =>
         val _fileEntryCache = PrivateMethod[java.util.LinkedHashMap[Long, Array[FileEntry]]](
           Symbol("fileEntryCache"))
@@ -1393,7 +1404,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         val allFiles = metadata.allFiles()

         // batch 4 is a compact batch which logs would be cached in fileEntryCache
-        assert(fileEntryCache.containsKey(4L))
+        verifyBatchAvailabilityInCache(fileEntryCache, Seq(0, 1, 2, 3), Seq(4))

         val metadata2 = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark,
           chk.getCanonicalPath)
@@ -1401,11 +1412,11 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         // allFiles() doesn't restore the logs for the latest compact batch into file entry cache
         assert(metadata2.allFiles() === allFiles)
-        assert(!fileEntryCache2.containsKey(4L))
+        verifyBatchAvailabilityInCache(fileEntryCache2, Seq(0, 1, 2, 3, 4), Seq.empty)

         // restore() will restore the logs for the latest compact batch into file entry cache
         assert(metadata2.restore() === allFiles)
-        assert(fileEntryCache2.containsKey(4L))
+        verifyBatchAvailabilityInCache(fileEntryCache2, Seq(0, 1, 2, 3), Seq(4))

         (5 to 5 + FileStreamSourceLog.PREV_NUM_BATCHES_TO_READ_IN_RESTORE).foreach { batchId =>
           metadata2.add(batchId, createEntries(batchId, 100))
         }
@@ -1420,7 +1431,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         // restore() will not restore the logs for the latest compact batch into file entry cache
         // if the latest batch is too far from latest compact batch, because it's unlikely Spark
         // will request the batch for the start point.
         assert(metadata2.restore() === allFiles2)
-        assert(!fileEntryCache3.containsKey(4L))
+        verifyBatchAvailabilityInCache(fileEntryCache3, Seq(0, 1, 2, 3, 4), Seq.empty)
       }
     }
   }

From 6406e36eb34377983aaf113495ca16b1553317a3 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Fri, 27 Mar 2020 18:17:42 +0900
Subject: [PATCH 4/4] another review comment

---
 .../org/apache/spark/sql/streaming/FileStreamSourceSuite.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 32757105b22b..b89d1001cae3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1421,7 +1421,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         (5 to 5 + FileStreamSourceLog.PREV_NUM_BATCHES_TO_READ_IN_RESTORE).foreach { batchId =>
           metadata2.add(batchId, createEntries(batchId, 100))
         }
-        val allFiles2 = metadata2.allFiles()

         val metadata3 = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark,
           chk.getCanonicalPath)
@@ -1429,7 +1429,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         // restore() will not restore the logs for the latest compact batch into file entry cache
         // if the latest batch is too far from latest compact batch, because it's unlikely Spark
         // will request the batch for the start point.
-        assert(metadata2.restore() === allFiles2)
+        assert(metadata3.restore() === metadata2.allFiles())
         verifyBatchAvailabilityInCache(fileEntryCache3, Seq(0, 1, 2, 3, 4), Seq.empty)
       }
     }
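
A minimal, self-contained sketch (separate from the patches above) of the idea behind restore(): return every logged entry, as allFiles() does, but additionally prime a per-batch cache with the latest compact batch when a restart is likely to replay it. SimpleSourceLog, its in-memory maps, and the RestoreSketch driver are hypothetical stand-ins for FileStreamSourceLog and its HDFS-backed metadata log; only the shape of restore() mirrors the patch.

import scala.collection.mutable

// Hypothetical stand-in for the patch's FileEntry metadata record.
case class FileEntry(path: String, timestamp: Long, batchId: Long)

class SimpleSourceLog(compactInterval: Int, prevBatchesToCache: Int = 2) {
  // Stand-in for the on-disk metadata log: one array of entries per batch.
  private val persisted = mutable.LinkedHashMap.empty[Long, Array[FileEntry]]
  // Stand-in for the private fileEntryCache that restore() primes.
  private val fileEntryCache = mutable.Map.empty[Long, Array[FileEntry]]

  def add(batchId: Long, entries: Array[FileEntry]): Unit = persisted(batchId) = entries

  def allFiles(): Array[FileEntry] = persisted.values.flatten.toArray

  // Latest compaction batch id at or before latestBatchId (4, 9, 14, ... for interval 5),
  // playing the role of getAllValidBatches(latestBatchId, compactInterval)(0).
  private def latestCompactedBatchId(latestBatchId: Long): Long =
    ((latestBatchId + 1) / compactInterval) * compactInterval - 1

  // Same shape as the patch's restore(): return everything, and cache the latest compact
  // batch only when the latest batch is close enough that a restart may replay it.
  def restore(): Array[FileEntry] = {
    val files = allFiles()
    files.lastOption.foreach { lastEntry =>
      val latestBatchId = lastEntry.batchId
      val compactedId = latestCompactedBatchId(latestBatchId)
      if (compactedId >= 0 && (latestBatchId - compactedId) < prevBatchesToCache) {
        fileEntryCache(compactedId) = files.filter(_.batchId == compactedId)
      }
    }
    files
  }

  def cachedBatches: Set[Long] = fileEntryCache.keySet.toSet
}

object RestoreSketch extends App {
  val log = new SimpleSourceLog(compactInterval = 5)
  (0L to 4L).foreach { b => log.add(b, Array(FileEntry(s"path_$b", 10000 * b, b))) }
  log.restore()
  println(log.cachedBatches)  // Set(4): the compact batch is primed, so it is not read twice
}

With compactInterval = 5 and five batches logged, the compact batch 4 ends up primed, which is the same situation the added test asserts; once the latest batch moves more than prevBatchesToCache batches past the compact batch, restore() leaves the cache empty, matching the behavior verified for fileEntryCache3.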