18 commits
19d47cd
SPARK-17159: move filtering of directories and files out of glob/list…
steveloughran Aug 20, 2016
6f1ea36
[SPARK-17159] Remove the fileModTime cache. Now that the modification…
steveloughran Aug 20, 2016
c2a4382
[SPARK-17159] inline FileStatus.getModificationTime; address style is…
steveloughran Aug 20, 2016
0c88093
[SPARK-17159] updates as discussed on PR: skip wildcards for non wild…
steveloughran Aug 23, 2016
36c5881
[SPARK-17159] move glob operation into SparkHadoopUtils, alongside an…
steveloughran Aug 24, 2016
a69d1b6
[SPARK-17159] use a simpler pattern in the example docs
steveloughran Aug 24, 2016
f8c9521
[SPARK-17159] add directory rename test (taken from SPARK-7481 exampl…
steveloughran Aug 26, 2016
4f01721
[SPARK-17159] method nested inside a sparktest test closure being mis…
steveloughran Aug 26, 2016
1b2027c
[SPARK-17159] method -> globToFileStatusIfNecessary
steveloughran Aug 27, 2016
921c5c2
[SPARK-17159] File input dstream: revert to directory list operation …
steveloughran Aug 30, 2016
bff0d13
[SPARK-17159] round out the file streaming text with the dirty detail…
steveloughran Sep 16, 2016
a67902b
SPARK-17159 Chris Nauroth of HDFS team clarified which operations upd…
steveloughran Sep 19, 2016
9df7ff4
[SPARK-17159] rebase to master; verify new test still works; review &…
steveloughran Oct 10, 2016
ac47d42
[SPARK-17159] review/tighten docs, move HDFS timestamp details out
steveloughran Jan 3, 2017
f38a985
[SPARK-17159] ; address comments, move to withTempDir for tests with …
steveloughran Jan 23, 2017
49519cc
SPARK-17159 tail end of the rebase
steveloughran Feb 24, 2017
e03d189
SPARK-17159 review of docs: quote paths to clearly show what is code …
steveloughran Feb 24, 2017
a3aaf26
SPARK-17159 address sean's review comments, and read over the object …
steveloughran Feb 24, 2017
121 changes: 100 additions & 21 deletions docs/streaming-programming-guide.md
@@ -615,35 +615,114 @@ which creates a DStream from text
data received over a TCP socket connection. Besides sockets, the StreamingContext API provides
methods for creating DStreams from files as input sources.

#### File Streams
{:.no_toc}

For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created
via `StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]`.

File streams do not require running a receiver, so they do not require allocating cores.

For simple text files, the easiest method is `StreamingContext.textFileStream(dataDirectory)`.

<div class="codetabs">
<div data-lang="scala" markdown="1">

{% highlight scala %}
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
{% endhighlight %}
For text files

{% highlight scala %}
streamingContext.textFileStream(dataDirectory)
{% endhighlight %}
</div>

<div data-lang="java" markdown="1">
{% highlight java %}
streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);
{% endhighlight %}
For text files

{% highlight java %}
streamingContext.textFileStream(dataDirectory);
{% endhighlight %}
</div>

<div class="codetabs">
<div data-lang="scala" markdown="1">
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
</div>
<div data-lang="java" markdown="1">
streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);
</div>
<div data-lang="python" markdown="1">
streamingContext.textFileStream(dataDirectory)
</div>
</div>
<div data-lang="python" markdown="1">
`fileStream` is not available in the Python API; only `textFileStream` is available.
{% highlight python %}
streamingContext.textFileStream(dataDirectory)
{% endhighlight %}
</div>

</div>
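
As an illustration, here is a minimal Scala sketch of both creation styles. The `ssc` StreamingContext, the directory URI, and the choice of `TextInputFormat` are assumptions for the example, not part of the API shown above.

{% highlight scala %}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Easiest route for plain text files: one String per line of each new file.
val lines = ssc.textFileStream("hdfs://namenode:8040/logs/")

// General route: pick the key, value and (new-API) input format classes explicitly.
val records = ssc.fileStream[LongWritable, Text, TextInputFormat]("hdfs://namenode:8040/logs/")
val text = records.map { case (_, line) => line.toString }
{% endhighlight %}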

##### How Directories are Monitored
{:.no_toc}

Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory.

* A simple directory can be monitored, such as `"hdfs://namenode:8040/logs/"`.
All files directly under such a path will be processed as they are discovered.
* A [POSIX glob pattern](http://pubs.opengroup.org/onlinepubs/009695399/utilities/xcu_chap02.html#tag_02_13_02) can be supplied, such as
`"hdfs://namenode:8040/logs/2017/*"`.
Here, the DStream will consist of all files in the directories
matching the pattern.
That is: it is a pattern of directories, not of files in directories.
* All files must be in the same data format.
* A file is considered part of a time period based on its modification time,
not its creation time.
* Once processed, changes to a file within the current window will not cause the file to be reread.
That is: *updates are ignored*.
* The more files under a directory, the longer it will take to
scan for changes, even if no files have been modified.
* If a wildcard is used to identify directories, such as `"hdfs://namenode:8040/logs/2016-*"`,
renaming an entire directory to match the path will add the directory to the list of
monitored directories. Only the files in the directory whose modification time is
within the current window will be included in the stream.
* Calling [`FileSystem.setTimes()`](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#setTimes-org.apache.hadoop.fs.Path-long-long-)
to fix the timestamp is a way to have the file picked up in a later window, even if its contents
have not changed; a sketch of this follows the list.
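
A minimal sketch of that last point, assuming a Hadoop `Configuration` is at hand; the path is illustrative only.

{% highlight scala %}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
val logFile = new Path("hdfs://namenode:8040/logs/2017/app.log")
val fs = logFile.getFileSystem(conf)

// setTimes(path, mtime, atime); passing -1 leaves a field unchanged.
// Bumping mtime to "now" makes an otherwise old file eligible for the current window.
fs.setTimes(logFile, System.currentTimeMillis(), -1)
{% endhighlight %}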


@uncleGen (Contributor), Feb 27, 2017:

I remember a case that may confuse users: if old files are copied or moved in from other directories, they may not be found (as the mod_time is not changed), but users expect to find them. Maybe we need to document that here?

@steveloughran (Contributor, Author):

For "real" filesystems, rename doesn't change the modtime, and files become visible in create(), so if you do a create() in the destination directory the file may be found and scanned before the data is fully written. Hence the best practice: write elsewhere and rename in. Which, by its very nature, is the wrong thing to do for object stores: not only is there the performance hit of the copy, the timestamp also changes, and you can't set the times; if you could, we'd probably do it in the filesystem code. PUT-in-place eliminates the copy, and as the files aren't visible until close(), there is no risk of an early scan.

Of course, real (tm) filesystems do let you update the modtime, so you can update it after the rename. Even better: do it before, so there is no time period when the files are in the destination folder with a different timestamp.

If you want to make sure this section is clear about what the best practices and surprises are for both FS and blobstore submission, suggest any changes.

##### Streaming to FileSystems vs Object stores
{:.no_toc}

<span class="badge" style="background-color: grey">Python API</span> `fileStream` is not available in the Python API, only `textFileStream` is available.
"Full" Filesystems such as HDFS tend to set the modification time on their files as soon
as the output stream is created.
When a file is opened, even before data has been completely written,
it may be included in the `DStream` - after which updates to the file within the same window
will be ignored. That is: changes may be missed, and data omitted from the stream.

To guarantee that changes are picked up in a window, write the file
to an unmonitored directory, then, immediately after the output stream is closed,
rename it into the destination directory.
Provided the renamed file appears in the scanned destination directory during the window
of its creation, the new data will be picked up.
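
A sketch of that pattern with the Hadoop FileSystem API follows; the staging and destination paths are assumptions for the example. On a "full" filesystem the rename is atomic and does not change the modification time.

{% highlight scala %}
import java.nio.charset.StandardCharsets

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
val monitoredDir = new Path("hdfs://namenode:8040/logs/incoming")
val stagingFile = new Path("hdfs://namenode:8040/logs/_staging/part-0001")
val fs = monitoredDir.getFileSystem(conf)

// 1. Write the complete file somewhere the stream is not watching.
val out = fs.create(stagingFile)
out.write("first record\nsecond record\n".getBytes(StandardCharsets.UTF_8))
out.close()

// 2. Optionally refresh the modification time so the file falls in the current window.
fs.setTimes(stagingFile, System.currentTimeMillis(), -1)

// 3. Atomically move it into the monitored directory; only now can the stream see it.
fs.rename(stagingFile, new Path(monitoredDir, stagingFile.getName))
{% endhighlight %}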

In contrast, object stores such as Amazon S3 and Azure Storage usually have slow rename operations, as the
data is actually copied.
Furthermore, a renamed object may have the time of the `rename()` operation as its modification time, so it
may not be considered part of the window which its original creation time implied.

Careful testing is needed against the target object store to verify that the timestamp behavior
of the store is consistent with that expected by Spark Streaming. It may be
that writing directly into a destination directory is the appropriate strategy for
streaming data via the chosen object store.

For more details on this topic, consult the [Hadoop Filesystem Specification](https://hadoop.apache.org/docs/stable2/hadoop-project-dist/hadoop-common/filesystem/introduction.html).


#### Streams based on Custom Receivers
{:.no_toc}

DStreams can be created with data streams received through custom receivers. See the [Custom Receiver
Guide](streaming-custom-receivers.html) for more details.
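
For orientation, here is a minimal receiver sketch following the standard `Receiver` API; the host, port, and line-oriented protocol are assumptions, and the linked guide covers the full contract.

{% highlight scala %}
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Do the blocking work on a separate thread so onStart() returns quickly.
    new Thread("Line Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {
    // Nothing to do: the receive thread checks isStopped() and exits on its own.
  }

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped() && line != null) {
        store(line) // hand each record to Spark Streaming
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Trying to reconnect")
    } catch {
      case e: java.io.IOException => restart("Error receiving data", e)
    }
  }
}

// Usage: val lines = ssc.receiverStream(new LineReceiver("localhost", 9999))
{% endhighlight %}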

#### Queue of RDDs as a Stream
{:.no_toc}

For testing a Spark Streaming application with test data, one can also create a DStream based on a queue of RDDs, using `streamingContext.queueStream(queueOfRDDs)`. Each RDD pushed into the queue will be treated as a batch of data in the DStream, and processed like a stream.
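
For example, a small test harness might look like this; the `ssc` context, batch contents, and sleep interval are assumptions for the sketch.

{% highlight scala %}
import scala.collection.mutable

import org.apache.spark.rdd.RDD

val rddQueue = new mutable.Queue[RDD[Int]]()

// Each RDD pushed onto the queue becomes one micro-batch of the DStream.
val inputStream = ssc.queueStream(rddQueue)
inputStream.map(x => (x % 10, 1)).reduceByKey(_ + _).print()
ssc.start()

for (_ <- 1 to 3) {
  rddQueue.synchronized {
    rddQueue += ssc.sparkContext.makeRDD(1 to 1000, numSlices = 10)
  }
  Thread.sleep(1000)
}
{% endhighlight %}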

For more details on streams from sockets and files, see the API documentation of the relevant functions in
[StreamingContext](api/scala/index.html#org.apache.spark.streaming.StreamingContext) for
@@ -17,19 +17,19 @@

package org.apache.spark.streaming.dstream

-import java.io.{IOException, ObjectInputStream}
+import java.io.{FileNotFoundException, IOException, ObjectInputStream}

import scala.collection.mutable
import scala.reflect.ClassTag

import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.streaming._
import org.apache.spark.streaming.scheduler.StreamInputInfo
-import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Utils}
+import org.apache.spark.util.{SerializableConfiguration, Utils}

/**
* This class represents an input stream that monitors a Hadoop-compatible filesystem for new
@@ -122,9 +122,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
// Set of files that were selected in the remembered batches
@transient private var recentlySelectedFiles = new mutable.HashSet[String]()

-// Read-through cache of file mod times, used to speed up mod time lookups
-@transient private var fileToModTime = new TimeStampedHashMap[String, Long](true)

// Timestamp of the last round of finding files
@transient private var lastNewFileFindingTime = 0L

@@ -140,7 +137,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
* a union RDD out of them. Note that this maintains the list of files that were processed
* in the latest modification time in the previous call to this method. This is because the
* modification time returned by the FileStatus API seems to return times only at the
-* granularity of seconds. And new files may have the same modification time as the
+* granularity of seconds in HDFS. And new files may have the same modification time as the
* latest modification time in the previous call to this method yet was not reported in

Reviewer (Contributor): "HDFS" -> "HDFS compatible file system"?

steveloughran (Contributor, Author): No, it really is HDFS alone. We have no idea whatsoever about the granularity of other filesystems. Could be 2 seconds (is FAT32 supported? Hope not). NTFS is apparently in nanoseconds.

Reviewer (Contributor): got it

* the previous call.
*/
@@ -173,8 +170,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
logDebug("Cleared files are:\n" +
oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
}
-// Delete file mod times that weren't accessed in the last round of getting new files
-fileToModTime.clearOldValues(lastNewFileFindingTime - 1)
}

/**
@@ -196,29 +191,29 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
logDebug(s"Getting new files for time $currentTime, " +
s"ignoring files older than $modTimeIgnoreThreshold")

-val newFileFilter = new PathFilter {
-  def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
-}
-val directoryFilter = new PathFilter {
-  override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory
-}
-val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath)
+val directories = Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus])

Reviewer (Member): Do you need to check for directories.isEmpty after this to catch this error case earlier?

steveloughran (Contributor, Author): I believe the getOrElse clause should handle the globStatus() return value being null, upconverting it to an empty array. Which is essentially working backwards from this special (awful) case in the Glob code:

    if ((!sawWildcard) && results.isEmpty() &&
        (flattenedPatterns.size() <= 1)) {
      return null;
    }

With the directories now being an empty array, flatMap() should effectively start with a map of the 0-element array, which will result in the newFiles value also being empty. Isn't that right?

+  .filter(_.isDirectory)
+  .map(_.getPath)
val newFiles = directories.flatMap(dir =>
-  fs.listStatus(dir, newFileFilter).map(_.getPath.toString))
+  fs.listStatus(dir)
+    .filter(isNewFile(_, currentTime, modTimeIgnoreThreshold))
+    .map(_.getPath.toString))
val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
logInfo("Finding new files took " + timeTaken + " ms")
logDebug("# cached file times = " + fileToModTime.size)
logInfo(s"Finding new files took $timeTaken ms")
if (timeTaken > slideDuration.milliseconds) {
logWarning(
"Time taken to find new files exceeds the batch size. " +
s"Time taken to find new files $timeTaken exceeds the batch size. " +
"Consider increasing the batch size or reducing the number of " +
"files in the monitored directory."
"files in the monitored directories."
)
}
newFiles
} catch {
+case e: FileNotFoundException =>
+  logWarning(s"No directory to scan: $directoryPath: $e")
+  Array.empty
case e: Exception =>
logWarning("Error finding new files", e)
logWarning(s"Error finding new files under $directoryPath", e)
reset()
Array.empty
}
@@ -241,16 +236,24 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
* The files with mod time T+5 are not remembered and cannot be ignored (since, t+5 > t+1).
* Hence they can get selected as new files again. To prevent this, files whose mod time is more
* than current batch time are not considered.
+ * @param fileStatus file status
+ * @param currentTime time of the batch
+ * @param modTimeIgnoreThreshold the ignore threshold
+ * @return true if the file has been modified within the batch window
*/
-private def isNewFile(path: Path, currentTime: Long, modTimeIgnoreThreshold: Long): Boolean = {
+private def isNewFile(
+    fileStatus: FileStatus,
+    currentTime: Long,
+    modTimeIgnoreThreshold: Long): Boolean = {
+  val path = fileStatus.getPath
val pathStr = path.toString
// Reject file if it does not satisfy filter
if (!filter(path)) {
logDebug(s"$pathStr rejected by filter")
return false
}
// Reject file if it was created before the ignore time
-val modTime = getFileModTime(path)
+val modTime = fileStatus.getModificationTime()
if (modTime <= modTimeIgnoreThreshold) {
// Use <= instead of < to avoid SPARK-4518
logDebug(s"$pathStr ignored as mod time $modTime <= ignore time $modTimeIgnoreThreshold")
@@ -292,11 +295,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
new UnionRDD(context.sparkContext, fileRDDs)
}

-/** Get file mod time from cache or fetch it from the file system */
-private def getFileModTime(path: Path) = {
-  fileToModTime.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime())
-}

private def directoryPath: Path = {
if (_path == null) _path = new Path(directory)
_path
@@ -318,7 +316,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
generatedRDDs = new mutable.HashMap[Time, RDD[(K, V)]]()
batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]
recentlySelectedFiles = new mutable.HashSet[String]()
-fileToModTime = new TimeStampedHashMap[String, Long](true)
}

/**