From 4b99e612e994073e995a337a94db123193af2b5f Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Tue, 17 Dec 2019 14:51:17 +0900 Subject: [PATCH 1/4] [SPARK-30281][SS] Consider partitioned/recursive option while verifying archive path on FileStreamSource --- .../structured-streaming-programming-guide.md | 3 +- .../streaming/FileStreamSource.scala | 73 +++++++++++++++---- .../sql/streaming/FileStreamSourceSuite.scala | 26 +++++-- 3 files changed, 80 insertions(+), 22 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index b91b93066d25..894664b4683a 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -548,7 +548,8 @@ Here are the details of all the sources in Spark. "s3a://a/b/c/dataset.txt"
cleanSource: option to clean up completed files after processing.
Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".
- When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must have 2 subdirectories (so depth of directory is greater than 2). e.g. /archived/here. This will ensure archived files are never included as new source files.
+ When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" should ensure some condition to ensure archived files are never included as new source files: + The value of "sourceArchiveDir" doesn't match with source pattern in depth, where the depth is minimum of depth on both paths.
Spark will move source files respecting their own path. For example, if the path of source file is /a/b/dataset.txt and the path of archive directory is /archived/here, file will be moved to /archived/here/a/b/dataset.txt.
NOTE: Both archiving (via moving) or deleting completed files will introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost to list source files which can be an expensive operation.
NOTE 2: The source path should not be used from multiple sources or queries when enabling this option. Similarly, you must ensure the source path doesn't match to any files in output directory of file stream sink.
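As a concrete illustration of the cleanSource/sourceArchiveDir options documented above, here is a minimal sketch of how a query might enable the "archive" clean-up mode. The format, input path, and archive path below are illustrative assumptions (not values from this patch), and spark is assumed to be an existing SparkSession:

// Hedged sketch: enable archival of completed files on a file stream source.
// "/data/in" and "/archived/here" are placeholder paths chosen for illustration.
val input = spark.readStream
  .format("text")
  .option("cleanSource", "archive")
  .option("sourceArchiveDir", "/archived/here")
  .load("/data/in")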
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index f31fb32634a4..260f07557314 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit._ import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging @@ -389,20 +389,63 @@ object FileStreamSource { s"on a different file system than the source files. source path: $sourcePath" + s" / base archive path: $baseArchivePath") - /** - * FileStreamSource reads the files which one of below conditions is met: - * 1) file itself is matched with source path - * 2) parent directory is matched with source path - * - * Checking with glob pattern is costly, so set this requirement to eliminate the cases - * where the archive path can be matched with source path. For example, when file is moved - * to archive directory, destination path will retain input file's path as suffix, so - * destination path can't be matched with source path if archive directory's depth is longer - * than 2, as neither file nor parent directory of destination path can be matched with - * source path. - */ - require(baseArchivePath.depth() > 2, "Base archive path must have at least 2 " + - "subdirectories from root directory. e.g. '/data/archive'") + require(!isBaseArchivePathMatchedAgainstSourcePattern, "Base archive path cannot be set to" + + " the path where archived path can possibly match with source pattern. Ensure below: \n" + + "1) prefix of base archive path doesn't match with source pattern in same depth\n" + + "2) prefix of source pattern doesn't match with base archive path in same depth") + } + + private def getAncestorEnsuringDepth(path: Path, depth: Int): Path = { + var newPath = path + while (newPath.depth() > depth) { + newPath = newPath.getParent + } + newPath + } + + private def isBaseArchivePathMatchedAgainstSourcePattern: Boolean = { + // We should disallow end users to set base archive path which path matches against source + // pattern to avoid checking each source file. There're couple of cases which allow + // FileStreamSource to read any depth of subdirectory under the source pattern, so we should + // consider all three cases 1) both has same depth 2) base archive path is longer than source + // pattern 3) source pattern is longer than base archive path. To handle all cases, we take + // min of depth for both paths, and check the match. 
+ + val minDepth = math.min(sourcePath.depth(), baseArchivePath.depth()) + + val sourcePathMinDepth = getAncestorEnsuringDepth(sourcePath, minDepth) + val baseArchivePathMinDepth = getAncestorEnsuringDepth(baseArchivePath, minDepth) + + val sourceGlobFilters: Seq[GlobFilter] = buildSourceGlobFilters(sourcePathMinDepth) + + var matched = true + + // pathToCompare should have same depth as sourceGlobFilters.length + var pathToCompare = baseArchivePathMinDepth + var index = 0 + do { + // GlobFilter only matches against its name, not full path so it's safe to compare + if (!sourceGlobFilters(index).accept(pathToCompare)) { + matched = false + } else { + pathToCompare = pathToCompare.getParent + index += 1 + } + } while (matched && !pathToCompare.isRoot) + + matched + } + + private def buildSourceGlobFilters(sourcePath: Path): Seq[GlobFilter] = { + val filters = new scala.collection.mutable.MutableList[GlobFilter]() + + var currentPath = sourcePath + while (!currentPath.isRoot) { + filters += new GlobFilter(currentPath.getName) + currentPath = currentPath.getParent + } + + filters.toList } override def clean(entry: FileEntry): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index b8dac13b3842..2eb875cfbd3c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -1814,15 +1814,29 @@ class FileStreamSourceSuite extends FileStreamSourceTest { override def getFileStatus(f: Path): FileStatus = throw new NotImplementedError } - test("SourceFileArchiver - base archive path depth <= 2") { + test("SourceFileArchiver - fail when base archive path matches source pattern") { val fakeFileSystem = new FakeFileSystem("fake") - val sourcePatternPath = new Path("/hello*/h{e,f}ll?") - val baseArchiveDirPath = new Path("/hello") - - intercept[IllegalArgumentException] { - new SourceFileArchiver(fakeFileSystem, sourcePatternPath, fakeFileSystem, baseArchiveDirPath) + def assertThrowIllegalArgumentException(sourcePatttern: Path, baseArchivePath: Path): Unit = { + intercept[IllegalArgumentException] { + new SourceFileArchiver(fakeFileSystem, sourcePatttern, fakeFileSystem, baseArchivePath) + } } + + // 1) prefix of base archive path matches source pattern (baseArchiveDirPath has more depths) + val sourcePatternPath = new Path("/hello*/spar?") + val baseArchiveDirPath = new Path("/hello/spark/structured/streaming") + assertThrowIllegalArgumentException(sourcePatternPath, baseArchiveDirPath) + + // 2) prefix of source pattern matches base archive path (source pattern has more depths) + val sourcePatternPath2 = new Path("/hello*/spar?/structured/streaming") + val baseArchiveDirPath2 = new Path("/hello/spark/structured") + assertThrowIllegalArgumentException(sourcePatternPath2, baseArchiveDirPath2) + + // 3) source pattern matches base archive path (both have same depth) + val sourcePatternPath3 = new Path("/hello*/spar?/structured/*") + val baseArchiveDirPath3 = new Path("/hello/spark/structured/streaming") + assertThrowIllegalArgumentException(sourcePatternPath3, baseArchiveDirPath3) } test("SourceFileArchiver - different filesystems between source and archive") { From 59862ba5e54c7d00bcd7f3bcd0e2e4763100bbc1 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Tue, 17 Dec 2019 15:50:19 +0900 Subject: [PATCH 2/4] Refine a bit --- 
docs/structured-streaming-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 894664b4683a..f5ab99a7deeb 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -548,7 +548,7 @@ Here are the details of all the sources in Spark. "s3a://a/b/c/dataset.txt"
cleanSource: option to clean up completed files after processing.
Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".
- When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" should ensure some condition to ensure archived files are never included as new source files: + When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" should ensure some condition to guarantee archived files are never included as new source files: The value of "sourceArchiveDir" doesn't match with source pattern in depth, where the depth is minimum of depth on both paths.
Spark will move source files respecting their own path. For example, if the path of source file is /a/b/dataset.txt and the path of archive directory is /archived/here, file will be moved to /archived/here/a/b/dataset.txt.
NOTE: Both archiving (via moving) or deleting completed files will introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost to list source files which can be an expensive operation.
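To make the move behaviour described above concrete, the following is a hedged sketch (not the exact implementation in this patch) of how an archive destination can be derived by appending the source file's own path underneath the archive directory, consistent with the /a/b/dataset.txt example in the doc text:

import org.apache.hadoop.fs.Path

// e.g. /archived/here + /a/b/dataset.txt -> /archived/here/a/b/dataset.txt
def archiveDestination(baseArchiveDir: Path, sourceFile: Path): Path =
  new Path(baseArchiveDir.toUri.getPath.stripSuffix("/") + sourceFile.toUri.getPath)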
From e779edcd3edec3608456d41caca98f5f7e5884a7 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Tue, 17 Dec 2019 15:52:00 +0900 Subject: [PATCH 3/4] Refine again --- .../spark/sql/execution/streaming/FileStreamSource.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 260f07557314..39fb7f87749a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -390,9 +390,9 @@ object FileStreamSource { s" / base archive path: $baseArchivePath") require(!isBaseArchivePathMatchedAgainstSourcePattern, "Base archive path cannot be set to" + - " the path where archived path can possibly match with source pattern. Ensure below: \n" + - "1) prefix of base archive path doesn't match with source pattern in same depth\n" + - "2) prefix of source pattern doesn't match with base archive path in same depth") + " the path where archived path can possibly match with source pattern. Ensure the base " + + "archive path doesn't match with source pattern in depth, where the depth is minimum of" + + " depth on both paths.") } private def getAncestorEnsuringDepth(path: Path, depth: Int): Path = { From be988dfe396ac087295f8d39dc9eaeb6a2143465 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Wed, 18 Dec 2019 07:08:07 +0900 Subject: [PATCH 4/4] Refine doc --- docs/structured-streaming-programming-guide.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index f5ab99a7deeb..306d688f7a29 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -548,8 +548,8 @@ Here are the details of all the sources in Spark. "s3a://a/b/c/dataset.txt"
cleanSource: option to clean up completed files after processing.
Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".
- When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" should ensure some condition to guarantee archived files are never included as new source files: - The value of "sourceArchiveDir" doesn't match with source pattern in depth, where the depth is minimum of depth on both paths.
+ When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must not match with source pattern in depth (the number of directories from the root directory), where the depth is minimum of depth on both paths. This will ensure archived files are never included as new source files.
+ For example, suppose you provide '/hello?/spark/*' as source pattern, '/hello1/spark/archive/dir' cannot be used as the value of "sourceArchiveDir", as '/hello?/spark/*' and '/hello1/spark/archive' will be matched. '/hello1/spark' cannot be also used as the value of "sourceArchiveDir", as '/hello?/spark' and '/hello1/spark' will be matched. '/archived/here' would be OK as it doesn't match.
Spark will move source files respecting their own path. For example, if the path of source file is /a/b/dataset.txt and the path of archive directory is /archived/here, file will be moved to /archived/here/a/b/dataset.txt.
NOTE: Both archiving (via moving) or deleting completed files will introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost to list source files which can be an expensive operation.
NOTE 2: The source path should not be used from multiple sources or queries when enabling this option. Similarly, you must ensure the source path doesn't match to any files in output directory of file stream sink.
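For reference, the validation rule documented above, which this patch series implements in FileStreamSource, can be sketched as follows. The helper names here are made up for illustration, but the logic mirrors the min-depth, per-component glob check described in the change:

import org.apache.hadoop.fs.{GlobFilter, Path}

// Truncate a path to the given depth by walking up through its ancestors.
def ancestorAtDepth(path: Path, depth: Int): Path = {
  var p = path
  while (p.depth() > depth) p = p.getParent
  p
}

// True if the base archive path matches the source pattern once both are
// truncated to the minimum of their depths; such an archive path is rejected.
def archivePathMatchesSourcePattern(sourcePattern: Path, baseArchivePath: Path): Boolean = {
  val minDepth = math.min(sourcePattern.depth(), baseArchivePath.depth())
  var src = ancestorAtDepth(sourcePattern, minDepth)
  var arc = ancestorAtDepth(baseArchivePath, minDepth)
  var matched = true
  while (matched && !arc.isRoot) {
    // GlobFilter matches a single name component, not the full path.
    matched = new GlobFilter(src.getName).accept(arc)
    src = src.getParent
    arc = arc.getParent
  }
  matched
}

// For example, archivePathMatchesSourcePattern(new Path("/hello*/spar?"),
//   new Path("/hello/spark/structured/streaming")) is expected to return true,
// so that base archive path would be rejected, while new Path("/archived/here")
// does not match and would be accepted.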