diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 01679e5defe1..b91b93066d25 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -551,7 +551,7 @@ Here are the details of all the sources in Spark.
When "archive" is provided, the additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must have 2 subdirectories (so the depth of the directory is greater than 2), e.g. /archived/here. This will ensure archived files are never included as new source files.
Spark will move source files respecting their own paths. For example, if the path of the source file is /a/b/dataset.txt and the path of the archive directory is /archived/here, the file will be moved to /archived/here/a/b/dataset.txt.
NOTE: Both archiving (via moving) and deleting completed files will introduce overhead (slow down) in each micro-batch, so you need to understand the cost of each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost of listing source files, which can be an expensive operation.
- NOTE 2: The source path should not be used from multiple sources or queries when enabling this option.
+ NOTE 2: The source path should not be used from multiple sources or queries when enabling this option. Similarly, you must ensure the source path doesn't match any files in the output directory of a file stream sink.
NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query.
For file-format-specific options, see the related methods in DataStreamReader
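As a quick user-side illustration of the notes above, a minimal sketch (the paths and the use of a `text` source are hypothetical, not part of this patch) of enabling cleanup of completed source files might look like the following; with `cleanSource` enabled, the load path must not be the output directory of a file stream sink:

```scala
// Hypothetical example: enabling cleanup of completed files on a file source.
val input = spark.readStream
  .format("text")
  .option("maxFilesPerTrigger", "1")            // process one file per micro-batch
  .option("cleanSource", "archive")             // or "delete"; default is "off"
  .option("sourceArchiveDir", "/archived/here") // required only when cleanSource = "archive"
  .load("/data/in")                             // must not be a FileStreamSink output directory
```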
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 35d486c7c743..f31fb32634a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -206,6 +206,17 @@ class FileStreamSource(
CaseInsensitiveMap(options), None).allFiles()
}
+ private def setSourceHasMetadata(newValue: Option[Boolean]): Unit = newValue match {
+ case Some(true) =>
+ if (sourceCleaner.isDefined) {
+ throw new UnsupportedOperationException("Clean up source files is not supported when" +
+ " reading from the output directory of FileStreamSink.")
+ }
+ sourceHasMetadata = Some(true)
+ case _ =>
+ sourceHasMetadata = newValue
+ }
+
/**
* Returns a list of files found, sorted by their timestamp.
*/
@@ -216,7 +227,7 @@ class FileStreamSource(
sourceHasMetadata match {
case None =>
if (FileStreamSink.hasMetadata(Seq(path), hadoopConf, sparkSession.sessionState.conf)) {
- sourceHasMetadata = Some(true)
+ setSourceHasMetadata(Some(true))
allFiles = allFilesUsingMetadataLogFileIndex()
} else {
allFiles = allFilesUsingInMemoryFileIndex()
@@ -228,10 +239,10 @@ class FileStreamSource(
// metadata log and data files are only generated after the previous
// `FileStreamSink.hasMetadata` check
if (FileStreamSink.hasMetadata(Seq(path), hadoopConf, sparkSession.sessionState.conf)) {
- sourceHasMetadata = Some(true)
+ setSourceHasMetadata(Some(true))
allFiles = allFilesUsingMetadataLogFileIndex()
} else {
- sourceHasMetadata = Some(false)
+ setSourceHasMetadata(Some(false))
// `allFiles` have already been fetched using InMemoryFileIndex in this round
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 1ef0ae878ec0..b8dac13b3842 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.util.Progressable
import org.scalatest.PrivateMethodTester
import org.scalatest.time.SpanSugar._
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.streaming._
@@ -149,6 +150,20 @@ abstract class FileStreamSourceTest
}
}
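+ /**
+  * Test action that adds the files under `srcDir` accepted by `pathFilter` to the given
+  * FileStreamSinkLog, simulating a FileStreamSink having written them, so that the source
+  * path is detected as the output directory of a file stream sink.
+  */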
+ case class AddFilesToFileStreamSinkLog(
+ fs: FileSystem,
+ srcDir: Path,
+ sinkLog: FileStreamSinkLog,
+ batchId: Int)(
+ pathFilter: Path => Boolean) extends ExternalAction {
+ override def runAction(): Unit = {
+ val statuses = fs.listStatus(srcDir, new PathFilter {
+ override def accept(path: Path): Boolean = pathFilter(path)
+ })
+ sinkLog.add(batchId, statuses.map(SinkFileStatus(_)))
+ }
+ }
+
/** Use `format` and `path` to create FileStreamSource via DataFrameReader */
def createFileStream(
format: String,
@@ -1617,14 +1632,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
test("remove completed files when remove option is enabled") {
- def assertFileIsRemoved(files: Array[String], fileName: String): Unit = {
- assert(!files.exists(_.startsWith(fileName)))
- }
-
- def assertFileIsNotRemoved(files: Array[String], fileName: String): Unit = {
- assert(files.exists(_.startsWith(fileName)))
- }
-
withTempDirs { case (src, tmp) =>
withSQLConf(
SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
@@ -1642,28 +1649,24 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
CheckAnswer("keep1"),
AssertOnQuery("input file removed") { _: StreamExecution =>
// it doesn't rename any file yet
- assertFileIsNotRemoved(src.list(), "keep1")
+ assertFileIsNotRemoved(src, "keep1")
true
},
AddTextFileData("keep2", src, tmp, tmpFilePrefix = "ke ep2 %"),
CheckAnswer("keep1", "keep2"),
AssertOnQuery("input file removed") { _: StreamExecution =>
- val files = src.list()
-
// it renames input file for first batch, but not for second batch yet
- assertFileIsRemoved(files, "keep1")
- assertFileIsNotRemoved(files, "ke ep2 %")
+ assertFileIsRemoved(src, "keep1")
+ assertFileIsNotRemoved(src, "ke ep2 %")
true
},
AddTextFileData("keep3", src, tmp, tmpFilePrefix = "keep3"),
CheckAnswer("keep1", "keep2", "keep3"),
AssertOnQuery("input file renamed") { _: StreamExecution =>
- val files = src.list()
-
// it renames input file for second batch, but not third batch yet
- assertFileIsRemoved(files, "ke ep2 %")
- assertFileIsNotRemoved(files, "keep3")
+ assertFileIsRemoved(src, "ke ep2 %")
+ assertFileIsNotRemoved(src, "keep3")
true
}
@@ -1739,6 +1742,44 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}
+ Seq("delete", "archive").foreach { cleanOption =>
+ test(s"Throw UnsupportedOperationException on configuring $cleanOption when source path" +
+ " refers to the output dir of FileStreamSink") {
+ withThreeTempDirs { case (src, tmp, archiveDir) =>
+ withSQLConf(
+ SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
+ // Force deleting the old logs
+ SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+ ) {
+ val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
+ "cleanSource" -> cleanOption, "sourceArchiveDir" -> archiveDir.getAbsolutePath)
+
+ val fileStream = createFileStream("text", src.getCanonicalPath, options = option)
+ val filtered = fileStream.filter($"value" contains "keep")
+
+ // create FileStreamSinkLog under source directory
+ val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark,
+ new File(src, FileStreamSink.metadataDir).getCanonicalPath)
+ val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf)
+ val srcPath = new Path(src.getCanonicalPath)
+ val fileSystem = srcPath.getFileSystem(hadoopConf)
+
+ // Here we only verify that the query fails fast with UnsupportedOperationException;
+ // the actual "archive"/"delete" cleanup behavior is covered in other tests.
+ testStream(filtered)(
+ AddTextFileData("keep1", src, tmp, tmpFilePrefix = "keep1"),
+ AddFilesToFileStreamSinkLog(fileSystem, srcPath, sinkLog, 0) { path =>
+ path.getName.startsWith("keep1")
+ },
+ ExpectFailure[UnsupportedOperationException](
+ t => assert(t.getMessage.startsWith("Clean up source files is not supported")),
+ isFatalError = false)
+ )
+ }
+ }
+ }
+ }
+
class FakeFileSystem(scheme: String) extends FileSystem {
override def exists(f: Path): Boolean = true
@@ -1797,6 +1838,14 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}
+ private def assertFileIsRemoved(sourceDir: File, fileName: String): Unit = {
+ assert(!sourceDir.list().exists(_.startsWith(fileName)))
+ }
+
+ private def assertFileIsNotRemoved(sourceDir: File, fileName: String): Unit = {
+ assert(sourceDir.list().exists(_.startsWith(fileName)))
+ }
+
private def assertFileIsNotMoved(sourceDir: File, expectedDir: File, filePrefix: String): Unit = {
assert(sourceDir.exists())
assert(sourceDir.list().exists(_.startsWith(filePrefix)))