diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index abd4ac965360..f820f9cc76b5 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -615,35 +615,113 @@ which creates a DStream from text data received over a TCP socket connection.
 
 Besides sockets, the StreamingContext API provides methods for creating DStreams from files as input sources.
 
-- **File Streams:** For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created as:
+#### File Streams
+{:.no_toc}
 
-    <div class="codetabs">
-    <div data-lang="scala" markdown="1">
-        streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
-    </div>
-    <div data-lang="java" markdown="1">
-        streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);
-    </div>
-    <div data-lang="python" markdown="1">
-        streamingContext.textFileStream(dataDirectory)
-    </div>
-    </div>
+For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created
+via `StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]`.
 
-    Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory (files written in nested directories not supported). Note that
+File streams do not require running a receiver, so there is no need to allocate any cores for receiving file data.
 
-     + The files must have the same data format.
-     + The files must be created in the `dataDirectory` by atomically *moving* or *renaming* them into
-     the data directory.
-     + Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.
+For simple text files, the easiest method is `StreamingContext.textFileStream(dataDirectory)`.
 
-    For simple text files, there is an easier method `streamingContext.textFileStream(dataDirectory)`. And file streams do not require running a receiver, hence does not require allocating cores.
 
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
+{% endhighlight %}
+For text files:
 
-    Python API `fileStream` is not available in the Python API, only `textFileStream` is available.
+{% highlight scala %}
+streamingContext.textFileStream(dataDirectory)
+{% endhighlight %}
+</div>
-- **Streams based on Custom Receivers:** DStreams can be created with data streams received through custom receivers. See the [Custom Receiver
+<div data-lang="java" markdown="1">
+{% highlight java %}
+streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);
+{% endhighlight %}
+For text files:
+
+{% highlight java %}
+streamingContext.textFileStream(dataDirectory);
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+`fileStream` is not available in the Python API; only `textFileStream` is available.
+{% highlight python %}
+streamingContext.textFileStream(dataDirectory)
+{% endhighlight %}
+</div>
+
+</div>
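+
+As a concrete illustration (in Scala), a text file stream can be built from the generic call
+shown above. This is a minimal sketch, not text from the guide itself: it assumes Hadoop's
+`LongWritable`, `Text`, and new-API `TextInputFormat` classes, and a hypothetical log directory:
+
+{% highlight scala %}
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
+
+// Each record is a (byte offset, line) pair; keep only the line text.
+val lines = streamingContext
+  .fileStream[LongWritable, Text, TextInputFormat]("hdfs://namenode:8040/logs/")
+  .map { case (_, line) => line.toString }
+{% endhighlight %}
+
+The object-store subsection below recommends writing each file to an unmonitored directory and
+then renaming it into the monitored directory so that it appears atomically. A minimal sketch of
+that write-then-rename pattern, assuming Hadoop's `FileSystem` API and hypothetical paths:
+
+{% highlight scala %}
+import java.net.URI
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+val fs = FileSystem.get(new URI("hdfs://namenode:8040/"), new Configuration())
+
+// Write the file somewhere the stream is not watching...
+val tmp = new Path("hdfs://namenode:8040/tmp/events.log")
+val out = fs.create(tmp)
+try {
+  out.write("event-1\n".getBytes("UTF-8"))
+} finally {
+  out.close()
+}
+
+// ...then move it into the monitored directory in a single rename.
+fs.rename(tmp, new Path("hdfs://namenode:8040/logs/events.log"))
+{% endhighlight %}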
+
+##### How Directories are Monitored
+{:.no_toc}
+
+Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory.
+
+ * A simple directory can be monitored, such as `"hdfs://namenode:8040/logs/"`.
+   All files directly under such a path will be processed as they are discovered.
+ * A [POSIX glob pattern](http://pubs.opengroup.org/onlinepubs/009695399/utilities/xcu_chap02.html#tag_02_13_02) can be supplied, such as
+   `"hdfs://namenode:8040/logs/2017/*"`.
+   Here, the DStream will consist of all files in the directories
+   matching the pattern.
+   That is: it is a pattern of directories, not of files in directories.
+ * All files must be in the same data format.
+ * A file is considered part of a time period based on its modification time,
+   not its creation time.
+ * Once processed, changes to a file within the current window will not cause the file to be reread.
+   That is: *updates are ignored*.
+ * The more files under a directory, the longer it will take to
+   scan for changes — even if no files have been modified.
+ * If a wildcard is used to identify directories, such as `"hdfs://namenode:8040/logs/2016-*"`,
+   renaming an entire directory to match the path will add the directory to the list of
+   monitored directories. Only the files in the directory whose modification time is
+   within the current window will be included in the stream.
+ * Calling [`FileSystem.setTimes()`](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#setTimes-org.apache.hadoop.fs.Path-long-long-)
+   to fix the timestamp is a way to have the file picked up in a later window, even if its contents have not changed.
+
+##### Using Object Stores as a Source of Data
+{:.no_toc}
+
+"Full" filesystems such as HDFS tend to set the modification time on their files as soon
+as the output stream is created.
+When a file is opened, even before data has been completely written,
+it may be included in the `DStream`, after which updates to the file within the same window
+will be ignored. That is: changes may be missed, and data omitted from the stream.
+
+To guarantee that changes are picked up in a window, write the file
+to an unmonitored directory, then, immediately after the output stream is closed,
+rename it into the destination directory.
+Provided the renamed file appears in the scanned destination directory during the window
+of its creation, the new data will be picked up.
+
+In contrast, object stores such as Amazon S3 and Azure Storage usually have slow rename operations, as the
+data is actually copied.
+Furthermore, a renamed object may have the time of the `rename()` operation as its modification time, so
+it may not be considered part of the window which its original creation time implied it belonged to.
+
+Careful testing is needed against the target object store to verify that the timestamp behavior
+of the store is consistent with that expected by Spark Streaming. It may be
+that writing directly into a destination directory is the appropriate strategy for
+streaming data via the chosen object store.
+
+For more details on this topic, consult the [Hadoop Filesystem Specification](https://hadoop.apache.org/docs/stable2/hadoop-project-dist/hadoop-common/filesystem/introduction.html).
+
+#### Streams based on Custom Receivers
+{:.no_toc}
+
+DStreams can be created with data streams received through custom receivers. See the [Custom Receiver
 Guide](streaming-custom-receivers.html) for more details.
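+
+As an illustration of the receiver API that the linked guide describes, the following is a
+minimal sketch, not the guide's own code; the class name, host, and port are hypothetical.
+It reads newline-delimited text from a socket and pushes each line into Spark:
+
+{% highlight scala %}
+import java.io.{BufferedReader, InputStreamReader}
+import java.net.Socket
+import java.nio.charset.StandardCharsets
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receiver.Receiver
+
+class SocketLineReceiver(host: String, port: Int)
+  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
+
+  def onStart(): Unit = {
+    // Receive on a background thread; onStart() must return promptly.
+    new Thread("Socket Line Receiver") {
+      override def run(): Unit = receive()
+    }.start()
+  }
+
+  def onStop(): Unit = { } // the receiving thread checks isStopped() and exits on its own
+
+  private def receive(): Unit = {
+    try {
+      val socket = new Socket(host, port)
+      val reader = new BufferedReader(
+        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
+      var line = reader.readLine()
+      while (!isStopped() && line != null) {
+        store(line) // hand each line to Spark Streaming
+        line = reader.readLine()
+      }
+      reader.close()
+      socket.close()
+      restart("Trying to connect again")
+    } catch {
+      case e: Throwable => restart("Error receiving data", e)
+    }
+  }
+}
+
+// Such a receiver would be used as:
+//   val lines = streamingContext.receiverStream(new SocketLineReceiver("localhost", 9999))
+{% endhighlight %}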
 
-- **Queue of RDDs as a Stream:** For testing a Spark Streaming application with test data, one can also create a DStream based on a queue of RDDs, using `streamingContext.queueStream(queueOfRDDs)`. Each RDD pushed into the queue will be treated as a batch of data in the DStream, and processed like a stream.
+#### Queue of RDDs as a Stream
+{:.no_toc}
+
+For testing a Spark Streaming application with test data, one can also create a DStream based on a queue of RDDs, using `streamingContext.queueStream(queueOfRDDs)`. Each RDD pushed into the queue will be treated as a batch of data in the DStream, and processed like a stream.
 
 For more details on streams from sockets and files, see the API documentations of the relevant functions in
 [StreamingContext](api/scala/index.html#org.apache.spark.streaming.StreamingContext) for