From c907d9441554dcde16bbe82d205b25e50bf83440 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Wed, 15 May 2024 07:11:21 -0400 Subject: [PATCH 1/6] Don't cache files for availableNow trigger --- .../spark/sql/execution/streaming/FileStreamSource.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 2f5bb2f010e9..1056eef58453 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -181,7 +181,8 @@ class FileStreamSource( // Obey user's setting to limit the number of files in this batch trigger. val (batchFiles, unselectedFiles) = limit match { - case files: ReadMaxFiles if !sourceOptions.latestFirst => + case files: ReadMaxFiles + if !sourceOptions.latestFirst && allFilesForTriggerAvailableNow == null => // we can cache and reuse remaining fetched list of files in further batches val (bFiles, usFiles) = newFiles.splitAt(files.maxFiles()) if (usFiles.size < files.maxFiles() * DISCARD_UNSEEN_INPUT_RATIO) { @@ -198,7 +199,8 @@ class FileStreamSource( // implies "sourceOptions.latestFirst = true" which we want to refresh the list per batch (newFiles.take(files.maxFiles()), null) - case files: ReadMaxBytes if !sourceOptions.latestFirst => + case files: ReadMaxBytes + if !sourceOptions.latestFirst && allFilesForTriggerAvailableNow == null => // we can cache and reuse remaining fetched list of files in further batches val (FilesSplit(bFiles, _), FilesSplit(usFiles, rSize)) = takeFilesUntilMax(newFiles, files.maxBytes()) From 90d8ec5e88530b7df6171ea7938a2ce399eaaece Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Thu, 16 May 2024 18:20:46 -0400 Subject: [PATCH 2/6] Add test --- .../streaming/FileStreamSource.scala | 8 ++-- .../sql/streaming/FileStreamSourceSuite.scala | 40 +++++++++++++++++++ 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 1056eef58453..3e0fae9a867c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -129,7 +129,9 @@ class FileStreamSource( log"maxBytesPerBatch = ${MDC(LogKeys.NUM_BYTES, maxBytesPerBatch)}, " + log"maxFileAgeMs = ${MDC(LogKeys.TIME_UNITS, maxFileAgeMs)}") - private var unreadFiles: Seq[NewFileEntry] = _ + + // Visible for testing + private[sql] var unreadFiles: Seq[NewFileEntry] = _ /** * Split files into a selected/unselected pair according to a total size threshold. @@ -438,8 +440,8 @@ object FileStreamSource { def sparkPath: SparkPath = SparkPath.fromUrlString(path) } - /** Newly fetched files metadata holder. */ - private case class NewFileEntry(path: SparkPath, size: Long, timestamp: Long) + /** Newly fetched files metadata holder. Visible for testing. 
*/ + private[sql] case class NewFileEntry(path: SparkPath, size: Long, timestamp: Long) private case class FilesSplit(files: Seq[NewFileEntry], size: BigInt) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index fd3d59af7e6b..214d32c82a6c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -2357,6 +2357,46 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } + test("SPARK-48314: Don't cache unread files when using Trigger.AvailableNow") { + withCountListingLocalFileSystemAsLocalFileSystem { + withThreeTempDirs { case (src, meta, tmp) => + val options = Map("latestFirst" -> "true", "maxFilesPerTrigger" -> "5") + val scheme = CountListingLocalFileSystem.scheme + val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text", + StructType(Nil), Seq.empty, meta.getCanonicalPath, options) + + source.prepareForTriggerAvailableNow() + + CountListingLocalFileSystem.resetCount() + + // provide 20 files in src, with sequential "last modified" to guarantee ordering + (0 to 19).map { idx => + val f = createFile(idx.toString, new File(src, idx.toString), tmp) + f.setLastModified(idx * 10000) + f + } + + source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5)) + .asInstanceOf[FileStreamSourceOffset] + + // All files are already tracked in allFilesForTriggerAvailableNow + assert(0 === CountListingLocalFileSystem.pathToNumListStatusCalled + .get(src.getCanonicalPath).map(_.get()).getOrElse(0)) + + // Unread files should be empty + assert(source.unreadFiles == null) + + // Reading again leverages the files already tracked in allFilesForTriggerAvailableNow, + // so no more listings need to happen + source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5)) + .asInstanceOf[FileStreamSourceOffset] + + assert(0 === CountListingLocalFileSystem.pathToNumListStatusCalled + .get(src.getCanonicalPath).map(_.get()).getOrElse(0)) + } + } + } + test("SPARK-31962: file stream source shouldn't allow modifiedBefore/modifiedAfter") { def formatTime(time: LocalDateTime): String = { time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")) From f4c425b96daa6b7b7db4e032dd597c26fd891276 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Thu, 16 May 2024 19:52:34 -0400 Subject: [PATCH 3/6] Cleanup --- .../sql/execution/streaming/FileStreamSource.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 3e0fae9a867c..715bc3c1bb81 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -181,10 +181,11 @@ class FileStreamSource( } } + val shouldCache = !sourceOptions.latestFirst && allFilesForTriggerAvailableNow == null + // Obey user's setting to limit the number of files in this batch trigger. 
val (batchFiles, unselectedFiles) = limit match { - case files: ReadMaxFiles - if !sourceOptions.latestFirst && allFilesForTriggerAvailableNow == null => + case files: ReadMaxFiles if shouldCache => // we can cache and reuse remaining fetched list of files in further batches val (bFiles, usFiles) = newFiles.splitAt(files.maxFiles()) if (usFiles.size < files.maxFiles() * DISCARD_UNSEEN_INPUT_RATIO) { @@ -198,11 +199,10 @@ class FileStreamSource( } case files: ReadMaxFiles => - // implies "sourceOptions.latestFirst = true" which we want to refresh the list per batch + // don't use the cache, just take files for the next batch (newFiles.take(files.maxFiles()), null) - case files: ReadMaxBytes - if !sourceOptions.latestFirst && allFilesForTriggerAvailableNow == null => + case files: ReadMaxBytes if shouldCache => // we can cache and reuse remaining fetched list of files in further batches val (FilesSplit(bFiles, _), FilesSplit(usFiles, rSize)) = takeFilesUntilMax(newFiles, files.maxBytes()) @@ -217,8 +217,8 @@ class FileStreamSource( } case files: ReadMaxBytes => + // don't use the cache, just take files for the next batch val (FilesSplit(bFiles, _), _) = takeFilesUntilMax(newFiles, files.maxBytes()) - // implies "sourceOptions.latestFirst = true" which we want to refresh the list per batch (bFiles, null) case _: ReadAllAvailable => (newFiles, null) From 8879afe31837a8fe656855a95d0c020daa09b628 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Tue, 21 May 2024 11:11:46 -0400 Subject: [PATCH 4/6] Check files in batch and use new setting to test skipping cache --- .../streaming/FileStreamSource.scala | 5 ++-- .../sql/streaming/FileStreamSourceSuite.scala | 25 +++++++++++-------- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 7ff86dbcbac7..2e580bf2cbb1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -135,8 +135,7 @@ class FileStreamSource( log"maxFileAgeMs = ${MDC(LogKeys.TIME_UNITS, maxFileAgeMs)}") - // Visible for testing - private[sql] var unreadFiles: Seq[NewFileEntry] = _ + private var unreadFiles: Seq[NewFileEntry] = _ /** * Split files into a selected/unselected pair according to a total size threshold. @@ -443,7 +442,7 @@ object FileStreamSource { } /** Newly fetched files metadata holder. Visible for testing. 
*/ - private[sql] case class NewFileEntry(path: SparkPath, size: Long, timestamp: Long) + private case class NewFileEntry(path: SparkPath, size: Long, timestamp: Long) private case class FilesSplit(files: Seq[NewFileEntry], size: BigInt) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 7483d9f657ee..ca4f2a7f26ce 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -2451,14 +2451,13 @@ class FileStreamSourceSuite extends FileStreamSourceTest { test("SPARK-48314: Don't cache unread files when using Trigger.AvailableNow") { withCountListingLocalFileSystemAsLocalFileSystem { withThreeTempDirs { case (src, meta, tmp) => - val options = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "5") + val options = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "5", + "maxCachedFiles" -> "2") val scheme = CountListingLocalFileSystem.scheme val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text", StructType(Nil), Seq.empty, meta.getCanonicalPath, options) - - source.prepareForTriggerAvailableNow() - - CountListingLocalFileSystem.resetCount() + val _metadataLog = PrivateMethod[FileStreamSourceLog](Symbol("metadataLog")) + val metadataLog = source invokePrivate _metadataLog() // provide 20 files in src, with sequential "last modified" to guarantee ordering (0 to 19).map { idx => @@ -2467,23 +2466,29 @@ class FileStreamSourceSuite extends FileStreamSourceTest { f } - source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5)) + source.prepareForTriggerAvailableNow() + CountListingLocalFileSystem.resetCount() + + var offset = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5)) .asInstanceOf[FileStreamSourceOffset] + var files = metadataLog.get(offset.logOffset).getOrElse(Array.empty[FileEntry]) // All files are already tracked in allFilesForTriggerAvailableNow assert(0 === CountListingLocalFileSystem.pathToNumListStatusCalled .get(src.getCanonicalPath).map(_.get()).getOrElse(0)) - - // Unread files should be empty - assert(source.unreadFiles == null) + // Should be 5 files in the batch based on maxFiles limit + assert(files.length == 5) // Reading again leverages the files already tracked in allFilesForTriggerAvailableNow, // so no more listings need to happen - source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5)) + offset = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5)) .asInstanceOf[FileStreamSourceOffset] + files = metadataLog.get(offset.logOffset).getOrElse(Array.empty[FileEntry]) assert(0 === CountListingLocalFileSystem.pathToNumListStatusCalled .get(src.getCanonicalPath).map(_.get()).getOrElse(0)) + // Should be 5 files in the batch since cached files are ignored + assert(files.length == 5) } } } From a7e019802e18c1819ad5f0b3056c96feccc32448 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Tue, 21 May 2024 11:13:14 -0400 Subject: [PATCH 5/6] Remove extra newline --- .../apache/spark/sql/execution/streaming/FileStreamSource.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 2e580bf2cbb1..05e7dcfe8ac3 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -134,7 +134,6 @@ class FileStreamSource( log"maxBytesPerBatch = ${MDC(LogKeys.NUM_BYTES, maxBytesPerBatch)}, " + log"maxFileAgeMs = ${MDC(LogKeys.TIME_UNITS, maxFileAgeMs)}") - private var unreadFiles: Seq[NewFileEntry] = _ /** From 4c33fef9fe88be8cc8f103d50fb849d6671ac182 Mon Sep 17 00:00:00 2001 From: Adam Binford Date: Tue, 21 May 2024 11:14:05 -0400 Subject: [PATCH 6/6] Remove comment to go along with undo-ing visibility change --- .../apache/spark/sql/execution/streaming/FileStreamSource.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 05e7dcfe8ac3..4a9b2d11b7e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -440,7 +440,7 @@ object FileStreamSource { def sparkPath: SparkPath = SparkPath.fromUrlString(path) } - /** Newly fetched files metadata holder. Visible for testing. */ + /** Newly fetched files metadata holder. */ private case class NewFileEntry(path: SparkPath, size: Long, timestamp: Long) private case class FilesSplit(files: Seq[NewFileEntry], size: BigInt)
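
Note (illustrative sketch, not part of the patches above): the code paths changed in this series only matter when a file stream source runs under Trigger.AvailableNow together with a per-batch read limit such as maxFilesPerTrigger. In that mode the source snapshots the full file listing up front (allFilesForTriggerAvailableNow), so holding leftover listings in unreadFiles between batches is redundant work, which is what the shouldCache guard introduced in this series skips. A minimal Scala sketch of such a query follows; the paths, source format, and option values are placeholders chosen for illustration, not taken from the patches.

    // Drive a file source with Trigger.AvailableNow and a per-batch file cap.
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger

    object AvailableNowExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("available-now-file-source")
          .getOrCreate()

        // maxFilesPerTrigger maps to the ReadMaxFiles limit handled in FileStreamSource.
        val lines = spark.readStream
          .format("text")
          .option("maxFilesPerTrigger", "5")
          .load("/tmp/streaming-input")

        // AvailableNow processes everything currently in the directory, split into
        // maxFilesPerTrigger-sized micro-batches, and then stops the query.
        val query = lines.writeStream
          .format("console")
          .option("checkpointLocation", "/tmp/streaming-checkpoint")
          .trigger(Trigger.AvailableNow())
          .start()

        query.awaitTermination()
      }
    }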