diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index b91b93066d25..306d688f7a29 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -548,7 +548,8 @@ Here are the details of all the sources in Spark.
"s3a://a/b/c/dataset.txt"
cleanSource: option to clean up completed files after processing.
Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".
- When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must have 2 subdirectories (so depth of directory is greater than 2). e.g. /archived/here. This will ensure archived files are never included as new source files.
+ When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must not match with source pattern in depth (the number of directories from the root directory), where the depth is minimum of depth on both paths. This will ensure archived files are never included as new source files.
+ For example, suppose you provide '/hello?/spark/*' as source pattern, '/hello1/spark/archive/dir' cannot be used as the value of "sourceArchiveDir", as '/hello?/spark/*' and '/hello1/spark/archive' will be matched. '/hello1/spark' cannot be also used as the value of "sourceArchiveDir", as '/hello?/spark' and '/hello1/spark' will be matched. '/archived/here' would be OK as it doesn't match.
Spark will move source files respecting their own path. For example, if the path of source file is /a/b/dataset.txt and the path of archive directory is /archived/here, file will be moved to /archived/here/a/b/dataset.txt.
NOTE: Both archiving (via moving) and deleting completed files will introduce overhead (slow down) in each micro-batch, so you need to understand the cost of each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost of listing source files, which can be an expensive operation.
NOTE 2: The source path should not be used from multiple sources or queries when enabling this option. Similarly, you must ensure the source path doesn't match any files in the output directory of the file stream sink.
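
As a quick illustration of the options above, here is a minimal sketch; the glob path and the use of the text format are hypothetical, chosen only to mirror the example values in this section:

```scala
// Hypothetical sketch: stream text files matching a glob pattern and archive
// each completed file under /archived/here, which does not match the pattern.
val lines = spark.readStream
  .format("text")
  .option("cleanSource", "archive")
  .option("sourceArchiveDir", "/archived/here")
  .load("/hello?/spark/*")
```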
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index f31fb32634a4..39fb7f87749a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit._
import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
@@ -389,20 +389,63 @@ object FileStreamSource {
s"on a different file system than the source files. source path: $sourcePath" +
s" / base archive path: $baseArchivePath")
- /**
- * FileStreamSource reads the files which one of below conditions is met:
- * 1) file itself is matched with source path
- * 2) parent directory is matched with source path
- *
- * Checking with glob pattern is costly, so set this requirement to eliminate the cases
- * where the archive path can be matched with source path. For example, when file is moved
- * to archive directory, destination path will retain input file's path as suffix, so
- * destination path can't be matched with source path if archive directory's depth is longer
- * than 2, as neither file nor parent directory of destination path can be matched with
- * source path.
- */
- require(baseArchivePath.depth() > 2, "Base archive path must have at least 2 " +
- "subdirectories from root directory. e.g. '/data/archive'")
+ require(!isBaseArchivePathMatchedAgainstSourcePattern, "Base archive path cannot be set to" +
+ " a path where an archived path could match the source pattern. Ensure the base archive " +
+ "path doesn't match the source pattern in depth, where the depth is the minimum of the " +
+ "depths of both paths.")
+ }
+
+ private def getAncestorEnsuringDepth(path: Path, depth: Int): Path = {
+ var newPath = path
+ while (newPath.depth() > depth) {
+ newPath = newPath.getParent
+ }
+ newPath
+ }
+
+ private def isBaseArchivePathMatchedAgainstSourcePattern: Boolean = {
+ // We should disallow end users from setting a base archive path which matches against the
+ // source pattern, to avoid checking each source file. There are a couple of cases which allow
+ // FileStreamSource to read any depth of subdirectory under the source pattern, so we should
+ // consider all three cases: 1) both have the same depth, 2) the base archive path is deeper
+ // than the source pattern, 3) the source pattern is deeper than the base archive path. To
+ // handle all cases, we take the min of the depths of both paths and check the match.
+
+ val minDepth = math.min(sourcePath.depth(), baseArchivePath.depth())
+
+ val sourcePathMinDepth = getAncestorEnsuringDepth(sourcePath, minDepth)
+ val baseArchivePathMinDepth = getAncestorEnsuringDepth(baseArchivePath, minDepth)
+
+ val sourceGlobFilters: Seq[GlobFilter] = buildSourceGlobFilters(sourcePathMinDepth)
+
+ var matched = true
+
+ // pathToCompare's depth should equal sourceGlobFilters.length
+ var pathToCompare = baseArchivePathMinDepth
+ var index = 0
+ do {
+ // GlobFilter only matches against its name, not full path so it's safe to compare
+ if (!sourceGlobFilters(index).accept(pathToCompare)) {
+ matched = false
+ } else {
+ pathToCompare = pathToCompare.getParent
+ index += 1
+ }
+ } while (matched && !pathToCompare.isRoot)
+
+ matched
+ }
+
+ private def buildSourceGlobFilters(sourcePath: Path): Seq[GlobFilter] = {
+ val filters = new scala.collection.mutable.MutableList[GlobFilter]()
+
+ var currentPath = sourcePath
+ while (!currentPath.isRoot) {
+ filters += new GlobFilter(currentPath.getName)
+ currentPath = currentPath.getParent
+ }
+
+ filters.toList
}
override def clean(entry: FileEntry): Unit = {
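
To make the check above concrete, here is a self-contained sketch of the same idea (the object and method names are hypothetical, not part of this patch): both paths are trimmed to the minimum of their depths, and each name component of the base archive path is matched against the corresponding glob component of the source pattern, since GlobFilter only matches a path's final name component.

```scala
import org.apache.hadoop.fs.{GlobFilter, Path}

// Hypothetical, standalone sketch of the depth-limited glob check used above.
object DepthMatchSketch {
  // Walk up from `p` until its depth is at most `depth`.
  private def ancestorAtDepth(p: Path, depth: Int): Path =
    if (p.depth() > depth) ancestorAtDepth(p.getParent, depth) else p

  def matchesAtMinDepth(sourcePattern: Path, baseArchivePath: Path): Boolean = {
    val minDepth = math.min(sourcePattern.depth(), baseArchivePath.depth())
    var src = ancestorAtDepth(sourcePattern, minDepth)
    var arc = ancestorAtDepth(baseArchivePath, minDepth)
    var matched = true
    // GlobFilter only matches against a path's name, so compare component by component.
    while (matched && !src.isRoot) {
      matched = new GlobFilter(src.getName).accept(arc)
      src = src.getParent
      arc = arc.getParent
    }
    matched
  }

  def main(args: Array[String]): Unit = {
    val pattern = new Path("/hello?/spark/*")
    println(matchesAtMinDepth(pattern, new Path("/hello1/spark/archive/dir"))) // true -> disallowed
    println(matchesAtMinDepth(pattern, new Path("/archived/here")))            // false -> allowed
  }
}
```

Running the sketch prints `true` for the first pair (so that archive dir would be rejected) and `false` for the second (allowed), matching the documentation example above.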
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index b8dac13b3842..2eb875cfbd3c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1814,15 +1814,29 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
override def getFileStatus(f: Path): FileStatus = throw new NotImplementedError
}
- test("SourceFileArchiver - base archive path depth <= 2") {
+ test("SourceFileArchiver - fail when base archive path matches source pattern") {
val fakeFileSystem = new FakeFileSystem("fake")
- val sourcePatternPath = new Path("/hello*/h{e,f}ll?")
- val baseArchiveDirPath = new Path("/hello")
-
- intercept[IllegalArgumentException] {
- new SourceFileArchiver(fakeFileSystem, sourcePatternPath, fakeFileSystem, baseArchiveDirPath)
+ def assertThrowIllegalArgumentException(sourcePattern: Path, baseArchivePath: Path): Unit = {
+ intercept[IllegalArgumentException] {
+ new SourceFileArchiver(fakeFileSystem, sourcePattern, fakeFileSystem, baseArchivePath)
+ }
}
+
+ // 1) prefix of base archive path matches source pattern (baseArchiveDirPath has more depths)
+ val sourcePatternPath = new Path("/hello*/spar?")
+ val baseArchiveDirPath = new Path("/hello/spark/structured/streaming")
+ assertThrowIllegalArgumentException(sourcePatternPath, baseArchiveDirPath)
+
+ // 2) prefix of source pattern matches base archive path (source pattern has more depths)
+ val sourcePatternPath2 = new Path("/hello*/spar?/structured/streaming")
+ val baseArchiveDirPath2 = new Path("/hello/spark/structured")
+ assertThrowIllegalArgumentException(sourcePatternPath2, baseArchiveDirPath2)
+
+ // 3) source pattern matches base archive path (both have same depth)
+ val sourcePatternPath3 = new Path("/hello*/spar?/structured/*")
+ val baseArchiveDirPath3 = new Path("/hello/spark/structured/streaming")
+ assertThrowIllegalArgumentException(sourcePatternPath3, baseArchiveDirPath3)
}
test("SourceFileArchiver - different filesystems between source and archive") {