diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index abd4ac965360..ce294b62b511 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -615,35 +615,114 @@ which creates a DStream from text
data received over a TCP socket connection. Besides sockets, the StreamingContext API provides
methods for creating DStreams from files as input sources.
-- **File Streams:** For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created as:
+#### File Streams
+{:.no_toc}
+
+For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created
+via `StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]`.
+
+File streams do not require running a receiver, so there is no need to allocate any cores for receiving file data.
+
+For simple text files, the easiest method is `StreamingContext.textFileStream(dataDirectory)`.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
+{% endhighlight %}
+For text files:
+
+{% highlight scala %}
+streamingContext.textFileStream(dataDirectory)
+{% endhighlight %}
+</div>
+
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+streamingContext.fileStream(dataDirectory, KeyClass.class, ValueClass.class, InputFormatClass.class);
+{% endhighlight %}
+For text files:
+
+{% highlight java %}
+streamingContext.textFileStream(dataDirectory);
+{% endhighlight %}
+</div>
+
-
-
- streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
-
-
- streamingContext.fileStream(dataDirectory);
-
-
- streamingContext.textFileStream(dataDirectory)
-
-
+<div data-lang="python" markdown="1">
+`fileStream` is not available in the Python API; only `textFileStream` is available.
+{% highlight python %}
+streamingContext.textFileStream(dataDirectory)
+{% endhighlight %}
+</div>
+</div>
+
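+For example, a minimal Scala sketch of the typed `fileStream` call for plain text files (the path
+is illustrative, a `streamingContext` is assumed to be in scope, and this is equivalent to
+`textFileStream` followed by extracting the value):
+
+{% highlight scala %}
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
+
+// Keys are byte offsets into each file; values are the lines themselves.
+val lines = streamingContext
+  .fileStream[LongWritable, Text, TextInputFormat]("hdfs://namenode:8040/logs/")
+  .map { case (_, line) => line.toString }
+{% endhighlight %}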
- Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory (files written in nested directories not supported). Note that
+
- + The files must have the same data format.
- + The files must be created in the `dataDirectory` by atomically *moving* or *renaming* them into
- the data directory.
- + Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.
+##### How Directories are Monitored
+{:.no_toc}
- For simple text files, there is an easier method `streamingContext.textFileStream(dataDirectory)`. And file streams do not require running a receiver, hence does not require allocating cores.
+Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory.
+
+ * A simple directory can be monitored, such as `"hdfs://namenode:8040/logs/"`.
+ All files directly under such a path will be processed as they are discovered.
+ + A [POSIX glob pattern](http://pubs.opengroup.org/onlinepubs/009695399/utilities/xcu_chap02.html#tag_02_13_02) can be supplied, such as
+ `"hdfs://namenode:8040/logs/2017/*"`.
+ Here, the DStream will consist of all files in the directories
+ matching the pattern.
+ That is: it is a pattern of directories, not of files in directories.
+ + All files must be in the same data format.
+ * A file is considered part of a time period based on its modification time,
+ not its creation time.
+ + Once processed, changes to a file within the current window will not cause the file to be reread.
+ That is: *updates are ignored*.
+ + The more files under a directory, the longer it will take to
+ scan for changes — even if no files have been modified.
+ * If a wildcard is used to identify directories, such as `"hdfs://namenode:8040/logs/2016-*"`,
+ renaming an entire directory to match the path will add the directory to the list of
+ monitored directories. Only the files in the directory whose modification time is
+ within the current window will be included in the stream.
+ + Calling [`FileSystem.setTimes()`](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#setTimes-org.apache.hadoop.fs.Path-long-long-)
+ to fix the timestamp is a way to have the file picked up in a later window, even if its contents have not changed.
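+
+As a minimal sketch (the path is illustrative), monitoring a glob of dated log directories and
+counting the lines that arrive in each batch looks like:
+
+{% highlight scala %}
+// Every directory matching the glob is scanned; files are assigned to batches
+// by modification time, not creation time.
+val logLines = streamingContext.textFileStream("hdfs://namenode:8040/logs/2017/*")
+logLines.count().print()
+{% endhighlight %}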
+
+
+##### Streaming to File Systems vs. Object Stores
+{:.no_toc}
- Python API `fileStream` is not available in the Python API, only `textFileStream` is available.
+"Full" Filesystems such as HDFS tend to set the modification time on their files as soon
+as the output stream is created.
+When a file is opened, even before data has been completely written,
+it may be included in the `DStream` - after which updates to the file within the same window
+will be ignored. That is: changes may be missed, and data omitted from the stream.
-- **Streams based on Custom Receivers:** DStreams can be created with data streams received through custom receivers. See the [Custom Receiver
+To guarantee that changes are picked up in a window, write the file
+to an unmonitored directory, then, immediately after the output stream is closed,
+rename it into the destination directory.
+Provided the renamed file appears in the scanned destination directory during the window
+of its creation, the new data will be picked up.
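+
+A minimal sketch of this write-then-rename pattern using the Hadoop `FileSystem` API (the paths
+are illustrative only):
+
+{% highlight scala %}
+import java.nio.charset.StandardCharsets
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+val tmpFile   = new Path("hdfs://namenode:8040/tmp/events-0001.txt")   // not monitored
+val finalFile = new Path("hdfs://namenode:8040/logs/events-0001.txt")  // monitored directory
+
+val fs = finalFile.getFileSystem(new Configuration())
+val out = fs.create(tmpFile)
+out.write("event-1\nevent-2\n".getBytes(StandardCharsets.UTF_8))
+out.close()                    // the file is complete before it becomes visible
+fs.rename(tmpFile, finalFile)  // atomic on HDFS; picked up in the current window
+{% endhighlight %}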
+
+In contrast, object stores such as Amazon S3 and Azure Storage usually have slow rename operations, as the
+data is actually copied.
+Furthermore, a renamed object may have the time of the `rename()` operation as its modification time, so
+it may not be considered part of the window that the original creation time implied it belonged to.
+
+Careful testing is needed against the target object store to verify that the timestamp behavior
+of the store is consistent with that expected by Spark Streaming. It may be
+that writing directly into a destination directory is the appropriate strategy for
+streaming data via the chosen object store.
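+
+One way to run such a test is to probe the store directly with the Hadoop `FileSystem` API and
+compare timestamps before and after a rename (a sketch; the bucket and paths are illustrative,
+and behavior varies by connector):
+
+{% highlight scala %}
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+val probe = new Path("s3a://mybucket/incoming/_probe.txt")
+val fs = probe.getFileSystem(streamingContext.sparkContext.hadoopConfiguration)
+
+val out = fs.create(probe, true)
+out.write("probe".getBytes("UTF-8"))
+out.close()
+val created = fs.getFileStatus(probe).getModificationTime
+
+val renamed = new Path("s3a://mybucket/incoming/probe-renamed.txt")
+fs.rename(probe, renamed)
+val afterRename = fs.getFileStatus(renamed).getModificationTime
+// If afterRename differs from created, renaming resets the timestamp, and a renamed
+// file may land in a different window than its original write time implied.
+println(s"created=$created afterRename=$afterRename")
+{% endhighlight %}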
+
+For more details on this topic, consult the [Hadoop Filesystem Specification](https://hadoop.apache.org/docs/stable2/hadoop-project-dist/hadoop-common/filesystem/introduction.html).
+
+
+#### Streams based on Custom Receivers
+{:.no_toc}
+
+DStreams can be created with data streams received through custom receivers. See the [Custom Receiver
Guide](streaming-custom-receivers.html) for more details.
-- **Queue of RDDs as a Stream:** For testing a Spark Streaming application with test data, one can also create a DStream based on a queue of RDDs, using `streamingContext.queueStream(queueOfRDDs)`. Each RDD pushed into the queue will be treated as a batch of data in the DStream, and processed like a stream.
+#### Queue of RDDs as a Stream
+{:.no_toc}
+
+For testing a Spark Streaming application with test data, one can also create a DStream based on a queue of RDDs, using `streamingContext.queueStream(queueOfRDDs)`. Each RDD pushed into the queue will be treated as a batch of data in the DStream, and processed like a stream.
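+
+A minimal sketch of such a test harness (the computation is arbitrary and a `streamingContext`
+is assumed to be in scope):
+
+{% highlight scala %}
+import scala.collection.mutable
+import org.apache.spark.rdd.RDD
+
+val rddQueue = new mutable.Queue[RDD[Int]]()
+val inputStream = streamingContext.queueStream(rddQueue)
+inputStream.map(_ * 2).print()
+streamingContext.start()
+
+// Each RDD pushed onto the queue becomes one batch of the DStream.
+for (_ <- 1 to 3) {
+  rddQueue.synchronized {
+    rddQueue += streamingContext.sparkContext.makeRDD(1 to 100, 2)
+  }
+  Thread.sleep(1000)
+}
+streamingContext.stop()
+{% endhighlight %}
+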
For more details on streams from sockets and files, see the API documentations of the relevant functions in
[StreamingContext](api/scala/index.html#org.apache.spark.streaming.StreamingContext) for
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 905b1c52afa6..a5f3d4e23d2b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -17,19 +17,19 @@
package org.apache.spark.streaming.dstream
-import java.io.{IOException, ObjectInputStream}
+import java.io.{FileNotFoundException, IOException, ObjectInputStream}
import scala.collection.mutable
import scala.reflect.ClassTag
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.streaming._
import org.apache.spark.streaming.scheduler.StreamInputInfo
-import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Utils}
+import org.apache.spark.util.{SerializableConfiguration, Utils}
/**
* This class represents an input stream that monitors a Hadoop-compatible filesystem for new
@@ -122,9 +122,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
// Set of files that were selected in the remembered batches
@transient private var recentlySelectedFiles = new mutable.HashSet[String]()
- // Read-through cache of file mod times, used to speed up mod time lookups
- @transient private var fileToModTime = new TimeStampedHashMap[String, Long](true)
-
// Timestamp of the last round of finding files
@transient private var lastNewFileFindingTime = 0L
@@ -140,7 +137,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
* a union RDD out of them. Note that this maintains the list of files that were processed
* in the latest modification time in the previous call to this method. This is because the
* modification time returned by the FileStatus API seems to return times only at the
- * granularity of seconds. And new files may have the same modification time as the
+ * granularity of seconds in HDFS. And new files may have the same modification time as the
* latest modification time in the previous call to this method yet was not reported in
* the previous call.
*/
@@ -173,8 +170,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
logDebug("Cleared files are:\n" +
oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
}
- // Delete file mod times that weren't accessed in the last round of getting new files
- fileToModTime.clearOldValues(lastNewFileFindingTime - 1)
}
/**
@@ -196,29 +191,29 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
logDebug(s"Getting new files for time $currentTime, " +
s"ignoring files older than $modTimeIgnoreThreshold")
- val newFileFilter = new PathFilter {
- def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
- }
- val directoryFilter = new PathFilter {
- override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory
- }
- val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath)
+ val directories = Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus])
+ .filter(_.isDirectory)
+ .map(_.getPath)
val newFiles = directories.flatMap(dir =>
- fs.listStatus(dir, newFileFilter).map(_.getPath.toString))
+ fs.listStatus(dir)
+ .filter(isNewFile(_, currentTime, modTimeIgnoreThreshold))
+ .map(_.getPath.toString))
val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
- logInfo("Finding new files took " + timeTaken + " ms")
- logDebug("# cached file times = " + fileToModTime.size)
+ logInfo(s"Finding new files took $timeTaken ms")
if (timeTaken > slideDuration.milliseconds) {
logWarning(
- "Time taken to find new files exceeds the batch size. " +
+        s"Time taken to find new files ($timeTaken ms) exceeds the batch size. " +
"Consider increasing the batch size or reducing the number of " +
- "files in the monitored directory."
+ "files in the monitored directories."
)
}
newFiles
} catch {
+ case e: FileNotFoundException =>
+ logWarning(s"No directory to scan: $directoryPath: $e")
+ Array.empty
case e: Exception =>
- logWarning("Error finding new files", e)
+ logWarning(s"Error finding new files under $directoryPath", e)
reset()
Array.empty
}
@@ -241,8 +236,16 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
* The files with mod time T+5 are not remembered and cannot be ignored (since, t+5 > t+1).
* Hence they can get selected as new files again. To prevent this, files whose mod time is more
* than current batch time are not considered.
+ * @param fileStatus file status
+ * @param currentTime time of the batch
+ * @param modTimeIgnoreThreshold the ignore threshold
+ * @return true if the file has been modified within the batch window
*/
- private def isNewFile(path: Path, currentTime: Long, modTimeIgnoreThreshold: Long): Boolean = {
+ private def isNewFile(
+ fileStatus: FileStatus,
+ currentTime: Long,
+ modTimeIgnoreThreshold: Long): Boolean = {
+ val path = fileStatus.getPath
val pathStr = path.toString
// Reject file if it does not satisfy filter
if (!filter(path)) {
@@ -250,7 +253,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
return false
}
// Reject file if it was created before the ignore time
- val modTime = getFileModTime(path)
+ val modTime = fileStatus.getModificationTime()
if (modTime <= modTimeIgnoreThreshold) {
// Use <= instead of < to avoid SPARK-4518
logDebug(s"$pathStr ignored as mod time $modTime <= ignore time $modTimeIgnoreThreshold")
@@ -292,11 +295,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
new UnionRDD(context.sparkContext, fileRDDs)
}
- /** Get file mod time from cache or fetch it from the file system */
- private def getFileModTime(path: Path) = {
- fileToModTime.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime())
- }
-
private def directoryPath: Path = {
if (_path == null) _path = new Path(directory)
_path
@@ -318,7 +316,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
generatedRDDs = new mutable.HashMap[Time, RDD[(K, V)]]()
batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]
recentlySelectedFiles = new mutable.HashSet[String]()
- fileToModTime = new TimeStampedHashMap[String, Long](true)
}
/**
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index b5d36a36513a..ab01e765608c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -27,7 +27,8 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import com.google.common.io.Files
-import org.apache.hadoop.fs.Path
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.scalatest.BeforeAndAfter
@@ -130,10 +131,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
test("binary records stream") {
- var testDir: File = null
- try {
+ withTempDir { testDir =>
val batchDuration = Seconds(2)
- testDir = Utils.createTempDir()
// Create a file that exists before the StreamingContext is created:
val existingFile = new File(testDir, "0")
Files.write("0\n", existingFile, StandardCharsets.UTF_8)
@@ -176,8 +175,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
assert(obtainedOutput(i) === input.map(b => (b + i).toByte))
}
}
- } finally {
- if (testDir != null) Utils.deleteRecursively(testDir)
}
}
@@ -190,10 +187,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
test("file input stream - wildcard") {
- var testDir: File = null
- try {
+ withTempDir { testDir =>
val batchDuration = Seconds(2)
- testDir = Utils.createTempDir()
val testSubDir1 = Utils.createDirectory(testDir.toString, "tmp1")
val testSubDir2 = Utils.createDirectory(testDir.toString, "tmp2")
@@ -221,12 +216,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// not enough to trigger a batch
clock.advance(batchDuration.milliseconds / 2)
- def createFileAndAdvenceTime(data: Int, dir: File): Unit = {
+ def createFileAndAdvanceTime(data: Int, dir: File): Unit = {
val file = new File(testSubDir1, data.toString)
Files.write(data + "\n", file, StandardCharsets.UTF_8)
assert(file.setLastModified(clock.getTimeMillis()))
assert(file.lastModified === clock.getTimeMillis())
- logInfo("Created file " + file)
+ logInfo(s"Created file $file")
// Advance the clock after creating the file to avoid a race when
// setting its modification time
clock.advance(batchDuration.milliseconds)
@@ -236,18 +231,90 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
// Over time, create files in the temp directory 1
val input1 = Seq(1, 2, 3, 4, 5)
- input1.foreach(i => createFileAndAdvenceTime(i, testSubDir1))
+ input1.foreach(i => createFileAndAdvanceTime(i, testSubDir1))
// Over time, create files in the temp directory 1
val input2 = Seq(6, 7, 8, 9, 10)
- input2.foreach(i => createFileAndAdvenceTime(i, testSubDir2))
+ input2.foreach(i => createFileAndAdvanceTime(i, testSubDir2))
// Verify that all the files have been read
val expectedOutput = (input1 ++ input2).map(_.toString).toSet
assert(outputQueue.asScala.flatten.toSet === expectedOutput)
}
- } finally {
- if (testDir != null) Utils.deleteRecursively(testDir)
+ }
+ }
+
+ /**
+   * Tests that renamed directories are included in new batches, but that only files modified
+   * within the batch window are included.
+   * Uses the Hadoop APIs directly to verify behavior consistent with the operations used internally.
+ */
+ test("renamed directories are scanned") {
+ withTempDir { testDir =>
+ val batchDuration = Seconds(2)
+ val durationMs = batchDuration.milliseconds
+ val testPath = new Path(testDir.toURI)
+ val streamDir = new Path(testPath, "streaming")
+ val streamGlobPath = new Path(streamDir, "sub*")
+ val generatedDir = new Path(testPath, "generated")
+ val generatedSubDir = new Path(generatedDir, "subdir")
+ val renamedSubDir = new Path(streamDir, "subdir")
+
+ withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+ val sparkContext = ssc.sparkContext
+ val hc = sparkContext.hadoopConfiguration
+ val fs = FileSystem.get(testPath.toUri, hc)
+
+ fs.delete(testPath, true)
+ fs.mkdirs(testPath)
+ fs.mkdirs(streamDir)
+ fs.mkdirs(generatedSubDir)
+
+ def write(path: Path, text: String): Unit = {
+ val out = fs.create(path, true)
+ IOUtils.write(text, out)
+ out.close()
+ }
+
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ val existingFile = new Path(generatedSubDir, "existing")
+ write(existingFile, "existing\n")
+ val status = fs.getFileStatus(existingFile)
+ clock.setTime(status.getModificationTime + durationMs)
+ val batchCounter = new BatchCounter(ssc)
+ val fileStream = ssc.textFileStream(streamGlobPath.toUri.toString)
+ val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
+ val outputStream = new TestOutputStream(fileStream, outputQueue)
+ outputStream.register()
+
+ ssc.start()
+ clock.advance(durationMs)
+ eventually(eventuallyTimeout) {
+ assert(1 === batchCounter.getNumCompletedBatches)
+ }
+        // create a file in the generated subdirectory and give it a modification time
+        // that falls inside the next batch window
+ val textPath = new Path(generatedSubDir, "renamed.txt")
+ write(textPath, "renamed\n")
+ val now = clock.getTimeMillis()
+ val modTime = now + durationMs / 2
+ fs.setTimes(textPath, modTime, modTime)
+        val textFileStatus = fs.getFileStatus(textPath)
+        assert(textFileStatus.getModificationTime < now + durationMs)
+
+ // rename the directory under the path being scanned
+ fs.rename(generatedSubDir, renamedSubDir)
+
+ // move forward one window
+ clock.advance(durationMs)
+        // wait for the next scan to complete
+ eventually(eventuallyTimeout) {
+ assert(2 === batchCounter.getNumCompletedBatches)
+ }
+ // verify that the "renamed" file is found, but not the "existing" one which is out of
+ // the window
+ assert(Set("renamed") === outputQueue.asScala.flatten.toSet)
+ }
}
}
@@ -416,10 +483,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
def testFileStream(newFilesOnly: Boolean) {
- var testDir: File = null
- try {
+ withTempDir { testDir =>
val batchDuration = Seconds(2)
- testDir = Utils.createTempDir()
// Create a file that exists before the StreamingContext is created:
val existingFile = new File(testDir, "0")
Files.write("0\n", existingFile, StandardCharsets.UTF_8)
@@ -466,8 +531,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
assert(outputQueue.asScala.flatten.toSet === expectedOutput)
}
- } finally {
- if (testDir != null) Utils.deleteRecursively(testDir)
}
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index dbab70886102..ada494eb897f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming
-import java.io.{IOException, ObjectInputStream}
+import java.io.{File, IOException, ObjectInputStream}
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
@@ -557,4 +557,16 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
verifyOutput[W](output.toSeq, expectedOutput, useSet)
}
}
+
+ /**
+ * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
+ * returns.
+   * (originally from `SQLTestUtils`.)
+   * @todo Probably this method should be moved to a more general place.
+ */
+ protected def withTempDir(f: File => Unit): Unit = {
+ val dir = Utils.createTempDir().getCanonicalFile
+ try f(dir) finally Utils.deleteRecursively(dir)
+ }
+
}