From b417911356356d35abbad768bf583b55a36d25cf Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Tue, 18 Feb 2020 13:56:38 +0900
Subject: [PATCH 1/5] [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files

---
 .../streaming/FileStreamSource.scala          |  35 +++-
 .../sql/streaming/FileStreamSourceSuite.scala | 149 ++++++++++++++++--
 2 files changed, 168 insertions(+), 16 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index e8ce8e148709..1c095b19b87f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -111,6 +111,8 @@ class FileStreamSource(

   logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")

+  private var unreadFiles: Seq[(String, Long)] = _
+
   /**
    * Returns the maximum offset that can be retrieved from the source.
    *
@@ -118,15 +120,35 @@ class FileStreamSource(
    * there is no race here, so the cost of `synchronized` should be rare.
    */
   private def fetchMaxOffset(limit: ReadLimit): FileStreamSourceOffset = synchronized {
-    // All the new files found - ignore aged files and files that we have seen.
-    val newFiles = fetchAllFiles().filter {
-      case (path, timestamp) => seenFiles.isNewFile(path, timestamp)
+    val newFiles = if (unreadFiles != null) {
+      logDebug(s"Reading from unread files - ${unreadFiles.size} files are available.")
+      unreadFiles
+    } else {
+      // All the new files found - ignore aged files and files that we have seen.
+      fetchAllFiles().filter {
+        case (path, timestamp) => seenFiles.isNewFile(path, timestamp)
+      }
     }

     // Obey user's setting to limit the number of files in this batch trigger.
-    val batchFiles = limit match {
-      case files: ReadMaxFiles => newFiles.take(files.maxFiles())
-      case _: ReadAllAvailable => newFiles
+    val (batchFiles, unselectedFiles) = limit match {
+      case files: ReadMaxFiles if !sourceOptions.latestFirst =>
+        // we can cache and reuse remaining fetched list of files in further batches
+        newFiles.splitAt(files.maxFiles())
+
+      case files: ReadMaxFiles =>
+        // implies "sourceOptions.latestFirst = true" which we want to refresh the list per batch
+        (newFiles.take(files.maxFiles()), null)
+
+      case _: ReadAllAvailable => (newFiles, null)
+    }
+
+    if (unselectedFiles != null && unselectedFiles.nonEmpty) {
+      unreadFiles = unselectedFiles
+      logTrace(s"${unreadFiles.size} unread files are available for further batches.")
+    } else {
+      unreadFiles = null
+      logTrace(s"No unread file is available for further batches.")
     }

     batchFiles.foreach { file =>
@@ -139,6 +161,7 @@ class FileStreamSource(
       s"""
          |Number of new files = ${newFiles.size}
          |Number of files selected for batch = ${batchFiles.size}
+         |Number of unread files = ${Option(unreadFiles).map(_.size).getOrElse(0)}
          |Number of seen files = ${seenFiles.size}
          |Number of files purged from tracking map = $numPurged
        """.stripMargin)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index fa320333143e..a7f42c95846b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.streaming

 import java.io.File
 import java.net.URI
+import java.util.concurrent.atomic.AtomicLong

 import scala.collection.mutable
 import scala.util.Random
@@ -32,11 +33,11 @@ import org.scalatest.time.SpanSugar._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.connector.read.streaming.ReadLimit
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.FileStreamSource.{FileEntry, SeenFilesMap, SourceFileArchiver}
 import org.apache.spark.sql.execution.streaming.sources.MemorySink
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.streaming.ExistsThrowsExceptionFileSystem._
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -997,15 +998,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
   }

   test("when schema inference is turned on, should read partition data") {
-    def createFile(content: String, src: File, tmp: File): Unit = {
-      val tempFile = Utils.tempFileWith(new File(tmp, "text"))
-      val finalFile = new File(src, tempFile.getName)
-      require(!src.exists(), s"$src exists, dir: ${src.isDirectory}, file: ${src.isFile}")
-      require(src.mkdirs(), s"Cannot create $src")
-      require(src.isDirectory(), s"$src is not a directory")
-      require(stringToFile(tempFile, content).renameTo(finalFile))
-    }
-
     withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
       withTempDirs { case (dir, tmp) =>
         val partitionFooSubDir = new File(dir, "partition=foo")
@@ -1602,6 +1594,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
   }

   test("do not recheck that files exist during getBatch") {
+    val scheme = ExistsThrowsExceptionFileSystem.scheme
     withTempDir { temp =>
       spark.conf.set(
         s"fs.$scheme.impl",
@@ -1935,6 +1928,120 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     assert(expectedDir.exists())
     assert(expectedDir.list().exists(_.startsWith(filePrefix)))
   }
+
+  private def withCountListingLocalFileSystemAsLocalFileSystem(body: => Unit): Unit = {
+    val optionKey = s"fs.${CountListingLocalFileSystem.scheme}.impl"
+    val originClassForLocalFileSystem = spark.conf.getOption(optionKey)
+    try {
+      spark.conf.set(optionKey, classOf[CountListingLocalFileSystem].getName)
+      body
+    } finally {
+      originClassForLocalFileSystem match {
+        case Some(fsClazz) => spark.conf.set(optionKey, fsClazz)
+        case _ => spark.conf.unset(optionKey)
+      }
+    }
+  }
+
+  test("Caches and leverages unread files") {
+    withCountListingLocalFileSystemAsLocalFileSystem {
+      withThreeTempDirs { case (src, meta, tmp) =>
+        val options = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "5")
+        val scheme = CountListingLocalFileSystem.scheme
+        val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text",
+          StructType(Nil), Seq.empty, meta.getCanonicalPath, options)
+        val _metadataLog = PrivateMethod[FileStreamSourceLog](Symbol("metadataLog"))
+        val metadataLog = source invokePrivate _metadataLog()
+
+        def verifyBatch(
+            offset: FileStreamSourceOffset,
+            expectedBatchId: Long,
+            inputFiles: Seq[File],
+            expectedListingCount: Int): Unit = {
+          val batchId = offset.logOffset
+          assert(batchId === expectedBatchId)
+
+          val files = metadataLog.get(batchId).getOrElse(Array.empty[FileEntry])
+          assert(files.forall(_.batchId == batchId))
+
+          val actualInputFiles = files.map { p => new Path(p.path).toUri.getPath }
+          val expectedInputFiles = inputFiles.slice(batchId.toInt * 5, batchId.toInt * 5 + 5)
+            .map(_.getCanonicalPath)
+          assert(actualInputFiles === expectedInputFiles)
+
+          assert(expectedListingCount === CountListingLocalFileSystem.pathToNumListStatusCalled
+            .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+        }
+
+        // provide 20 files in src, with sequential "last modified" to guarantee ordering
+        var lastModified = 0
+        val inputFiles = (0 to 19).map { idx =>
+          val f = createFile(idx.toString, new File(src, idx.toString), tmp)
+          f.setLastModified(lastModified)
+          lastModified += 10000
+          f
+        }
+
+        // 4 batches will be available for 20 input files
+        (0 to 3).foreach { batchId =>
+          val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
+            .asInstanceOf[FileStreamSourceOffset]
+          verifyBatch(offsetBatch, expectedBatchId = batchId, inputFiles, expectedListingCount = 1)
+        }
+
+        val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
+          .asInstanceOf[FileStreamSourceOffset]
+        // latestOffset returns the offset for previous batch which means no new batch is presented
+        assert(3 === offsetBatch.logOffset)
+        // listing should be performed after the list of unread files are exhausted
+        assert(2 === CountListingLocalFileSystem.pathToNumListStatusCalled
+          .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+      }
+    }
+  }
+
+  test("Don't cache unread files when latestFirst is true") {
+    withCountListingLocalFileSystemAsLocalFileSystem {
+      withThreeTempDirs { case (src, meta, tmp) =>
+        val options = Map("latestFirst" -> "true", "maxFilesPerTrigger" -> "5")
+        val scheme = CountListingLocalFileSystem.scheme
+        val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text",
+          StructType(Nil), Seq.empty, meta.getCanonicalPath, options)
+
+        // provide 20 files in src, with sequential "last modified" to guarantee ordering
+        var lastModified = 0
+        (0 to 19).map { idx =>
+          val f = createFile(idx.toString, new File(src, idx.toString), tmp)
+          f.setLastModified(lastModified)
+          lastModified += 10000
+          f
+        }
+
+        source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
+          .asInstanceOf[FileStreamSourceOffset]
+        assert(1 === CountListingLocalFileSystem.pathToNumListStatusCalled
+          .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+
+        // Even though the first batch doesn't read all available files, since latestFirst is true,
+        // file stream source will not leverage unread files - next batch will also trigger
+        // listing files
+        source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
+          .asInstanceOf[FileStreamSourceOffset]
+        assert(2 === CountListingLocalFileSystem.pathToNumListStatusCalled
+          .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+      }
+    }
+  }
+
+  private def createFile(content: String, src: File, tmp: File): File = {
+    val tempFile = Utils.tempFileWith(new File(tmp, "text"))
+    val finalFile = new File(src, tempFile.getName)
+    require(!src.exists(), s"$src exists, dir: ${src.isDirectory}, file: ${src.isFile}")
+    require(src.mkdirs(), s"Cannot create $src")
+    require(src.isDirectory(), s"$src is not a directory")
+    require(stringToFile(tempFile, content).renameTo(finalFile))
+    finalFile
+  }
 }

 class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
@@ -1961,6 +2068,8 @@ class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
  * `DataSource.resolveRelation`.
  */
 class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem {
+  import ExistsThrowsExceptionFileSystem._
+
   override def getUri: URI = {
     URI.create(s"$scheme:///")
   }
@@ -1980,3 +2089,23 @@ class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem {
 object ExistsThrowsExceptionFileSystem {
   val scheme = s"FileStreamSourceSuite${math.abs(Random.nextInt)}fs"
 }
+
+class CountListingLocalFileSystem extends RawLocalFileSystem {
+  import CountListingLocalFileSystem._
+
+  override def getUri: URI = {
+    URI.create(s"$scheme:///")
+  }
+
+  override def listStatus(f: Path): Array[FileStatus] = {
+    val path = f.toUri.getPath
+    val curVal = pathToNumListStatusCalled.getOrElseUpdate(path, new AtomicLong(0))
+    curVal.incrementAndGet()
+    super.listStatus(f)
+  }
+}
+
+object CountListingLocalFileSystem {
+  val scheme = s"FileStreamSourceSuite${math.abs(Random.nextInt)}fs"
+  val pathToNumListStatusCalled = new mutable.HashMap[String, AtomicLong]
+}

From 07eed68a03895ac677a740360e2eb0996ab697f6 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Thu, 16 Apr 2020 12:45:55 +0900
Subject: [PATCH 2/5] Reflect review comments

---
 .../sql/streaming/FileStreamSourceSuite.scala | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index a7f42c95846b..decb7160b687 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1973,12 +1973,12 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
             .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
         }

+        CountListingLocalFileSystem.resetCount()
+
         // provide 20 files in src, with sequential "last modified" to guarantee ordering
-        var lastModified = 0
         val inputFiles = (0 to 19).map { idx =>
           val f = createFile(idx.toString, new File(src, idx.toString), tmp)
-          f.setLastModified(lastModified)
-          lastModified += 10000
+          f.setLastModified(idx * 10000)
           f
         }

@@ -2008,12 +2008,12 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text",
           StructType(Nil), Seq.empty, meta.getCanonicalPath, options)

+        CountListingLocalFileSystem.resetCount()
+
         // provide 20 files in src, with sequential "last modified" to guarantee ordering
-        var lastModified = 0
         (0 to 19).map { idx =>
           val f = createFile(idx.toString, new File(src, idx.toString), tmp)
-          f.setLastModified(lastModified)
-          lastModified += 10000
+          f.setLastModified(idx * 10000)
           f
         }

@@ -2098,14 +2098,15 @@ class CountListingLocalFileSystem extends RawLocalFileSystem {
   }

   override def listStatus(f: Path): Array[FileStatus] = {
-    val path = f.toUri.getPath
-    val curVal = pathToNumListStatusCalled.getOrElseUpdate(path, new AtomicLong(0))
+    val curVal = pathToNumListStatusCalled.getOrElseUpdate(f.toUri.getPath, new AtomicLong(0))
     curVal.incrementAndGet()
     super.listStatus(f)
   }
 }

 object CountListingLocalFileSystem {
-  val scheme = s"FileStreamSourceSuite${math.abs(Random.nextInt)}fs"
+  val scheme = s"CountListingLocalFileSystem${math.abs(Random.nextInt)}fs"
   val pathToNumListStatusCalled = new mutable.HashMap[String, AtomicLong]
+
+  def resetCount(): Unit = pathToNumListStatusCalled.clear()
 }

From 57981cd45eed8cc16389468dc790fd27bde18f7d Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Fri, 17 Apr 2020 13:41:15 +0900
Subject: [PATCH 3/5] Reflect review comment

---
 .../org/apache/spark/sql/streaming/FileStreamSourceSuite.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index decb7160b687..061a9aa16b69 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -2019,6 +2019,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {

         source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
           .asInstanceOf[FileStreamSourceOffset]
+        assert(1 === CountListingLocalFileSystem.pathToNumListStatusCalled.size)
         assert(1 === CountListingLocalFileSystem.pathToNumListStatusCalled
           .get(src.getCanonicalPath).map(_.get()).getOrElse(0))

@@ -2027,6 +2028,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         // listing files
         source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
           .asInstanceOf[FileStreamSourceOffset]
+        assert(1 === CountListingLocalFileSystem.pathToNumListStatusCalled.size)
         assert(2 === CountListingLocalFileSystem.pathToNumListStatusCalled
           .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
       }

From 8251b744d40f4f8744df53d68842894489808c2b Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Sat, 18 Apr 2020 00:01:23 +0900
Subject: [PATCH 4/5] Add condition to discard unseen files

---
 .../streaming/FileStreamSource.scala          | 12 +++++++-
 .../sql/streaming/FileStreamSourceSuite.scala | 29 ++++++++++++-------
 2 files changed, 29 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 1c095b19b87f..c2da80e30e66 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -134,7 +134,15 @@ class FileStreamSource(
     val (batchFiles, unselectedFiles) = limit match {
       case files: ReadMaxFiles if !sourceOptions.latestFirst =>
         // we can cache and reuse remaining fetched list of files in further batches
-        newFiles.splitAt(files.maxFiles())
+        val (bFiles, usFiles) = newFiles.splitAt(files.maxFiles())
+        if (usFiles.size < files.maxFiles() * DISCARD_UNSEEN_FILES_RATIO) {
+          // Discard unselected files if the number of files are smaller than threshold.
+          // This is to avoid the case when the next batch would have too few files to read
+          // whereas there're new files available.
+          (bFiles, null)
+        } else {
+          (bFiles, usFiles)
+        }

       case files: ReadMaxFiles =>
         // implies "sourceOptions.latestFirst = true" which we want to refresh the list per batch
@@ -334,6 +342,8 @@ object FileStreamSource {
   /** Timestamp for file modification time, in ms since January 1, 1970 UTC. */
   type Timestamp = Long

+  val DISCARD_UNSEEN_FILES_RATIO = 0.2
+
   case class FileEntry(path: String, timestamp: Timestamp, batchId: Long) extends Serializable

   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 061a9aa16b69..27cf8235ff21 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1946,7 +1946,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
   test("Caches and leverages unread files") {
     withCountListingLocalFileSystemAsLocalFileSystem {
       withThreeTempDirs { case (src, meta, tmp) =>
-        val options = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "5")
+        val options = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "10")
         val scheme = CountListingLocalFileSystem.scheme
         val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text",
           StructType(Nil), Seq.empty, meta.getCanonicalPath, options)
@@ -1965,7 +1965,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
           assert(files.forall(_.batchId == batchId))

           val actualInputFiles = files.map { p => new Path(p.path).toUri.getPath }
-          val expectedInputFiles = inputFiles.slice(batchId.toInt * 5, batchId.toInt * 5 + 5)
+          val expectedInputFiles = inputFiles.slice(batchId.toInt * 10, batchId.toInt * 10 + 10)
             .map(_.getCanonicalPath)
           assert(actualInputFiles === expectedInputFiles)

@@ -1975,26 +1975,35 @@ class FileStreamSourceSuite extends FileStreamSourceTest {

         CountListingLocalFileSystem.resetCount()

-        // provide 20 files in src, with sequential "last modified" to guarantee ordering
-        val inputFiles = (0 to 19).map { idx =>
+        // provide 41 files in src, with sequential "last modified" to guarantee ordering
+        val inputFiles = (0 to 40).map { idx =>
           val f = createFile(idx.toString, new File(src, idx.toString), tmp)
           f.setLastModified(idx * 10000)
           f
         }

-        // 4 batches will be available for 20 input files
+        // 4 batches will be available for 40 input files
         (0 to 3).foreach { batchId =>
-          val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
+          val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
             .asInstanceOf[FileStreamSourceOffset]
           verifyBatch(offsetBatch, expectedBatchId = batchId, inputFiles, expectedListingCount = 1)
         }

-        val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
+        // batch 5 will trigger list operation though the batch 4 should have 1 unseen file:
+        // 1 is smaller than the threshold (refer FileStreamSource.DISCARD_UNSEEN_FILES_RATIO),
+        // hence unseen files for batch 4 will be discarded.
+        val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
+          .asInstanceOf[FileStreamSourceOffset]
+        assert(4 === offsetBatch.logOffset)
+        assert(2 === CountListingLocalFileSystem.pathToNumListStatusCalled
+          .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+
+        val offsetBatch2 = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
           .asInstanceOf[FileStreamSourceOffset]
         // latestOffset returns the offset for previous batch which means no new batch is presented
-        assert(3 === offsetBatch.logOffset)
+        assert(4 === offsetBatch2.logOffset)
         // listing should be performed after the list of unread files are exhausted
-        assert(2 === CountListingLocalFileSystem.pathToNumListStatusCalled
+        assert(3 === CountListingLocalFileSystem.pathToNumListStatusCalled
           .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
       }
     }
@@ -2019,7 +2028,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest {

         source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
           .asInstanceOf[FileStreamSourceOffset]
-        assert(1 === CountListingLocalFileSystem.pathToNumListStatusCalled.size)
         assert(1 === CountListingLocalFileSystem.pathToNumListStatusCalled
           .get(src.getCanonicalPath).map(_.get()).getOrElse(0))

@@ -2028,7 +2036,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         // listing files
         source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
           .asInstanceOf[FileStreamSourceOffset]
-        assert(1 === CountListingLocalFileSystem.pathToNumListStatusCalled.size)
         assert(2 === CountListingLocalFileSystem.pathToNumListStatusCalled
           .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
       }

From 0e972fc20fb1a77fee90200b26a824bb19a91879 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Mon, 29 Jun 2020 15:40:51 +0900
Subject: [PATCH 5/5] Set upper bound of caching (static value for now)

---
 .../spark/sql/execution/streaming/FileStreamSource.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index c2da80e30e66..77a6b8b41472 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -139,6 +139,7 @@ class FileStreamSource(
           // Discard unselected files if the number of files are smaller than threshold.
           // This is to avoid the case when the next batch would have too few files to read
          // whereas there're new files available.
+          logTrace(s"Discarding ${usFiles.length} unread files as it's smaller than threshold.")
           (bFiles, null)
         } else {
           (bFiles, usFiles)
@@ -152,7 +153,8 @@ class FileStreamSource(
     }

     if (unselectedFiles != null && unselectedFiles.nonEmpty) {
-      unreadFiles = unselectedFiles
+      logTrace(s"Taking first $MAX_CACHED_UNSEEN_FILES unread files.")
+      unreadFiles = unselectedFiles.take(MAX_CACHED_UNSEEN_FILES)
       logTrace(s"${unreadFiles.size} unread files are available for further batches.")
     } else {
       unreadFiles = null
@@ -343,6 +345,7 @@ object FileStreamSource {
   type Timestamp = Long

   val DISCARD_UNSEEN_FILES_RATIO = 0.2
+  val MAX_CACHED_UNSEEN_FILES = 10000

   case class FileEntry(path: String, timestamp: Timestamp, batchId: Long) extends Serializable