From 19d47cda2dbb6ac9ea9316d2bb5164a00999283a Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Sat, 20 Aug 2016 11:50:34 +0100 Subject: [PATCH 01/18] SPARK-17159: move filtering of directories and files out of glob/list filters and into filtering of the FileStatus instances returned in the results, so avoiding the need to create FileStatus instances for the filter checks. -This doesn't add overhead to the filtering process; that's done as post-processing in FileSystem anyway. At worst it may result in larger lists being built up and returned. -For every glob match, the code saves 2 RPC calls to the HDFS NN -The code saves 1-3 HTTP calls to S3 for the directory check (including a slow List call whenever the directory has children as it doesn't exist as a blob any more) -for the modtime check of every file, it saves an HTTP GET The whole modtime cache can be eliminated; it's a performance optimisation to avoid the overhead of the file checks, one that is no longer needed. --- .../streaming/dstream/FileInputDStream.scala | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 905b1c52afa6..3d0abe5432ad 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -23,7 +23,7 @@ import scala.collection.mutable import scala.reflect.ClassTag import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.spark.rdd.{RDD, UnionRDD} @@ -196,15 +196,13 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( logDebug(s"Getting new files for time $currentTime, " + s"ignoring files older than $modTimeIgnoreThreshold") - val newFileFilter = new PathFilter { - def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold) - } - val directoryFilter = new PathFilter { - override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory - } - val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath) + val directories = fs.globStatus(directoryPath) + .filter(_.isDirectory) + .map(_.getPath) val newFiles = directories.flatMap(dir => - fs.listStatus(dir, newFileFilter).map(_.getPath.toString)) + fs.listStatus(dir) + .filter(isNewFile(_, currentTime, modTimeIgnoreThreshold )) + .map(_.getPath.toString)) val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime logInfo("Finding new files took " + timeTaken + " ms") logDebug("# cached file times = " + fileToModTime.size) @@ -241,8 +239,13 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( * The files with mod time T+5 are not remembered and cannot be ignored (since, t+5 > t+1). * Hence they can get selected as new files again. To prevent this, files whose mod time is more * than current batch time are not considered.
+ * @param fs file status + * @param currentTime time of the batch + * @param modTimeIgnoreThreshold the ignore threshold + * @return true if the file has been modified within the batch window */ - private def isNewFile(path: Path, currentTime: Long, modTimeIgnoreThreshold: Long): Boolean = { + private def isNewFile(fs: FileStatus, currentTime: Long, modTimeIgnoreThreshold: Long): Boolean = { + val path = fs.getPath val pathStr = path.toString // Reject file if it does not satisfy filter if (!filter(path)) { @@ -250,7 +253,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( return false } // Reject file if it was created before the ignore time - val modTime = getFileModTime(path) + val modTime = getFileModTime(fs) if (modTime <= modTimeIgnoreThreshold) { // Use <= instead of < to avoid SPARK-4518 logDebug(s"$pathStr ignored as mod time $modTime <= ignore time $modTimeIgnoreThreshold") @@ -293,8 +296,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( } /** Get file mod time from cache or fetch it from the file system */ - private def getFileModTime(path: Path) = { - fileToModTime.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime()) + private def getFileModTime(fs: FileStatus) = { + fileToModTime.getOrElseUpdate(fs.getPath.toString, fs.getModificationTime()) } private def directoryPath: Path = { From 6f1ea36a9e1970168563f87ebc0ea3b55842a682 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Sat, 20 Aug 2016 14:26:14 +0100 Subject: [PATCH 02/18] [SPARK-17159] Remove the fileModTime cache. Now that the modification time costs 0 to evaluate, caching it actually consumes memory and the time for a lookup. --- .../spark/streaming/dstream/FileInputDStream.scala | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 3d0abe5432ad..f041c066c2bb 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.spark.rdd.{RDD, UnionRDD} import org.apache.spark.streaming._ import org.apache.spark.streaming.scheduler.StreamInputInfo -import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Utils} +import org.apache.spark.util.{SerializableConfiguration, Utils} /** * This class represents an input stream that monitors a Hadoop-compatible filesystem for new @@ -122,9 +122,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( // Set of files that were selected in the remembered batches @transient private var recentlySelectedFiles = new mutable.HashSet[String]() - // Read-through cache of file mod times, used to speed up mod time lookups - @transient private var fileToModTime = new TimeStampedHashMap[String, Long](true) - // Timestamp of the last round of finding files @transient private var lastNewFileFindingTime = 0L @@ -173,8 +170,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( logDebug("Cleared files are:\n" + oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n")) } - // Delete file mod times that weren't accessed in the last round of getting new files - fileToModTime.clearOldValues(lastNewFileFindingTime - 1) } /** @@ -201,11 +196,10 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( 
.map(_.getPath) val newFiles = directories.flatMap(dir => fs.listStatus(dir) - .filter(isNewFile(_, currentTime, modTimeIgnoreThreshold )) + .filter(isNewFile(_, currentTime, modTimeIgnoreThreshold)) .map(_.getPath.toString)) val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime logInfo("Finding new files took " + timeTaken + " ms") - logDebug("# cached file times = " + fileToModTime.size) if (timeTaken > slideDuration.milliseconds) { logWarning( "Time taken to find new files exceeds the batch size. " + @@ -297,7 +291,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( /** Get file mod time from cache or fetch it from the file system */ private def getFileModTime(fs: FileStatus) = { - fileToModTime.getOrElseUpdate(fs.getPath.toString, fs.getModificationTime()) + fs.getModificationTime() } private def directoryPath: Path = { @@ -321,7 +315,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( generatedRDDs = new mutable.HashMap[Time, RDD[(K, V)]]() batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]] recentlySelectedFiles = new mutable.HashSet[String]() - fileToModTime = new TimeStampedHashMap[String, Long](true) } /** From c2a4382f5ed39275a6a47932eb7294faaaa7fbe6 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Sat, 20 Aug 2016 21:38:13 +0100 Subject: [PATCH 03/18] [SPARK-17159] inline FileStatus.getModificationTime; address style issues. Also note that 1s granularity is the resolution from HDFS; other filesystems may have a different resolution. The only one I know that is worse is FAT16/FAT32, which is accurate to 2s, but nobody should be using that except on SSD cards and USB sticks --- .../streaming/dstream/FileInputDStream.scala | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index f041c066c2bb..07ddf36e9c84 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -137,7 +137,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( * a union RDD out of them. Note that this maintains the list of files that were processed * in the latest modification time in the previous call to this method. This is because the * modification time returned by the FileStatus API seems to return times only at the - * granularity of seconds. And new files may have the same modification time as the + * granularity of seconds in HDFS. And new files may have the same modification time as the * latest modification time in the previous call to this method yet was not reported in * the previous call. */ @@ -233,13 +233,16 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( * The files with mod time T+5 are not remembered and cannot be ignored (since, t+5 > t+1). * Hence they can get selected as new files again. To prevent this, files whose mod time is more * than current batch time are not considered. 
- * @param fs file status + * @param fileStatus file status * @param currentTime time of the batch * @param modTimeIgnoreThreshold the ignore threshold * @return true if the file has been modified within the batch window */ - private def isNewFile(fs: FileStatus, currentTime: Long, modTimeIgnoreThreshold: Long): Boolean = { - val path = fs.getPath + private def isNewFile( + fileStatus: FileStatus, + currentTime: Long, + modTimeIgnoreThreshold: Long): Boolean = { + val path = fileStatus.getPath val pathStr = path.toString // Reject file if it does not satisfy filter if (!filter(path)) { @@ -247,7 +250,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( return false } // Reject file if it was created before the ignore time - val modTime = getFileModTime(fs) + val modTime = fileStatus.getModificationTime() if (modTime <= modTimeIgnoreThreshold) { // Use <= instead of < to avoid SPARK-4518 logDebug(s"$pathStr ignored as mod time $modTime <= ignore time $modTimeIgnoreThreshold") @@ -289,11 +292,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( new UnionRDD(context.sparkContext, fileRDDs) } - /** Get file mod time from cache or fetch it from the file system */ - private def getFileModTime(fs: FileStatus) = { - fs.getModificationTime() - } - private def directoryPath: Path = { if (_path == null) _path = new Path(directory) _path From 0c880932a3faec04f2299310465495c524781c92 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 23 Aug 2016 18:45:43 +0100 Subject: [PATCH 04/18] [SPARK-17159] updates as discussed on PR: skip wildcards for non wildcarded listing; handle FNFE specially, add the docs --- docs/streaming-programming-guide.md | 40 +++++++++++++++---- .../streaming/dstream/FileInputDStream.scala | 20 +++++++--- 2 files changed, 47 insertions(+), 13 deletions(-) diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index abd4ac965360..fd6431b634ad 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -629,13 +629,39 @@ methods for creating DStreams from files as input sources. - Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory (files written in nested directories not supported). Note that - - + The files must have the same data format. - + The files must be created in the `dataDirectory` by atomically *moving* or *renaming* them into - the data directory. - + Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read. - + Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory. + + + A simple directory can be supplied (`hdfs://namenode:8040/logs/`). All files directly + underneath this path will be processed as they are discovered. + + A regular expression can be supplied instead, such as + `s3a://bucket/logs/[2015,2016]-??-??-friday`. + Here, the DStream will consist of all files directly under those directories + matching the regular expression. + That is: it is a pattern of directories, not of files in directories. + + All files must be in the same data format. + + All files must be created in/moved under the `dataDirectory` directory/directories by + an atomic operation. In HDFS and similar filesystems, this can be done *renaming* them + into the data directory from another part of the same filesystem. 
+ * If a wildcard is used to identify directories, such as `hdfs://namenode:8040/logs/2016*`, + renaming an entire directory to match the path will add the directory to the list of matching + directories. However, unless the modification time of the individual files are within + that of the current window, they will not be recognized as new files. + + Once read, changes to the files will not be picked up. + So if the files are being continuously appended, the new data will not be read. + + Special points for object stores + + + Wildcard lookup may be very slow with some object stores. + + File renaming is slow; it is `O(data)`. + + Directory rename is even slower and not atomic. + + Objects created directly though a single PUT operation are atomic, irrespective of + the language or library used. + + Writing a file to an object store using the Hadoop APIs is atomic; the object is only + created via a PUT operation in the final `OutputStream.close()` call. + + For this reason, applications using an object store as the direct destination of data + can consider using PUT operations to directly publish data for a DStream to pick up. + For simple text files, there is an easier method `streamingContext.textFileStream(dataDirectory)`. And file streams do not require running a receiver, hence does not require allocating cores. Python API `fileStream` is not available in the Python API, only `textFileStream` is available. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 07ddf36e9c84..0dd62df72a1f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.dstream -import java.io.{IOException, ObjectInputStream} +import java.io.{FileNotFoundException, IOException, ObjectInputStream} import scala.collection.mutable import scala.reflect.ClassTag @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.{RDD, UnionRDD} import org.apache.spark.streaming._ import org.apache.spark.streaming.scheduler.StreamInputInfo @@ -191,9 +192,13 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( logDebug(s"Getting new files for time $currentTime, " + s"ignoring files older than $modTimeIgnoreThreshold") - val directories = fs.globStatus(directoryPath) - .filter(_.isDirectory) - .map(_.getPath) + val directories = if (SparkHadoopUtil.get.isGlobPath(directoryPath)) { + fs.globStatus(directoryPath) + .filter(_.isDirectory) + .map(_.getPath) + } else { + List(directoryPath).toArray + } val newFiles = directories.flatMap(dir => fs.listStatus(dir) .filter(isNewFile(_, currentTime, modTimeIgnoreThreshold)) @@ -202,15 +207,18 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( logInfo("Finding new files took " + timeTaken + " ms") if (timeTaken > slideDuration.milliseconds) { logWarning( - "Time taken to find new files exceeds the batch size. " + + s"Time taken to find new files $timeTaken exceeds the batch size. " + "Consider increasing the batch size or reducing the number of " + "files in the monitored directory." 
) } newFiles } catch { + case e: FileNotFoundException => + logWarning(s"No directory to scan: $directoryPath") + Array.empty case e: Exception => - logWarning("Error finding new files", e) + logWarning(s"Error finding new files under $directoryPath", e) reset() Array.empty } From 36c588192c480090810959ccee6de1a254a84d1a Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 24 Aug 2016 19:06:36 +0100 Subject: [PATCH 05/18] [SPARK-17159] move glob operation into SparkHadoopUtils, alongside an existing/similar method. Add tests for the behaviour. Update docs with suggested fixes, and review/edit. --- .../apache/spark/deploy/SparkHadoopUtil.scala | 27 ++- .../spark/deploy/SparkHadoopUtilSuite.scala | 181 ++++++++++++++++++ docs/streaming-programming-guide.md | 31 +-- .../streaming/dstream/FileInputDStream.scala | 16 +- 4 files changed, 231 insertions(+), 24 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index f475ce87540a..24d00094501a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy -import java.io.IOException +import java.io.{FileNotFoundException, IOException} import java.security.PrivilegedExceptionAction import java.text.DateFormat import java.util.{Arrays, Comparator, Date, Locale} @@ -219,6 +219,31 @@ class SparkHadoopUtil extends Logging { if (isGlobPath(pattern)) globPath(pattern) else Seq(pattern) } + /** + * List directories/files matching the path and return the `FileStatus` results. + * If the pattern is not a regexp then a simple `getFileStatus(pattern)` + * is called to get the status of that path. + * If the path/pattern does not match anything in the filesystem, + * an empty sequence is returned. + * @param pattern pattern + * @return a possibly empty array of FileStatus entries + */ + def globToFileStatus(pattern: Path): Array[FileStatus] = { + val fs = pattern.getFileSystem(conf) + if (isGlobPath(pattern)) { + Option(fs.globStatus(pattern)).getOrElse(Array.empty[FileStatus]) + } else { + try { + Array(fs.getFileStatus(pattern)) + } catch { + // nothing at the end of the path + case e: FileNotFoundException => + logDebug(s"Failed to glob $pattern", e) + Array.empty[FileStatus] + } + } + } + /** * Lists all the files in a directory with the specified prefix, and does not end with the * given suffix. The returned {{FileStatus}} instances are sorted by the modification times of diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala new file mode 100644 index 000000000000..a121c21e5816 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy + +import java.io.{File, FileOutputStream} + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.scalatest.{BeforeAndAfter, Matchers} + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +/** + * Tests for `SparkHadoopUtil` + */ +class SparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging +with BeforeAndAfter { + + val sparkConf = new SparkConf() + val hadoopUtils = new SparkHadoopUtil() + val tempDir = Utils.createTempDir("SparkHadoopUtilSuite") + + before { + tempDir.mkdirs() + } + + after { + Utils.deleteRecursively(tempDir) + } + + test("GlobNoFileSimple") { + val file = tempFile("does-not-exist") + assertEmptyGlob(toPath(file)) + } + + test("GlobNoFileWildcard") { + val file = tempFile("does-not-exist") + val path = toPath(file) + assertEmptyGlob(new Path(path, "*")) + } + + /** + * Glob a simple file and expect to get the path back + */ + test("GlobSimpleFile") { + val name = "simplefile" + val file = tempFile(name) + val path = toPath(file) + touch(file) + globToSize(path, 1)(0).getPath should be(path) + } + + /** + * Glob a simple file + wildcard and expect nothing back + */ + test("GlobSimpleFileWildcard") { + val file = tempFile("simplefile") + touch(file) + assertEmptyGlob(toWildcardPath(file)) + } + + /** + * Glob an empty dir and expect to get the directory back + */ + test("GlobEmptyDir") { + val file = tempFile("emptydir") + val path = toPath(file) + file.mkdirs() + globToSize(path, 1)(0).getPath should be(path) + } + + /** + * Glob an empty dir + wildcard and expect nothing back + */ + test("GlobEmptyDirWildcard") { + val file = tempFile("emptydir") + file.mkdirs() + assertEmptyGlob(toWildcardPath(file)) + } + + /** + * Glob a directory with children and expect to only get the directory back + */ + test("GlobNonEmptyDir") { + val file = tempFile("dir") + val path = toPath(file) + file.mkdirs() + val child = new File(file, "child") + touch(child) + globToSize(path, 1)(0).getPath should be(path) + } + + /** + * Glob a non empty dir + wildcard and expect to get the child back + */ + test("GlobNonEmptyDirWildcard") { + val file = tempFile("dir") + file.mkdirs() + val path = toPath(file) + file.mkdirs() + val child = new File(file, "child") + touch(child) + globToSize(toWildcardPath(file), 1)(0).getPath should be(toPath(child)) + } + + /** + * Assert that the glob returned an empty list + * @param pattern pattern to glob + */ + def assertEmptyGlob(pattern: Path): Unit = { + assert(Array.empty[FileStatus] === hadoopUtils.globToFileStatus(pattern), "" + + s"globToFileStatus($pattern)") + } + + /** + * glob to an expected size of returned array + * @param pattern pattern to glob + * @param size size to expect + * @return a list of results + */ + def globToSize(pattern: Path, size: Int): Array[FileStatus] = { + val result = hadoopUtils.globToFileStatus(pattern) + assert(size === result.length, + s"globToFileStatus($pattern) = $result") + result + } + + /** + * Create a 0-byte file at the given path + * @param file file to 
create + */ + def touch(file: File): Unit = { + file.getParentFile.mkdirs() + new FileOutputStream(file, false).close() + } + + /** + * Convert a file to a path + * @param file file + * @return the path equivalent. + */ + def toPath(file: File): Path = { + new Path(file.toURI) + } + + /** + * Create a wildcard matching all children of the given path + * @param file file + * @return a path + */ + def toWildcardPath(file: File): Path = { + new Path(toPath(file), "*") + } + + /** + * Get a File instance for a filename in the temp directory. The + * path returned is not created. + * @param name filename + * @return an absolute file under the temporary directory + */ + def tempFile(name: String): File = { + new File(tempDir, name).getAbsoluteFile + } + +} diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index fd6431b634ad..0e7e5451af04 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -631,33 +631,38 @@ methods for creating DStreams from files as input sources. Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory. - + A simple directory can be supplied (`hdfs://namenode:8040/logs/`). All files directly - underneath this path will be processed as they are discovered. + + A simple directory can be supplied, such as `hdfs://namenode:8040/logs/`. + All files directly such a path will be processed as they are discovered. + A regular expression can be supplied instead, such as - `s3a://bucket/logs/[2015,2016]-??-??-friday`. + `hdfs://namenode:8040/logs/(2015,2016)-*-friday`. Here, the DStream will consist of all files directly under those directories matching the regular expression. That is: it is a pattern of directories, not of files in directories. + All files must be in the same data format. - + All files must be created in/moved under the `dataDirectory` directory/directories by + * A file is considered part of a time period based on its modification time + —not its creation time. + + Files must be created in/moved under the `dataDirectory` directory/directories by an atomic operation. In HDFS and similar filesystems, this can be done *renaming* them into the data directory from another part of the same filesystem. * If a wildcard is used to identify directories, such as `hdfs://namenode:8040/logs/2016*`, - renaming an entire directory to match the path will add the directory to the list of matching - directories. However, unless the modification time of the individual files are within - that of the current window, they will not be recognized as new files. - + Once read, changes to the files will not be picked up. - So if the files are being continuously appended, the new data will not be read. + renaming an entire directory to match the path will add the directory to the list of + monitored directories. However, unless the modification time of the directory's files + are within that of the current window, they will not be recognized as new files. + + Once processed, changes to a file will not cause the file to be reread. + That is: Updates are ignored. + + The more files under a directory/wildcard pattern, the longer it will take to + scan for changes —even if no files have actually changed. Special points for object stores - + Wildcard lookup may be very slow with some object stores. + + Wildcard directory enumeration may be very slow with some object stores. + + The slow-down from having many files to scan for changes is very significant. 
+ File renaming is slow; it is `O(data)`. + Directory rename is even slower and not atomic. + Objects created directly though a single PUT operation are atomic, irrespective of - the language or library used. - + Writing a file to an object store using the Hadoop APIs is atomic; the object is only - created via a PUT operation in the final `OutputStream.close()` call. + the programming language or library used to upload the file. + + Writing a file to an object store using Hadoop's APIs is an atomic operation; + the object is only created via a PUT operation in the final `OutputStream.close()` call. For this reason, applications using an object store as the direct destination of data can consider using PUT operations to directly publish data for a DStream to pick up. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 0dd62df72a1f..65e92b796276 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -192,30 +192,26 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( logDebug(s"Getting new files for time $currentTime, " + s"ignoring files older than $modTimeIgnoreThreshold") - val directories = if (SparkHadoopUtil.get.isGlobPath(directoryPath)) { - fs.globStatus(directoryPath) - .filter(_.isDirectory) - .map(_.getPath) - } else { - List(directoryPath).toArray - } + val directories = SparkHadoopUtil.get.globToFileStatus(directoryPath) + .filter(_.isDirectory) + .map(_.getPath) val newFiles = directories.flatMap(dir => fs.listStatus(dir) .filter(isNewFile(_, currentTime, modTimeIgnoreThreshold)) .map(_.getPath.toString)) val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime - logInfo("Finding new files took " + timeTaken + " ms") + logInfo(s"Finding new files took $timeTaken ms") if (timeTaken > slideDuration.milliseconds) { logWarning( s"Time taken to find new files $timeTaken exceeds the batch size. " + "Consider increasing the batch size or reducing the number of " + - "files in the monitored directory." + "files in the monitored directories." ) } newFiles } catch { case e: FileNotFoundException => - logWarning(s"No directory to scan: $directoryPath") + logWarning(s"No directory to scan: $directoryPath: $e") Array.empty case e: Exception => logWarning(s"Error finding new files under $directoryPath", e) From a69d1b683a3035224a0f925d0845e776279bcc37 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 24 Aug 2016 19:33:26 +0100 Subject: [PATCH 06/18] [SPARK-17159] use a simpler pattern in the example docs --- docs/streaming-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 0e7e5451af04..d9d8ff5436dd 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -634,7 +634,7 @@ methods for creating DStreams from files as input sources. + A simple directory can be supplied, such as `hdfs://namenode:8040/logs/`. All files directly such a path will be processed as they are discovered. + A regular expression can be supplied instead, such as - `hdfs://namenode:8040/logs/(2015,2016)-*-friday`. + `hdfs://namenode:8040/logs/2016-*-31`. Here, the DStream will consist of all files directly under those directories matching the regular expression. 
That is: it is a pattern of directories, not of files in directories. From f8c95212e219e6d6baacb0a680c8eb426e40cf01 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 26 Aug 2016 15:41:20 +0100 Subject: [PATCH 07/18] [SPARK-17159] add directory rename test (taken from SPARK-7481 examples and made more robust) --- .../spark/streaming/InputStreamsSuite.scala | 80 ++++++++++++++++++- 1 file changed, 79 insertions(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index b5d36a36513a..f11c31465185 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -27,7 +27,8 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import com.google.common.io.Files -import org.apache.hadoop.fs.Path +import org.apache.commons.io.IOUtils +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.io.{LongWritable, Text} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.scalatest.BeforeAndAfter @@ -251,6 +252,83 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } } + /** + * Tests that renamed directories are included in new batches -but that only files created + * within the batch window are included. + * Uses the Hadoop APIs to verify consistent behavior with the operations used internally. + */ + test("renamed directories are scanned") { + val testDir = Utils.createTempDir() + try { + val batchDuration = Seconds(2) + val durationMs = batchDuration.milliseconds + val testPath = new Path(testDir.toURI) + val streamDir = new Path(testPath, "streaming") + val streamGlobPath = new Path(streamDir, "sub*") + val generatedDir = new Path(testPath, "generated"); + val generatedSubDir = new Path(generatedDir, "subdir"); + val renamedSubDir = new Path(streamDir, "subdir") + + withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc => + val sparkContext = ssc.sparkContext + val hc = sparkContext.hadoopConfiguration + val fs = FileSystem.get(testPath.toUri, hc) + + fs.delete(testPath, true) + fs.mkdirs(testPath) + fs.mkdirs(streamDir) + fs.mkdirs(generatedSubDir) + + def write(path: Path, text: String) = { + val out = fs.create(path, true) + IOUtils.write(text, out) + out.close() + } + + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + val existingFile = new Path(generatedSubDir, "existing"); + write(existingFile, "existing\n") + val status = fs.getFileStatus(existingFile) + clock.setTime(status.getModificationTime + durationMs) + val batchCounter = new BatchCounter(ssc) + val fileStream = ssc.textFileStream(streamGlobPath.toUri.toString) + val outputQueue = new ConcurrentLinkedQueue[Seq[String]] + val outputStream = new TestOutputStream(fileStream, outputQueue) + outputStream.register() + + ssc.start() + clock.advance(durationMs) + eventually(eventuallyTimeout) { + assert(1 === batchCounter.getNumCompletedBatches) + } + // create and rename the file + // put a file into the generated directory + val textPath = new Path(generatedSubDir, "renamed.txt") + write(textPath, "renamed\n") + val now = clock.getTimeMillis() + val modTime = now + durationMs / 2 + fs.setTimes(textPath, modTime, modTime) + val textFilestatus = fs.getFileStatus(existingFile) + assert(textFilestatus.getModificationTime < now + durationMs) + + // rename the directory under the path being scanned + 
fs.rename(generatedSubDir, renamedSubDir) + + // move forward one window + clock.advance(durationMs) + // await the next scan completing + eventually(eventuallyTimeout) { + assert(2 === batchCounter.getNumCompletedBatches) + } + // verify that the "renamed" file is found, but not the "existing" one which is out of + // the window + assert(Set("renamed") === outputQueue.asScala.flatten.toSet) + } + } finally { + Utils.deleteRecursively(testDir) + } + } + test("multi-thread receiver") { // set up the test receiver val numThreads = 10 From 4f0172167b2dc649384a56cc65e4e432a479556b Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 26 Aug 2016 16:27:20 +0100 Subject: [PATCH 08/18] [SPARK-17159] method nested inside a sparktest test closure being mistaken for a public method and so needing to declare a return type. --- .../scala/org/apache/spark/streaming/InputStreamsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index f11c31465185..25a6a07d3a88 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -279,7 +279,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { fs.mkdirs(streamDir) fs.mkdirs(generatedSubDir) - def write(path: Path, text: String) = { + def write(path: Path, text: String): Unit = { val out = fs.create(path, true) IOUtils.write(text, out) out.close() From 1b2027cee5b2a35dfa605f5786f79e1c4d441209 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Sat, 27 Aug 2016 15:01:48 +0100 Subject: [PATCH 09/18] [SPARK-17159] method -> globToFileStatusIfNecessary --- .../apache/spark/deploy/SparkHadoopUtil.scala | 4 +-- .../spark/deploy/SparkHadoopUtilSuite.scala | 34 +++++++++---------- .../streaming/dstream/FileInputDStream.scala | 2 +- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 24d00094501a..3a67d293413a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -221,14 +221,14 @@ class SparkHadoopUtil extends Logging { /** * List directories/files matching the path and return the `FileStatus` results. - * If the pattern is not a regexp then a simple `getFileStatus(pattern)` + * If the pattern is not a glob regular expression then a simple `getFileStatus(pattern)` * is called to get the status of that path. * If the path/pattern does not match anything in the filesystem, * an empty sequence is returned. 
* @param pattern pattern * @return a possibly empty array of FileStatus entries */ - def globToFileStatus(pattern: Path): Array[FileStatus] = { + def globToFileStatusIfNecessary(pattern: Path): Array[FileStatus] = { val fs = pattern.getFileSystem(conf) if (isGlobPath(pattern)) { Option(fs.globStatus(pattern)).getOrElse(Array.empty[FileStatus]) diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala index a121c21e5816..e1b4eb72875d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.util.Utils /** - * Tests for `SparkHadoopUtil` + * Tests for `SparkHadoopUtil` methods. */ class SparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging with BeforeAndAfter { @@ -56,7 +56,7 @@ with BeforeAndAfter { } /** - * Glob a simple file and expect to get the path back + * Glob a simple file and expect to get the path back. */ test("GlobSimpleFile") { val name = "simplefile" @@ -67,7 +67,7 @@ with BeforeAndAfter { } /** - * Glob a simple file + wildcard and expect nothing back + * Glob a simple file + wildcard and expect nothing back. */ test("GlobSimpleFileWildcard") { val file = tempFile("simplefile") @@ -76,7 +76,7 @@ with BeforeAndAfter { } /** - * Glob an empty dir and expect to get the directory back + * Glob an empty dir and expect to get the directory back. */ test("GlobEmptyDir") { val file = tempFile("emptydir") @@ -86,7 +86,7 @@ with BeforeAndAfter { } /** - * Glob an empty dir + wildcard and expect nothing back + * Glob an empty dir + wildcard and expect nothing back. */ test("GlobEmptyDirWildcard") { val file = tempFile("emptydir") @@ -95,7 +95,7 @@ with BeforeAndAfter { } /** - * Glob a directory with children and expect to only get the directory back + * Glob a directory with children and expect to only get the directory back. */ test("GlobNonEmptyDir") { val file = tempFile("dir") @@ -107,7 +107,7 @@ with BeforeAndAfter { } /** - * Glob a non empty dir + wildcard and expect to get the child back + * Glob a non empty dir + wildcard and expect to get the child back. */ test("GlobNonEmptyDirWildcard") { val file = tempFile("dir") @@ -120,29 +120,29 @@ with BeforeAndAfter { } /** - * Assert that the glob returned an empty list + * Assert that the glob returned an empty list. * @param pattern pattern to glob */ def assertEmptyGlob(pattern: Path): Unit = { - assert(Array.empty[FileStatus] === hadoopUtils.globToFileStatus(pattern), "" + - s"globToFileStatus($pattern)") + assert(Array.empty[FileStatus] === hadoopUtils.globToFileStatusIfNecessary(pattern), + s"globToFileStatus($pattern)") } /** - * glob to an expected size of returned array + * glob to an expected size of returned array. * @param pattern pattern to glob * @param size size to expect * @return a list of results */ def globToSize(pattern: Path, size: Int): Array[FileStatus] = { - val result = hadoopUtils.globToFileStatus(pattern) + val result = hadoopUtils.globToFileStatusIfNecessary(pattern) assert(size === result.length, s"globToFileStatus($pattern) = $result") result } /** - * Create a 0-byte file at the given path + * Create a 0-byte file at the given path. * @param file file to create */ def touch(file: File): Unit = { @@ -151,7 +151,7 @@ with BeforeAndAfter { } /** - * Convert a file to a path + * Convert a file to a path. 
* @param file file * @return the path equivalent. */ @@ -160,7 +160,7 @@ with BeforeAndAfter { } /** - * Create a wildcard matching all children of the given path + * Create a wildcard matching all children of the given path. * @param file file * @return a path */ @@ -169,8 +169,8 @@ with BeforeAndAfter { } /** - * Get a File instance for a filename in the temp directory. The - * path returned is not created. + * Get a File instance for a filename in the temp directory. + * No file is created. * @param name filename * @return an absolute file under the temporary directory */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 65e92b796276..fcd92904785e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -192,7 +192,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( logDebug(s"Getting new files for time $currentTime, " + s"ignoring files older than $modTimeIgnoreThreshold") - val directories = SparkHadoopUtil.get.globToFileStatus(directoryPath) + val directories = SparkHadoopUtil.get.globToFileStatusIfNecessary(directoryPath) .filter(_.isDirectory) .map(_.getPath) val newFiles = directories.flatMap(dir => From 921c5c232ec1942238b45896650a8a4781396525 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 30 Aug 2016 18:09:10 +0100 Subject: [PATCH 10/18] [SPARK-17159] File input dstream: revert to directory list operation which doesn't shortcut on a non-wildcard operation --- .../apache/spark/deploy/SparkHadoopUtil.scala | 26 +-- .../spark/deploy/SparkHadoopUtilSuite.scala | 181 ------------------ .../streaming/dstream/FileInputDStream.scala | 3 +- 3 files changed, 2 insertions(+), 208 deletions(-) delete mode 100644 core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 3a67d293413a..7f66711e8318 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy import java.io.{FileNotFoundException, IOException} +import java.lang.reflect.Method import java.security.PrivilegedExceptionAction import java.text.DateFormat import java.util.{Arrays, Comparator, Date, Locale} @@ -219,31 +220,6 @@ class SparkHadoopUtil extends Logging { if (isGlobPath(pattern)) globPath(pattern) else Seq(pattern) } - /** - * List directories/files matching the path and return the `FileStatus` results. - * If the pattern is not a glob regular expression then a simple `getFileStatus(pattern)` - * is called to get the status of that path. - * If the path/pattern does not match anything in the filesystem, - * an empty sequence is returned. 
- * @param pattern pattern - * @return a possibly empty array of FileStatus entries - */ - def globToFileStatusIfNecessary(pattern: Path): Array[FileStatus] = { - val fs = pattern.getFileSystem(conf) - if (isGlobPath(pattern)) { - Option(fs.globStatus(pattern)).getOrElse(Array.empty[FileStatus]) - } else { - try { - Array(fs.getFileStatus(pattern)) - } catch { - // nothing at the end of the path - case e: FileNotFoundException => - logDebug(s"Failed to glob $pattern", e) - Array.empty[FileStatus] - } - } - } - /** * Lists all the files in a directory with the specified prefix, and does not end with the * given suffix. The returned {{FileStatus}} instances are sorted by the modification times of diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala deleted file mode 100644 index e1b4eb72875d..000000000000 --- a/core/src/test/scala/org/apache/spark/deploy/SparkHadoopUtilSuite.scala +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy - -import java.io.{File, FileOutputStream} - -import org.apache.hadoop.fs.{FileStatus, Path} -import org.scalatest.{BeforeAndAfter, Matchers} - -import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.internal.Logging -import org.apache.spark.util.Utils - -/** - * Tests for `SparkHadoopUtil` methods. - */ -class SparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging -with BeforeAndAfter { - - val sparkConf = new SparkConf() - val hadoopUtils = new SparkHadoopUtil() - val tempDir = Utils.createTempDir("SparkHadoopUtilSuite") - - before { - tempDir.mkdirs() - } - - after { - Utils.deleteRecursively(tempDir) - } - - test("GlobNoFileSimple") { - val file = tempFile("does-not-exist") - assertEmptyGlob(toPath(file)) - } - - test("GlobNoFileWildcard") { - val file = tempFile("does-not-exist") - val path = toPath(file) - assertEmptyGlob(new Path(path, "*")) - } - - /** - * Glob a simple file and expect to get the path back. - */ - test("GlobSimpleFile") { - val name = "simplefile" - val file = tempFile(name) - val path = toPath(file) - touch(file) - globToSize(path, 1)(0).getPath should be(path) - } - - /** - * Glob a simple file + wildcard and expect nothing back. - */ - test("GlobSimpleFileWildcard") { - val file = tempFile("simplefile") - touch(file) - assertEmptyGlob(toWildcardPath(file)) - } - - /** - * Glob an empty dir and expect to get the directory back. - */ - test("GlobEmptyDir") { - val file = tempFile("emptydir") - val path = toPath(file) - file.mkdirs() - globToSize(path, 1)(0).getPath should be(path) - } - - /** - * Glob an empty dir + wildcard and expect nothing back. 
- */ - test("GlobEmptyDirWildcard") { - val file = tempFile("emptydir") - file.mkdirs() - assertEmptyGlob(toWildcardPath(file)) - } - - /** - * Glob a directory with children and expect to only get the directory back. - */ - test("GlobNonEmptyDir") { - val file = tempFile("dir") - val path = toPath(file) - file.mkdirs() - val child = new File(file, "child") - touch(child) - globToSize(path, 1)(0).getPath should be(path) - } - - /** - * Glob a non empty dir + wildcard and expect to get the child back. - */ - test("GlobNonEmptyDirWildcard") { - val file = tempFile("dir") - file.mkdirs() - val path = toPath(file) - file.mkdirs() - val child = new File(file, "child") - touch(child) - globToSize(toWildcardPath(file), 1)(0).getPath should be(toPath(child)) - } - - /** - * Assert that the glob returned an empty list. - * @param pattern pattern to glob - */ - def assertEmptyGlob(pattern: Path): Unit = { - assert(Array.empty[FileStatus] === hadoopUtils.globToFileStatusIfNecessary(pattern), - s"globToFileStatus($pattern)") - } - - /** - * glob to an expected size of returned array. - * @param pattern pattern to glob - * @param size size to expect - * @return a list of results - */ - def globToSize(pattern: Path, size: Int): Array[FileStatus] = { - val result = hadoopUtils.globToFileStatusIfNecessary(pattern) - assert(size === result.length, - s"globToFileStatus($pattern) = $result") - result - } - - /** - * Create a 0-byte file at the given path. - * @param file file to create - */ - def touch(file: File): Unit = { - file.getParentFile.mkdirs() - new FileOutputStream(file, false).close() - } - - /** - * Convert a file to a path. - * @param file file - * @return the path equivalent. - */ - def toPath(file: File): Path = { - new Path(file.toURI) - } - - /** - * Create a wildcard matching all children of the given path. - * @param file file - * @return a path - */ - def toWildcardPath(file: File): Path = { - new Path(toPath(file), "*") - } - - /** - * Get a File instance for a filename in the temp directory. - * No file is created. 
- * @param name filename - * @return an absolute file under the temporary directory - */ - def tempFile(name: String): File = { - new File(tempDir, name).getAbsoluteFile - } - -} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index fcd92904785e..a5f3d4e23d2b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.{RDD, UnionRDD} import org.apache.spark.streaming._ import org.apache.spark.streaming.scheduler.StreamInputInfo @@ -192,7 +191,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( logDebug(s"Getting new files for time $currentTime, " + s"ignoring files older than $modTimeIgnoreThreshold") - val directories = SparkHadoopUtil.get.globToFileStatusIfNecessary(directoryPath) + val directories = Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus]) .filter(_.isDirectory) .map(_.getPath) val newFiles = directories.flatMap(dir => From bff0d13258bd95bdea9f8e034f901221d1cc6bdf Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 16 Sep 2016 17:56:05 +0100 Subject: [PATCH 11/18] [SPARK-17159] round out the file streaming text with the dirty details of how HDFS doesn't update file length or modtime until close or a block boundary is reached. --- docs/streaming-programming-guide.md | 52 +++++++++++++++++++++-------- 1 file changed, 39 insertions(+), 13 deletions(-) diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index d9d8ff5436dd..f71c3308dfe5 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -633,8 +633,8 @@ methods for creating DStreams from files as input sources. + A simple directory can be supplied, such as `hdfs://namenode:8040/logs/`. All files directly such a path will be processed as they are discovered. - + A regular expression can be supplied instead, such as - `hdfs://namenode:8040/logs/2016-*-31`. + + A POSIX glob pattern can be supplied, such as + `hdfs://namenode:8040/logs/2016-??-31`. Here, the DStream will consist of all files directly under those directories matching the regular expression. That is: it is a pattern of directories, not of files in directories. @@ -644,29 +644,55 @@ methods for creating DStreams from files as input sources. + Files must be created in/moved under the `dataDirectory` directory/directories by an atomic operation. In HDFS and similar filesystems, this can be done *renaming* them into the data directory from another part of the same filesystem. - * If a wildcard is used to identify directories, such as `hdfs://namenode:8040/logs/2016*`, + * If a wildcard is used to identify directories, such as `hdfs://namenode:8040/logs/2016-*`, renaming an entire directory to match the path will add the directory to the list of - monitored directories. However, unless the modification time of the directory's files + monitored directories. Unless the modification time of the directory's files are within that of the current window, they will not be recognized as new files. 
- + Once processed, changes to a file will not cause the file to be reread. + + Once processed, changes to a file within the current window will not cause the file to be reread. That is: Updates are ignored. + The more files under a directory/wildcard pattern, the longer it will take to scan for changes —even if no files have actually changed. + + Calling `FileSystem.setTimes()` to fix the timestamp is a way to have the file picked + up in a later window, even if its contents have not changed. - Special points for object stores + Special points for HDFS + + For performance and scalability, the HDFS filesystem does not continually update the modification + time or even the listed filesize of a file while it is being written to. The algorithm is + + + File creation: a zero-byte file is listed, creation and modification time is set to the current + time as seen on the NameNode. + * File writes: data is buffered, nothing is written until local buffer is flushed + * `OutputStream.flush()` call or output buffer full: data is written to HDFS DataNodes. The + file length and modification time is only updated when the amount of data being written + crosses an HDFS block (64, 128, 256MB or similar). + * When `OutputStream.close()` is called, remaining data is written, the file closed and + the NameNode updated with the final size of the file. The modification time is set to + the time the file was closed. + + This means that is near-impossible to determine when a file being actively written to HDFS will + have its modification time updated and so changed detected; it will depend on the size of + the file being written. To guarantee that changes are picked up in a window, write the file + to a directory which is not being monitored, then, after the file is closed, `rename()` it into + the destination directory. + + Object stores + + Object stores have a different set of limitations. - + Wildcard directory enumeration may be very slow with some object stores. + + Wildcard directory enumeration may be slow. + The slow-down from having many files to scan for changes is very significant. - + File renaming is slow; it is `O(data)`. - + Directory rename is even slower and not atomic. - + Objects created directly though a single PUT operation are atomic, irrespective of - the programming language or library used to upload the file. + + Renaming directories and files can be very slow. + + Object creation directly though a PUT operation is atomic, irrespective of + the programming language or library used to upload the data. + + The `FileSystem.setTimes()` command to set file timestamps will be ignored. + Writing a file to an object store using Hadoop's APIs is an atomic operation; the object is only created via a PUT operation in the final `OutputStream.close()` call. For this reason, applications using an object store as the direct destination of data - can consider using PUT operations to directly publish data for a DStream to pick up. - + can use PUT operations to directly publish data for a DStream to pick up —even though this + is not the mechanism to use when writing to HDFS or other filesystem. + For simple text files, there is an easier method `streamingContext.textFileStream(dataDirectory)`. And file streams do not require running a receiver, hence does not require allocating cores. Python API `fileStream` is not available in the Python API, only `textFileStream` is available. 
From a67902bed07c935c14c2d70654bb18f1c53740be Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Mon, 19 Sep 2016 10:43:54 +0100 Subject: [PATCH 12/18] SPARK-17159 Chris Nauroth of HDFS team clarified which operations update the mtime field; this is covered in the streaming section to emphasise why write + rename is the strategy for streaming in files in HDFS. That strategy does also work in object stores, though the rename operation is O(data) --- docs/streaming-programming-guide.md | 46 ++++++++++++++++++----------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index f71c3308dfe5..d9ff1b9cea74 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -657,22 +657,29 @@ methods for creating DStreams from files as input sources. Special points for HDFS - For performance and scalability, the HDFS filesystem does not continually update the modification - time or even the listed filesize of a file while it is being written to. The algorithm is + For performance and scalability, the HDFS filesystem does not update the modification + time while it is being written to. Specifically - + File creation: a zero-byte file is listed, creation and modification time is set to the current - time as seen on the NameNode. - * File writes: data is buffered, nothing is written until local buffer is flushed - * `OutputStream.flush()` call or output buffer full: data is written to HDFS DataNodes. The - file length and modification time is only updated when the amount of data being written - crosses an HDFS block (64, 128, 256MB or similar). + + `FileSystem.create()` creation: a zero-byte file is listed; creation and modification time is + set to the current time as seen on the NameNode. + * Writes to a file via the output stream returned in the `create()` call: the modification + time *does not change*. * When `OutputStream.close()` is called, remaining data is written, the file closed and the NameNode updated with the final size of the file. The modification time is set to the time the file was closed. - - This means that is near-impossible to determine when a file being actively written to HDFS will - have its modification time updated and so changed detected; it will depend on the size of - the file being written. To guarantee that changes are picked up in a window, write the file + * File opened for appends via an `append()` operation. This does not change the modification + time of the file until the `close()` call is made on the output stream. + * `FileSystem.setTimes()` can be used to explicitly set the time on a file. + * The rarely used operations: `FileSystem.concat()`, `createSnapshot()`, `createSymlink()` and + `truncate()` all update the modification time. + + This means that when a file is opened, even before data has been written, it may be + seen —and if so, read and added to the list of files to convert to an RDD. + At this point the fact it is empty will be observed, and so the file omitted from the RDD. + However, it will already have been considered to have been seen, *and any updates to the + file within the same window will be ignored*. That means that if a file is created and + then written to within the same streaming window, its contents may actually be missed. + To guarantee that changes are picked up in a window, write the file to a directory which is not being monitored, then, after the file is closed, `rename()` it into the destination directory. 
@@ -680,18 +687,23 @@ methods for creating DStreams from files as input sources. Object stores have a different set of limitations. - + Wildcard directory enumeration may be slow. - + The slow-down from having many files to scan for changes is very significant. - + Renaming directories and files can be very slow. + + Wildcard directory enumeration can very slow, especially if there are many directories + or files to scan. + + A file's creation time is also its modification time. + + The `FileSystem.setTimes()` command to set file timestamps will be ignored. + + `FileSystem.rename(file)` of a single file will update the modification time —but the time to rename is + `O(length(file))`. + + `FileSystem.rename(directory)` is not atomic, and slower the more data there is to rename. + It may update the modification times of renamed files. + Object creation directly though a PUT operation is atomic, irrespective of the programming language or library used to upload the data. - + The `FileSystem.setTimes()` command to set file timestamps will be ignored. + Writing a file to an object store using Hadoop's APIs is an atomic operation; the object is only created via a PUT operation in the final `OutputStream.close()` call. For this reason, applications using an object store as the direct destination of data can use PUT operations to directly publish data for a DStream to pick up —even though this - is not the mechanism to use when writing to HDFS or other filesystem. + is not the mechanism to use when writing to HDFS or other filesystem. If `rename()` is used, + to place the completed files into a monitored directory, expect the operation to be slow. For simple text files, there is an easier method `streamingContext.textFileStream(dataDirectory)`. And file streams do not require running a receiver, hence does not require allocating cores. From 9df7ff40215a95fcdca92d4ee19f38196aa7ee8c Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Mon, 10 Oct 2016 16:12:49 +0100 Subject: [PATCH 13/18] [SPARK-17159] rebase to master; verify new test still works; review & tighten documentation --- docs/streaming-programming-guide.md | 68 ++++++++++--------- .../spark/streaming/InputStreamsSuite.scala | 12 ++-- 2 files changed, 43 insertions(+), 37 deletions(-) diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index d9ff1b9cea74..dddcdaae03bc 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -631,7 +631,8 @@ methods for creating DStreams from files as input sources. Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory. - + A simple directory can be supplied, such as `hdfs://namenode:8040/logs/`. + ++ The files must have the same data format. + + A simple directory can be monitored, such as `hdfs://namenode:8040/logs/`. All files directly such a path will be processed as they are discovered. + A POSIX glob pattern can be supplied, such as `hdfs://namenode:8040/logs/2016-??-31`. @@ -646,25 +647,31 @@ methods for creating DStreams from files as input sources. into the data directory from another part of the same filesystem. * If a wildcard is used to identify directories, such as `hdfs://namenode:8040/logs/2016-*`, renaming an entire directory to match the path will add the directory to the list of - monitored directories. Unless the modification time of the directory's files - are within that of the current window, they will not be recognized as new files. + monitored directories. 
Only the files in the directory whose modification time is + within the current window will be included in the stream. + Once processed, changes to a file within the current window will not cause the file to be reread. - That is: Updates are ignored. + That is: *updates are ignored*. + The more files under a directory/wildcard pattern, the longer it will take to scan for changes —even if no files have actually changed. + Calling `FileSystem.setTimes()` to fix the timestamp is a way to have the file picked up in a later window, even if its contents have not changed. + + For simple text files, there is an easier method `streamingContext.textFileStream(dataDirectory)`. And file streams do not require running a receiver, hence does not require allocating cores. + + Python API `fileStream` is not available in the Python API, only `textFileStream` is available. + + Special points for HDFS - - For performance and scalability, the HDFS filesystem does not update the modification - time while it is being written to. Specifically - + + The HDFS filesystem does not update the modification time while it is being written to. + Specifically + + `FileSystem.create()` creation: a zero-byte file is listed; creation and modification time is set to the current time as seen on the NameNode. * Writes to a file via the output stream returned in the `create()` call: the modification time *does not change*. - * When `OutputStream.close()` is called, remaining data is written, the file closed and + * When `OutputStream.close()` is called, all remaining data is written, the file closed and the NameNode updated with the final size of the file. The modification time is set to the time the file was closed. * File opened for appends via an `append()` operation. This does not change the modification @@ -673,26 +680,26 @@ methods for creating DStreams from files as input sources. * The rarely used operations: `FileSystem.concat()`, `createSnapshot()`, `createSymlink()` and `truncate()` all update the modification time. - This means that when a file is opened, even before data has been written, it may be - seen —and if so, read and added to the list of files to convert to an RDD. - At this point the fact it is empty will be observed, and so the file omitted from the RDD. - However, it will already have been considered to have been seen, *and any updates to the - file within the same window will be ignored*. That means that if a file is created and - then written to within the same streaming window, its contents may actually be missed. + Together, this means that when a file is opened, even before data has been completely written, + it may be included in the DStream -after which updates to the file within the same window + will be ignored. That is: changes may be missed, and data omitted from the stream. To guarantee that changes are picked up in a window, write the file - to a directory which is not being monitored, then, after the file is closed, `rename()` it into - the destination directory. - - Object stores - + to an unmonitored directory, then immediately after the output stream is closed, + rename it into the destination directory. + Provided the renamed file appears in the scanned destination directory during the window + of its creation, the new data will be picked up. + Object stores have a different set of limitations. + Wildcard directory enumeration can very slow, especially if there are many directories or files to scan. - + A file's creation time is also its modification time. 
+ * The file only becomes visible at the end of the write operation; this also defines. + the creation time of the file. + + A file's modification time is always the same as its creation time. + The `FileSystem.setTimes()` command to set file timestamps will be ignored. - + `FileSystem.rename(file)` of a single file will update the modification time —but the time to rename is - `O(length(file))`. + + `FileSystem.rename(file)` of a single file will update the modification time. + The time to rename a file is generally `O(length(file))`: the bigger the file, the longer + it takes. + `FileSystem.rename(directory)` is not atomic, and slower the more data there is to rename. It may update the modification times of renamed files. + Object creation directly though a PUT operation is atomic, irrespective of @@ -700,15 +707,12 @@ methods for creating DStreams from files as input sources. + Writing a file to an object store using Hadoop's APIs is an atomic operation; the object is only created via a PUT operation in the final `OutputStream.close()` call. - For this reason, applications using an object store as the direct destination of data - can use PUT operations to directly publish data for a DStream to pick up —even though this - is not the mechanism to use when writing to HDFS or other filesystem. If `rename()` is used, - to place the completed files into a monitored directory, expect the operation to be slow. - - For simple text files, there is an easier method `streamingContext.textFileStream(dataDirectory)`. And file streams do not require running a receiver, hence does not require allocating cores. - - Python API `fileStream` is not available in the Python API, only `textFileStream` is available. - + Applications using an object store as the direct destination of data + should use PUT operations to directly publish data for a DStream to pick up —even though this + is not the mechanism to use when writing to HDFS or other filesystem. When using the Hadoop + FileSystem API in Spark that means: write the data directory directly to the target directory, + knowing that it is the final `close()` call will make the file visible and set its creation & + modified times. - **Streams based on Custom Receivers:** DStreams can be created with data streams received through custom receivers. See the [Custom Receiver Guide](streaming-custom-receivers.html) for more details. 
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 25a6a07d3a88..ae6216126c81 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -222,12 +222,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // not enough to trigger a batch clock.advance(batchDuration.milliseconds / 2) - def createFileAndAdvenceTime(data: Int, dir: File): Unit = { + def createFileAndAdvanceTime(data: Int, dir: File): Unit = { val file = new File(testSubDir1, data.toString) Files.write(data + "\n", file, StandardCharsets.UTF_8) assert(file.setLastModified(clock.getTimeMillis())) assert(file.lastModified === clock.getTimeMillis()) - logInfo("Created file " + file) + logInfo(s"Created file $file") // Advance the clock after creating the file to avoid a race when // setting its modification time clock.advance(batchDuration.milliseconds) @@ -237,18 +237,20 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } // Over time, create files in the temp directory 1 val input1 = Seq(1, 2, 3, 4, 5) - input1.foreach(i => createFileAndAdvenceTime(i, testSubDir1)) + input1.foreach(i => createFileAndAdvanceTime(i, testSubDir1)) // Over time, create files in the temp directory 1 val input2 = Seq(6, 7, 8, 9, 10) - input2.foreach(i => createFileAndAdvenceTime(i, testSubDir2)) + input2.foreach(i => createFileAndAdvanceTime(i, testSubDir2)) // Verify that all the files have been read val expectedOutput = (input1 ++ input2).map(_.toString).toSet assert(outputQueue.asScala.flatten.toSet === expectedOutput) } } finally { - if (testDir != null) Utils.deleteRecursively(testDir) + if (testDir != null) { + Utils.deleteRecursively(testDir) + } } } From ac47d4274966048c0c5d26f241c1988a9c23d9e8 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 3 Jan 2017 14:41:33 +0000 Subject: [PATCH 14/18] [SPARK-17159] review/tighten docs, move HDFS timestamp details out --- docs/streaming-programming-guide.md | 198 ++++++++++++++-------------- 1 file changed, 98 insertions(+), 100 deletions(-) diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index dddcdaae03bc..e5105f264382 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -615,108 +615,106 @@ which creates a DStream from text data received over a TCP socket connection. Besides sockets, the StreamingContext API provides methods for creating DStreams from files as input sources. -- **File Streams:** For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created as: - -
-
- streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory) -
-
- streamingContext.fileStream(dataDirectory); -
-
- streamingContext.textFileStream(dataDirectory) -
-
- - Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory. - - ++ The files must have the same data format. - + A simple directory can be monitored, such as `hdfs://namenode:8040/logs/`. - All files directly such a path will be processed as they are discovered. - + A POSIX glob pattern can be supplied, such as - `hdfs://namenode:8040/logs/2016-??-31`. - Here, the DStream will consist of all files directly under those directories - matching the regular expression. - That is: it is a pattern of directories, not of files in directories. - + All files must be in the same data format. - * A file is considered part of a time period based on its modification time - —not its creation time. - + Files must be created in/moved under the `dataDirectory` directory/directories by - an atomic operation. In HDFS and similar filesystems, this can be done *renaming* them - into the data directory from another part of the same filesystem. - * If a wildcard is used to identify directories, such as `hdfs://namenode:8040/logs/2016-*`, - renaming an entire directory to match the path will add the directory to the list of - monitored directories. Only the files in the directory whose modification time is - within the current window will be included in the stream. - + Once processed, changes to a file within the current window will not cause the file to be reread. - That is: *updates are ignored*. - + The more files under a directory/wildcard pattern, the longer it will take to - scan for changes —even if no files have actually changed. - + Calling `FileSystem.setTimes()` to fix the timestamp is a way to have the file picked - up in a later window, even if its contents have not changed. - - - For simple text files, there is an easier method `streamingContext.textFileStream(dataDirectory)`. And file streams do not require running a receiver, hence does not require allocating cores. - - Python API `fileStream` is not available in the Python API, only `textFileStream` is available. - - - Special points for HDFS - - The HDFS filesystem does not update the modification time while it is being written to. - Specifically - - + `FileSystem.create()` creation: a zero-byte file is listed; creation and modification time is - set to the current time as seen on the NameNode. - * Writes to a file via the output stream returned in the `create()` call: the modification - time *does not change*. - * When `OutputStream.close()` is called, all remaining data is written, the file closed and - the NameNode updated with the final size of the file. The modification time is set to - the time the file was closed. - * File opened for appends via an `append()` operation. This does not change the modification - time of the file until the `close()` call is made on the output stream. - * `FileSystem.setTimes()` can be used to explicitly set the time on a file. - * The rarely used operations: `FileSystem.concat()`, `createSnapshot()`, `createSymlink()` and - `truncate()` all update the modification time. - - Together, this means that when a file is opened, even before data has been completely written, - it may be included in the DStream -after which updates to the file within the same window - will be ignored. That is: changes may be missed, and data omitted from the stream. - To guarantee that changes are picked up in a window, write the file - to an unmonitored directory, then immediately after the output stream is closed, - rename it into the destination directory. 
- Provided the renamed file appears in the scanned destination directory during the window - of its creation, the new data will be picked up. - - Object stores have a different set of limitations. - - + Wildcard directory enumeration can very slow, especially if there are many directories - or files to scan. - * The file only becomes visible at the end of the write operation; this also defines. - the creation time of the file. - + A file's modification time is always the same as its creation time. - + The `FileSystem.setTimes()` command to set file timestamps will be ignored. - + `FileSystem.rename(file)` of a single file will update the modification time. - The time to rename a file is generally `O(length(file))`: the bigger the file, the longer - it takes. - + `FileSystem.rename(directory)` is not atomic, and slower the more data there is to rename. - It may update the modification times of renamed files. - + Object creation directly though a PUT operation is atomic, irrespective of - the programming language or library used to upload the data. - + Writing a file to an object store using Hadoop's APIs is an atomic operation; - the object is only created via a PUT operation in the final `OutputStream.close()` call. - - Applications using an object store as the direct destination of data - should use PUT operations to directly publish data for a DStream to pick up —even though this - is not the mechanism to use when writing to HDFS or other filesystem. When using the Hadoop - FileSystem API in Spark that means: write the data directory directly to the target directory, - knowing that it is the final `close()` call will make the file visible and set its creation & - modified times. -- **Streams based on Custom Receivers:** DStreams can be created with data streams received through custom receivers. See the [Custom Receiver +#### File Streams +{:.no_toc} + +For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created as +via `StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]`. + +File streams do not require running a receiver, hence does not require allocating cores. + +For simple text files, the easiest method is `StreamingContext.textFileStream(dataDirectory)`. + +
+
+ +{% highlight scala %} +streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory) +{% endhighlight %} +For text files + +{% highlight scala %} +streamingContext.textFileStream(dataDirectory) +{% endhighlight %} +
+ +
+{% highlight java %} +streamingContext.fileStream(dataDirectory); +{% endhighlight %} +For text files + +{% highlight java %} +streamingContext.textFileStream(dataDirectory); +{% endhighlight %} +
+ +
+`fileStream` is not available in the Python API; only `textFileStream` is available. +{% highlight python %} +streamingContext.textFileStream(dataDirectory) +{% endhighlight %} +
+ +
+ +##### How Directories are Monitored +{:.no_toc} + +Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory. + + * A simple directory can be monitored, such as `hdfs://namenode:8040/logs/`. + All files directly under such a path will be processed as they are discovered. + + A [POSIX glob pattern](http://pubs.opengroup.org/onlinepubs/009695399/utilities/xcu_chap02.html#tag_02_13_02) can be supplied, such as + `hdfs://namenode:8040/logs/2017/*`. + Here, the DStream will consist of all files in the directories + matching the pattern. + That is: it is a pattern of directories, not of files in directories. + + All files must be in the same data format. + * A file is considered part of a time period based on its modification time, + not its creation time. + + Once processed, changes to a file within the current window will not cause the file to be reread. + That is: *updates are ignored*. + + The more files under a directory, the longer it will take to + scan for changes —even if no files have been modified. + * If a wildcard is used to identify directories, such as `hdfs://namenode:8040/logs/2016-*`, + renaming an entire directory to match the path will add the directory to the list of + monitored directories. Only the files in the directory whose modification time is + within the current window will be included in the stream. + + Calling `FileSystem.setTimes()` to fix the timestamp is a way to have the file picked + up in a later window, even if its contents have not changed. + + +##### Streaming to FileSystems vs Object stores +{:.no_toc} + +"Full" Filesystems such as HDFS tend to set the modification time on their files as soon +as the output stream is created. +When a file is opened, even before data has been completely written, +it may be included in the `DStream` —after which updates to the file within the same window +will be ignored. That is: changes may be missed, and data omitted from the stream. + +To guarantee that changes are picked up in a window, write the file +to an unmonitored directory, then, immediately after the output stream is closed, +rename it into the destination directory. +Provided the renamed file appears in the scanned destination directory during the window +of its creation, the new data will be picked up. + +In contrast, Object Stores have very slow rename operations (the data is usually copied). +and `FileSystem.setTimes()` is usually a no-op. +However as objects are not visible until the writing operation has completed, +applications may write directly to the monitored directory. + +#### Streams based on Custom Receivers +{:.no_toc} + +DStreams can be created with data streams received through custom receivers. See the [Custom Receiver Guide](streaming-custom-receivers.html) for more details. -- **Queue of RDDs as a Stream:** For testing a Spark Streaming application with test data, one can also create a DStream based on a queue of RDDs, using `streamingContext.queueStream(queueOfRDDs)`. Each RDD pushed into the queue will be treated as a batch of data in the DStream, and processed like a stream. +#### Queue of RDDs as a Stream +{:.no_toc} + +For testing a Spark Streaming application with test data, one can also create a DStream based on a queue of RDDs, using `streamingContext.queueStream(queueOfRDDs)`. Each RDD pushed into the queue will be treated as a batch of data in the DStream, and processed like a stream. 
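Tying the rewritten file-stream sections above together, a minimal self-contained driver that monitors a glob of directories might look like the sketch below. The application name, NameNode address, glob pattern and batch interval are all hypothetical; the `fileStream` call itself is the API the guide documents.

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectoryMonitor {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DirectoryMonitor")
    val ssc = new StreamingContext(conf, Seconds(30))

    // Monitor every directory matching the (hypothetical) pattern; each batch
    // picks up files whose modification time falls within the batch window.
    val lines = ssc
      .fileStream[LongWritable, Text, TextInputFormat]("hdfs://namenode:8040/logs/2017-*")
      .map { case (_, text) => text.toString }

    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```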
 For more details on streams from sockets and files, see the API documentations of the relevant functions in
 [StreamingContext](api/scala/index.html#org.apache.spark.streaming.StreamingContext) for

From f38a9851911605c64fe2ad779617bb932cab507b Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Mon, 23 Jan 2017 17:54:54 +0000
Subject: [PATCH 15/18] [SPARK-17159] address comments; move to withTempDir
 for tests with a temp dir.

Docs now refer the reader to the Hadoop FS spec for any details about what object
stores do

---
 docs/streaming-programming-guide.md           | 12 ++++---
 .../spark/streaming/InputStreamsSuite.scala   | 31 +++++--------------
 .../spark/streaming/TestSuiteBase.scala       | 14 ++++++++-
 3 files changed, 27 insertions(+), 30 deletions(-)

diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index e5105f264382..1a5baa04940b 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -691,7 +691,7 @@ Spark Streaming will monitor the directory `dataDirectory` and process any files
 "Full" Filesystems such as HDFS tend to set the modification time on their files as soon
 as the output stream is created.
 When a file is opened, even before data has been completely written,
-it may be included in the `DStream` —after which updates to the file within the same window
+it may be included in the `DStream` - after which updates to the file within the same window
 will be ignored. That is: changes may be missed, and data omitted from the stream.

 To guarantee that changes are picked up in a window, write the file
@@ -700,10 +700,12 @@ rename it into the destination directory.
 Provided the renamed file appears in the scanned destination directory during the window
 of its creation, the new data will be picked up.

-In contrast, Object Stores have very slow rename operations (the data is usually copied).
-and `FileSystem.setTimes()` is usually a no-op.
-However as objects are not visible until the writing operation has completed,
-applications may write directly to the monitored directory.
+In contrast, Object Stores have very slow rename operations (the data is usually copied),
+and the renamed object may have the time of the copy operation as its modification time.
+Careful testing is needed against the target object store to verify that the timestamp behavior
+of the store is consistent with that expected by Spark Streaming. For more details on this topic,
+consult the [Hadoop Filesystem Specification](https://hadoop.apache.org/docs/stable2/hadoop-project-dist/hadoop-common/filesystem/introduction.html).
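The "careful testing" recommended above can start with a probe as small as the following sketch, which writes one object, renames it, and compares the reported modification times. The store URL and keys are hypothetical; only the standard Hadoop `FileSystem` calls are used.

```scala
import java.nio.charset.StandardCharsets
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Probe whether rename() on the target store preserves modification times.
val src = new Path("s3a://example-bucket/probe/source.txt")
val dest = new Path("s3a://example-bucket/probe/renamed.txt")
val fs = src.getFileSystem(new Configuration())

val out = fs.create(src)
out.write("probe\n".getBytes(StandardCharsets.UTF_8))
out.close()

val created = fs.getFileStatus(src).getModificationTime
fs.rename(src, dest)
val renamed = fs.getFileStatus(dest).getModificationTime

// If the two values differ, renamed files may land in a different (later)
// window than the one their original creation time implied.
println(s"mtime at creation: $created; mtime after rename: $renamed")
```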
+ #### Streams based on Custom Receivers {:.no_toc} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index ae6216126c81..ab01e765608c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -131,10 +131,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } test("binary records stream") { - var testDir: File = null - try { + withTempDir { testDir => val batchDuration = Seconds(2) - testDir = Utils.createTempDir() // Create a file that exists before the StreamingContext is created: val existingFile = new File(testDir, "0") Files.write("0\n", existingFile, StandardCharsets.UTF_8) @@ -177,8 +175,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { assert(obtainedOutput(i) === input.map(b => (b + i).toByte)) } } - } finally { - if (testDir != null) Utils.deleteRecursively(testDir) } } @@ -191,10 +187,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } test("file input stream - wildcard") { - var testDir: File = null - try { + withTempDir { testDir => val batchDuration = Seconds(2) - testDir = Utils.createTempDir() val testSubDir1 = Utils.createDirectory(testDir.toString, "tmp1") val testSubDir2 = Utils.createDirectory(testDir.toString, "tmp2") @@ -247,10 +241,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val expectedOutput = (input1 ++ input2).map(_.toString).toSet assert(outputQueue.asScala.flatten.toSet === expectedOutput) } - } finally { - if (testDir != null) { - Utils.deleteRecursively(testDir) - } } } @@ -260,15 +250,14 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { * Uses the Hadoop APIs to verify consistent behavior with the operations used internally. 
*/ test("renamed directories are scanned") { - val testDir = Utils.createTempDir() - try { + withTempDir { testDir => val batchDuration = Seconds(2) val durationMs = batchDuration.milliseconds val testPath = new Path(testDir.toURI) val streamDir = new Path(testPath, "streaming") val streamGlobPath = new Path(streamDir, "sub*") - val generatedDir = new Path(testPath, "generated"); - val generatedSubDir = new Path(generatedDir, "subdir"); + val generatedDir = new Path(testPath, "generated") + val generatedSubDir = new Path(generatedDir, "subdir") val renamedSubDir = new Path(streamDir, "subdir") withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc => @@ -288,7 +277,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - val existingFile = new Path(generatedSubDir, "existing"); + val existingFile = new Path(generatedSubDir, "existing") write(existingFile, "existing\n") val status = fs.getFileStatus(existingFile) clock.setTime(status.getModificationTime + durationMs) @@ -326,8 +315,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // the window assert(Set("renamed") === outputQueue.asScala.flatten.toSet) } - } finally { - Utils.deleteRecursively(testDir) } } @@ -496,10 +483,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } def testFileStream(newFilesOnly: Boolean) { - var testDir: File = null - try { + withTempDir { testDir => val batchDuration = Seconds(2) - testDir = Utils.createTempDir() // Create a file that exists before the StreamingContext is created: val existingFile = new File(testDir, "0") Files.write("0\n", existingFile, StandardCharsets.UTF_8) @@ -546,8 +531,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } assert(outputQueue.asScala.flatten.toSet === expectedOutput) } - } finally { - if (testDir != null) Utils.deleteRecursively(testDir) } } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index dbab70886102..ada494eb897f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming -import java.io.{IOException, ObjectInputStream} +import java.io.{File, IOException, ObjectInputStream} import java.util.concurrent.ConcurrentLinkedQueue import scala.collection.JavaConverters._ @@ -557,4 +557,16 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging { verifyOutput[W](output.toSeq, expectedOutput, useSet) } } + + /** + * Creates a temporary directory, which is then passed to `f` and will be deleted after `f` + * returns. + * (originally from `SqlTestUtils`.) 
+ * @todo Probably this method should be moved to a more general place + */ + protected def withTempDir(f: File => Unit): Unit = { + val dir = Utils.createTempDir().getCanonicalFile + try f(dir) finally Utils.deleteRecursively(dir) + } + } From 49519cca9568442c2fa14fd7fc70cb2fcfce0754 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 24 Feb 2017 16:12:31 +0000 Subject: [PATCH 16/18] SPARK-17159 tail end of the rebase --- .../main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 7f66711e8318..f475ce87540a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -17,8 +17,7 @@ package org.apache.spark.deploy -import java.io.{FileNotFoundException, IOException} -import java.lang.reflect.Method +import java.io.IOException import java.security.PrivilegedExceptionAction import java.text.DateFormat import java.util.{Arrays, Comparator, Date, Locale} From e03d189bedaa95f6be06c6919db1029df7545472 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 24 Feb 2017 16:20:42 +0000 Subject: [PATCH 17/18] SPARK-17159 review of docs: quote paths to clearly show what is code and what is just punctuation --- docs/streaming-programming-guide.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 1a5baa04940b..74a8a108ddaa 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -663,10 +663,10 @@ streamingContext.textFileStream(dataDirectory) Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory. - * A simple directory can be monitored, such as `hdfs://namenode:8040/logs/`. + * A simple directory can be monitored, such as `"hdfs://namenode:8040/logs/"`. All files directly under such a path will be processed as they are discovered. + A [POSIX glob pattern](http://pubs.opengroup.org/onlinepubs/009695399/utilities/xcu_chap02.html#tag_02_13_02) can be supplied, such as - `hdfs://namenode:8040/logs/2017/*`. + `"hdfs://namenode:8040/logs/2017/*"`. Here, the DStream will consist of all files in the directories matching the pattern. That is: it is a pattern of directories, not of files in directories. @@ -677,7 +677,7 @@ Spark Streaming will monitor the directory `dataDirectory` and process any files That is: *updates are ignored*. + The more files under a directory, the longer it will take to scan for changes —even if no files have been modified. - * If a wildcard is used to identify directories, such as `hdfs://namenode:8040/logs/2016-*`, + * If a wildcard is used to identify directories, such as `"hdfs://namenode:8040/logs/2016-*"`, renaming an entire directory to match the path will add the directory to the list of monitored directories. Only the files in the directory whose modification time is within the current window will be included in the stream. From a3aaf267d2ac30c012b4a71b7a80e28a49ff10be Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 24 Feb 2017 16:43:50 +0000 Subject: [PATCH 18/18] SPARK-17159 address sean's review comments, and read over the object store text and update slightly to make things a bit clearer. The more I learn about object stores, the less they resemble file systems. 
---
 docs/streaming-programming-guide.md | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 74a8a108ddaa..ce294b62b511 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -676,13 +676,13 @@ Spark Streaming will monitor the directory `dataDirectory` and process any files
   + Once processed, changes to a file within the current window will not cause the file to be reread.
     That is: *updates are ignored*.
   + The more files under a directory, the longer it will take to
-    scan for changes —even if no files have been modified.
+    scan for changes — even if no files have been modified.
   * If a wildcard is used to identify directories, such as `"hdfs://namenode:8040/logs/2016-*"`,
     renaming an entire directory to match the path will add the directory to the list of
     monitored directories. Only the files in the directory whose modification time is
     within the current window will be included in the stream.
-  + Calling `FileSystem.setTimes()` to fix the timestamp is a way to have the file picked
-    up in a later window, even if its contents have not changed.
+  + Calling [`FileSystem.setTimes()`](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#setTimes-org.apache.hadoop.fs.Path-long-long-)
+    to fix the timestamp is a way to have the file picked up in a later window, even if its contents have not changed.


 ##### Streaming to FileSystems vs Object stores
@@ -700,11 +700,17 @@ rename it into the destination directory.
 Provided the renamed file appears in the scanned destination directory during the window
 of its creation, the new data will be picked up.

-In contrast, Object Stores have very slow rename operations (the data is usually copied),
-and the renamed object may have the time of the copy operation as its modification time.
+In contrast, Object Stores such as Amazon S3 and Azure Storage usually have slow rename operations, as the
+data is actually copied.
+Furthermore, a renamed object may have the time of the `rename()` operation as its modification time, so
+it may not be considered part of the window in which its original creation time would have placed it.
+
 Careful testing is needed against the target object store to verify that the timestamp behavior
-of the store is consistent with that expected by Spark Streaming. For more details on this topic,
-consult the [Hadoop Filesystem Specification](https://hadoop.apache.org/docs/stable2/hadoop-project-dist/hadoop-common/filesystem/introduction.html).
+of the store is consistent with that expected by Spark Streaming. It may be
+that writing directly into a destination directory is the appropriate strategy for
+streaming data via the chosen object store.
+
+For more details on this topic, consult the [Hadoop Filesystem Specification](https://hadoop.apache.org/docs/stable2/hadoop-project-dist/hadoop-common/filesystem/introduction.html).

 #### Streams based on Custom Receivers