From f9dc1a4f3f12f04f216352d543727ceeac3015c8 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Fri, 22 Nov 2019 11:25:58 +0900 Subject: [PATCH 1/5] [SPARK-29953][SQL] Don't clean up source files for FileStreamSource if the source path refers to the output dir of FileStreamSink --- .../structured-streaming-programming-guide.md | 2 +- .../streaming/FileStreamSource.scala | 22 ++++- .../sql/streaming/FileStreamSourceSuite.scala | 97 +++++++++++++++---- 3 files changed, 99 insertions(+), 22 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 01679e5defe1..b91b93066d25 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -551,7 +551,7 @@ Here are the details of all the sources in Spark. When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must have 2 subdirectories (so depth of directory is greater than 2). e.g. /archived/here. This will ensure archived files are never included as new source files.
Spark will move source files into the archive directory while preserving their own path. For example, if the path of the source file is /a/b/dataset.txt and the path of the archive directory is /archived/here, the file will be moved to /archived/here/a/b/dataset.txt.
NOTE: Both archiving (via moving) and deleting completed files introduce overhead (a slowdown) in each micro-batch, so you need to understand the cost of each operation in your file system before enabling this option. On the other hand, enabling this option reduces the cost of listing source files, which can be an expensive operation.
- NOTE 2: The source path should not be used from multiple sources or queries when enabling this option.
+ NOTE 2: The source path should not be used from multiple sources or queries when enabling this option. Similarly, you must ensure the source path doesn't match any files in the output directory of a file stream sink.
NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query.
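As a usage illustration of the notes above, here is a minimal sketch of a file stream source configured with the "archive" clean-up mode. The paths, the text/parquet formats, and the checkpoint location are hypothetical; per NOTE 2, the source path is kept disjoint from both the archive directory and the sink's output directory.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("clean-source-sketch").getOrCreate()

// Watch /data/in; completed files are archived under /archived/here, which (per the
// note above) is deep enough that archived files are never listed as new source files.
val input = spark.readStream
  .format("text")
  .option("maxFilesPerTrigger", "1")
  .option("cleanSource", "archive")              // alternatives: "delete", "off" (default)
  .option("sourceArchiveDir", "/archived/here")  // required when cleanSource is "archive"
  .load("/data/in")

// Write to a separate output directory: per NOTE 2, the source path must not
// match any files in the output directory of a file stream sink.
val query = input.writeStream
  .format("parquet")
  .option("checkpointLocation", "/data/checkpoint")
  .start("/data/out")

query.awaitTermination()
```

Archiving (or deleting) happens after each micro-batch commits, which is why the patch below touches FileStreamSource.commit().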

For file-format-specific options, see the related methods in DataStreamReader diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 35d486c7c743..1c4c6aaf9024 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -259,6 +259,8 @@ class FileStreamSource( override def toString: String = s"FileStreamSource[$qualifiedBasePath]" + private var warnedIgnoringCleanSourceOption: Boolean = false + /** * Informs the source that Spark has completed processing all data for offsets less than or * equal to `end` and will only request offsets greater than `end` in the future. @@ -267,10 +269,22 @@ class FileStreamSource( val logOffset = FileStreamSourceOffset(end).logOffset sourceCleaner.foreach { cleaner => - val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2) - val validFileEntities = files.filter(_.batchId == logOffset) - logDebug(s"completed file entries: ${validFileEntities.mkString(",")}") - validFileEntities.foreach(cleaner.clean) + sourceHasMetadata match { + case Some(true) if !warnedIgnoringCleanSourceOption => + logWarning("Ignoring 'cleanSource' option since source path refers to the output" + + " directory of FileStreamSink.") + warnedIgnoringCleanSourceOption = true + + case Some(false) => + val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2) + val validFileEntities = files.filter(_.batchId == logOffset) + logDebug(s"completed file entries: ${validFileEntities.mkString(",")}") + validFileEntities.foreach(cleaner.clean) + + case _ => + logWarning("Ignoring 'cleanSource' option since Spark hasn't figured out whether " + + "source path refers to the output directory of FileStreamSink or not.") + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 1ef0ae878ec0..0acb1d59e83c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -29,6 +29,7 @@ import org.apache.hadoop.util.Progressable import org.scalatest.PrivateMethodTester import org.scalatest.time.SpanSugar._ +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.streaming._ @@ -149,6 +150,20 @@ abstract class FileStreamSourceTest } } + case class AddFilesToFileStreamSinkLog( + fs: FileSystem, + srcDir: Path, + sinkLog: FileStreamSinkLog, + batchId: Int)( + pathFilter: Path => Boolean) extends ExternalAction { + override def runAction(): Unit = { + val statuses = fs.listStatus(srcDir, new PathFilter { + override def accept(path: Path): Boolean = pathFilter(path) + }) + sinkLog.add(batchId, statuses.map { s => SinkFileStatus(s) }) + } + } + /** Use `format` and `path` to create FileStreamSource via DataFrameReader */ def createFileStream( format: String, @@ -1617,14 +1632,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } test("remove completed files when remove option is enabled") { - def assertFileIsRemoved(files: Array[String], fileName: String): Unit = { - assert(!files.exists(_.startsWith(fileName))) - } - - def 
assertFileIsNotRemoved(files: Array[String], fileName: String): Unit = { - assert(files.exists(_.startsWith(fileName))) - } - withTempDirs { case (src, tmp) => withSQLConf( SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2", @@ -1642,28 +1649,24 @@ class FileStreamSourceSuite extends FileStreamSourceTest { CheckAnswer("keep1"), AssertOnQuery("input file removed") { _: StreamExecution => // it doesn't rename any file yet - assertFileIsNotRemoved(src.list(), "keep1") + assertFileIsNotRemoved(src, "keep1") true }, AddTextFileData("keep2", src, tmp, tmpFilePrefix = "ke ep2 %"), CheckAnswer("keep1", "keep2"), AssertOnQuery("input file removed") { _: StreamExecution => - val files = src.list() - // it renames input file for first batch, but not for second batch yet - assertFileIsRemoved(files, "keep1") - assertFileIsNotRemoved(files, "ke ep2 %") + assertFileIsRemoved(src, "keep1") + assertFileIsNotRemoved(src, "ke ep2 %") true }, AddTextFileData("keep3", src, tmp, tmpFilePrefix = "keep3"), CheckAnswer("keep1", "keep2", "keep3"), AssertOnQuery("input file renamed") { _: StreamExecution => - val files = src.list() - // it renames input file for second batch, but not third batch yet - assertFileIsRemoved(files, "ke ep2 %") - assertFileIsNotRemoved(files, "keep3") + assertFileIsRemoved(src, "ke ep2 %") + assertFileIsNotRemoved(src, "keep3") true } @@ -1739,6 +1742,58 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } + Seq("delete", "archive").foreach { cleanOption => + test(s"skip $cleanOption when source path refers the output dir of FileStreamSink") { + withThreeTempDirs { case (src, tmp, archiveDir) => + withSQLConf( + SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2", + // Force deleting the old logs + SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1" + ) { + val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1", + "cleanSource" -> cleanOption, "sourceArchiveDir" -> archiveDir.getAbsolutePath) + + val fileStream = createFileStream("text", src.getCanonicalPath, options = option) + val filtered = fileStream.filter($"value" contains "keep") + + // create FileStreamSinkLog under source directory + val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, + new File(src, FileStreamSink.metadataDir).getCanonicalPath) + val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf) + val srcPath = new Path(src.getCanonicalPath) + val fileSystem = srcPath.getFileSystem(hadoopConf) + + // Here we will just check whether the source file is removed or not, as we cover + // functionality test of "archive" in other UT. 
+ testStream(filtered)( + AddTextFileData("keep1", src, tmp, tmpFilePrefix = "keep1"), + AddFilesToFileStreamSinkLog(fileSystem, srcPath, sinkLog, 0) { path => + path.getName.startsWith("keep1") + }, + CheckAnswer("keep1"), + AssertOnQuery("input file removed") { _: StreamExecution => + // it doesn't remove any files for recent batch yet + assertFileIsNotRemoved(src, "keep1") + true + }, + AddTextFileData("keep2", src, tmp, tmpFilePrefix = "ke ep2 %"), + AddFilesToFileStreamSinkLog(fileSystem, srcPath, sinkLog, 1) { path => + path.getName.startsWith("ke ep2 %") + }, + CheckAnswer("keep1", "keep2"), + AssertOnQuery("input file removed") { _: StreamExecution => + // it doesn't remove any file in src since it's the output dir of FileStreamSink + assertFileIsNotRemoved(src, "keep1") + // it doesn't remove any files for recent batch yet + assertFileIsNotRemoved(src, "ke ep2 %") + true + } + ) + } + } + } + } + class FakeFileSystem(scheme: String) extends FileSystem { override def exists(f: Path): Boolean = true @@ -1797,6 +1852,14 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } + private def assertFileIsRemoved(sourceDir: File, fileName: String): Unit = { + assert(!sourceDir.list().exists(_.startsWith(fileName))) + } + + private def assertFileIsNotRemoved(sourceDir: File, fileName: String): Unit = { + assert(sourceDir.list().exists(_.startsWith(fileName))) + } + private def assertFileIsNotMoved(sourceDir: File, expectedDir: File, filePrefix: String): Unit = { assert(sourceDir.exists()) assert(sourceDir.list().exists(_.startsWith(filePrefix))) From 8d6d08b0224a6e98fd96099c52bfed33d6afb733 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Fri, 22 Nov 2019 11:35:29 +0900 Subject: [PATCH 2/5] Refine a bit --- .../org/apache/spark/sql/streaming/FileStreamSourceSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 0acb1d59e83c..55c1d8d02260 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -160,7 +160,7 @@ abstract class FileStreamSourceTest val statuses = fs.listStatus(srcDir, new PathFilter { override def accept(path: Path): Boolean = pathFilter(path) }) - sinkLog.add(batchId, statuses.map { s => SinkFileStatus(s) }) + sinkLog.add(batchId, statuses.map(SinkFileStatus(_))) } } From d1ec200bf20c877e78e756350ef4540c8d4ed36f Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Mon, 25 Nov 2019 23:12:12 +0900 Subject: [PATCH 3/5] Fix silly mistake --- .../sql/execution/streaming/FileStreamSource.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 1c4c6aaf9024..cc340767e3b4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -270,10 +270,12 @@ class FileStreamSource( sourceCleaner.foreach { cleaner => sourceHasMetadata match { - case Some(true) if !warnedIgnoringCleanSourceOption => - logWarning("Ignoring 'cleanSource' option since source path refers to the output" + - " 
directory of FileStreamSink.") - warnedIgnoringCleanSourceOption = true + case Some(true) => + if (!warnedIgnoringCleanSourceOption) { + logWarning("Ignoring 'cleanSource' option since source path refers to the output" + + " directory of FileStreamSink.") + warnedIgnoringCleanSourceOption = true + } case Some(false) => val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2) From d7ded9374656516f21cbfae3957ad813b2e80ddb Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Tue, 3 Dec 2019 08:28:31 +0900 Subject: [PATCH 4/5] Reflect review comment --- .../streaming/FileStreamSource.scala | 34 +++++++------------ .../sql/streaming/FileStreamSourceSuite.scala | 24 +++---------- 2 files changed, 18 insertions(+), 40 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index cc340767e3b4..fcbfc8950cc1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -206,6 +206,13 @@ class FileStreamSource( CaseInsensitiveMap(options), None).allFiles() } + private def assertCleanupIsNotSpecified(): Unit = { + if (sourceCleaner.isDefined) { + throw new UnsupportedOperationException("Clean up source files is not supported when" + + " reading from the output directory of FileStreamSink.") + } + } + /** * Returns a list of files found, sorted by their timestamp. */ @@ -217,6 +224,7 @@ class FileStreamSource( case None => if (FileStreamSink.hasMetadata(Seq(path), hadoopConf, sparkSession.sessionState.conf)) { sourceHasMetadata = Some(true) + assertCleanupIsNotSpecified() allFiles = allFilesUsingMetadataLogFileIndex() } else { allFiles = allFilesUsingInMemoryFileIndex() @@ -229,6 +237,7 @@ class FileStreamSource( // `FileStreamSink.hasMetadata` check if (FileStreamSink.hasMetadata(Seq(path), hadoopConf, sparkSession.sessionState.conf)) { sourceHasMetadata = Some(true) + assertCleanupIsNotSpecified() allFiles = allFilesUsingMetadataLogFileIndex() } else { sourceHasMetadata = Some(false) @@ -259,34 +268,17 @@ class FileStreamSource( override def toString: String = s"FileStreamSource[$qualifiedBasePath]" - private var warnedIgnoringCleanSourceOption: Boolean = false - /** * Informs the source that Spark has completed processing all data for offsets less than or * equal to `end` and will only request offsets greater than `end` in the future. 
*/ override def commit(end: Offset): Unit = { val logOffset = FileStreamSourceOffset(end).logOffset - sourceCleaner.foreach { cleaner => - sourceHasMetadata match { - case Some(true) => - if (!warnedIgnoringCleanSourceOption) { - logWarning("Ignoring 'cleanSource' option since source path refers to the output" + - " directory of FileStreamSink.") - warnedIgnoringCleanSourceOption = true - } - - case Some(false) => - val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2) - val validFileEntities = files.filter(_.batchId == logOffset) - logDebug(s"completed file entries: ${validFileEntities.mkString(",")}") - validFileEntities.foreach(cleaner.clean) - - case _ => - logWarning("Ignoring 'cleanSource' option since Spark hasn't figured out whether " + - "source path refers to the output directory of FileStreamSink or not.") - } + val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2) + val validFileEntities = files.filter(_.batchId == logOffset) + logDebug(s"completed file entries: ${validFileEntities.mkString(",")}") + validFileEntities.foreach(cleaner.clean) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 55c1d8d02260..b8dac13b3842 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -1743,7 +1743,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } Seq("delete", "archive").foreach { cleanOption => - test(s"skip $cleanOption when source path refers the output dir of FileStreamSink") { + test(s"Throw UnsupportedOperationException on configuring $cleanOption when source path" + + " refers the output dir of FileStreamSink") { withThreeTempDirs { case (src, tmp, archiveDir) => withSQLConf( SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2", @@ -1770,24 +1771,9 @@ class FileStreamSourceSuite extends FileStreamSourceTest { AddFilesToFileStreamSinkLog(fileSystem, srcPath, sinkLog, 0) { path => path.getName.startsWith("keep1") }, - CheckAnswer("keep1"), - AssertOnQuery("input file removed") { _: StreamExecution => - // it doesn't remove any files for recent batch yet - assertFileIsNotRemoved(src, "keep1") - true - }, - AddTextFileData("keep2", src, tmp, tmpFilePrefix = "ke ep2 %"), - AddFilesToFileStreamSinkLog(fileSystem, srcPath, sinkLog, 1) { path => - path.getName.startsWith("ke ep2 %") - }, - CheckAnswer("keep1", "keep2"), - AssertOnQuery("input file removed") { _: StreamExecution => - // it doesn't remove any file in src since it's the output dir of FileStreamSink - assertFileIsNotRemoved(src, "keep1") - // it doesn't remove any files for recent batch yet - assertFileIsNotRemoved(src, "ke ep2 %") - true - } + ExpectFailure[UnsupportedOperationException]( + t => assert(t.getMessage.startsWith("Clean up source files is not supported")), + isFatalError = false) ) } } From fcdb9e8a5a78071f4b7d3be285a7647300ba66b6 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Tue, 3 Dec 2019 08:36:40 +0900 Subject: [PATCH 5/5] Refine a bit --- .../streaming/FileStreamSource.scala | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 
fcbfc8950cc1..f31fb32634a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -206,11 +206,15 @@ class FileStreamSource( CaseInsensitiveMap(options), None).allFiles() } - private def assertCleanupIsNotSpecified(): Unit = { - if (sourceCleaner.isDefined) { - throw new UnsupportedOperationException("Clean up source files is not supported when" + - " reading from the output directory of FileStreamSink.") - } + private def setSourceHasMetadata(newValue: Option[Boolean]): Unit = newValue match { + case Some(true) => + if (sourceCleaner.isDefined) { + throw new UnsupportedOperationException("Clean up source files is not supported when" + + " reading from the output directory of FileStreamSink.") + } + sourceHasMetadata = Some(true) + case _ => + sourceHasMetadata = newValue } /** @@ -223,8 +227,7 @@ class FileStreamSource( sourceHasMetadata match { case None => if (FileStreamSink.hasMetadata(Seq(path), hadoopConf, sparkSession.sessionState.conf)) { - sourceHasMetadata = Some(true) - assertCleanupIsNotSpecified() + setSourceHasMetadata(Some(true)) allFiles = allFilesUsingMetadataLogFileIndex() } else { allFiles = allFilesUsingInMemoryFileIndex() @@ -236,11 +239,10 @@ class FileStreamSource( // metadata log and data files are only generated after the previous // `FileStreamSink.hasMetadata` check if (FileStreamSink.hasMetadata(Seq(path), hadoopConf, sparkSession.sessionState.conf)) { - sourceHasMetadata = Some(true) - assertCleanupIsNotSpecified() + setSourceHasMetadata(Some(true)) allFiles = allFilesUsingMetadataLogFileIndex() } else { - sourceHasMetadata = Some(false) + setSourceHasMetadata(Some(false)) // `allFiles` have already been fetched using InMemoryFileIndex in this round } } @@ -274,6 +276,7 @@ class FileStreamSource( */ override def commit(end: Offset): Unit = { val logOffset = FileStreamSourceOffset(end).logOffset + sourceCleaner.foreach { cleaner => val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2) val validFileEntities = files.filter(_.batchId == logOffset)
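The last two patches replace the per-commit warning with a fail-fast check at the point where the source first learns that its path is the output directory of a FileStreamSink. Below is a simplified, self-contained sketch of that validate-on-detection pattern; SimpleFileSource, cleanSourceEnabled, and pathHasSinkMetadata are illustrative stand-ins, not the real Spark classes or APIs.

```scala
// Illustrative stand-in for FileStreamSource: the guard lives where
// sourceHasMetadata is assigned, so commit() never needs to re-check it.
class SimpleFileSource(cleanSourceEnabled: Boolean) {
  private var sourceHasMetadata: Option[Boolean] = None

  private def setSourceHasMetadata(newValue: Option[Boolean]): Unit = newValue match {
    case Some(true) =>
      if (cleanSourceEnabled) {
        throw new UnsupportedOperationException(
          "Clean up source files is not supported when reading from the output " +
            "directory of FileStreamSink.")
      }
      sourceHasMetadata = Some(true)
    case _ =>
      sourceHasMetadata = newValue
  }

  /** Called on each fetch; pathHasSinkMetadata stands in for FileStreamSink.hasMetadata. */
  def fetchFiles(pathHasSinkMetadata: Boolean): Unit = {
    if (sourceHasMetadata.isEmpty) {
      setSourceHasMetadata(Some(pathHasSinkMetadata))
    }
    // ... list and process files ...
  }
}

// Against a plain directory, the source simply records that no sink metadata exists.
val plain = new SimpleFileSource(cleanSourceEnabled = true)
plain.fetchFiles(pathHasSinkMetadata = false)

// Against a FileStreamSink output directory, it fails fast on the first fetch:
// new SimpleFileSource(cleanSourceEnabled = true).fetchFiles(pathHasSinkMetadata = true)
//   => UnsupportedOperationException
```

In the real FileStreamSource the check can also fire later, because a source that initially saw no sink metadata re-runs FileStreamSink.hasMetadata when it finds no new files; that is why the final patch puts the guard in the setSourceHasMetadata setter rather than at construction time.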