3 changes: 1 addition & 2 deletions docs/streaming-programming-guide.md
@@ -653,8 +653,7 @@ methods for creating DStreams from files and Akka actors as input sources.
</div>
</div>

Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory (files written in nested directories not supported). Note that

Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory. By default, files written in nested directories are not picked up, but they can also be monitored by setting the optional `depth` parameter to a value greater than 1. Note that
+ The files must have the same data format.
+ The files must be created in the `dataDirectory` by atomically *moving* or *renaming* them into
the data directory.
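To make the new parameter concrete, here is a minimal Scala sketch in the spirit of the guide's examples. It assumes the two-argument `textFileStream` overload added by this change, a placeholder path `dataDirectory`, and that `depth = 1` reproduces the original top-level-only behaviour:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NestedDirectoryStream {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("NestedDirectoryStream")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // depth = 1 (or the existing one-argument overload) watches only the top level
    // of dataDirectory; a larger value is assumed to also pick up files that are
    // atomically moved into nested directories.
    val lines = ssc.textFileStream("dataDirectory", 2)
    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```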
8 changes: 7 additions & 1 deletion python/pyspark/streaming/context.py
@@ -260,7 +260,13 @@ def textFileStream(self, directory):
monitored directory by "moving" them from another location within the same
file system. File names starting with . are ignored.
"""
return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())
return self.textFileStream(directory, 1)

def textFileStream(self, directory, depth=1):
"""
Create an input stream that monitors text files in the given directory and,
when `depth` is greater than 1, in its subdirectories as well.
"""
return DStream(self._jssc.textFileStream(directory, depth), self, UTF8Deserializer())

def binaryRecordsStream(self, directory, recordLength):
"""
@@ -376,6 +376,27 @@ class StreamingContext private[streaming] (
new FileInputDStream[K, V, F](this, directory)
}

/**
* Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* Files must be written to the monitored directory by "moving" them from another
* location within the same file system. File names starting with . are ignored.
* It can also monitor files in subdirectories by setting the optional `depth`
* parameter to a value greater than 1.
* @param directory HDFS directory to monitor for new file
* @param depth Search depth within `directory`; values greater than 1 also cover subdirectories
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
* @tparam F Input format for reading HDFS file
*/
def fileStream[
K: ClassTag,
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
] (directory: String, depth: Int): InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory, depth)
}
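A hedged usage sketch of the overload above, assuming an already-created `StreamingContext` named `ssc` and a placeholder directory name; the key/value/input-format types are the same ones `textFileStream` uses internally:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Roughly equivalent to ssc.textFileStream("dataDirectory", 2), but with the
// types spelled out; depth = 2 is assumed to also cover files moved into
// subdirectories one level below dataDirectory.
val pairs = ssc.fileStream[LongWritable, Text, TextInputFormat]("dataDirectory", 2)
val lines = pairs.map(_._2.toString)
```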

/**
* Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
@@ -393,7 +414,33 @@ class StreamingContext private[streaming] (
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
new FileInputDStream[K, V, F](this, directory, 1, filter, newFilesOnly)
}

/**
* Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* Files must be written to the monitored directory by "moving" them from another
* location within the same file system.
* It can also monitor files in subdirectories by setting the optional `depth`
* parameter to a value greater than 1.
* @param directory HDFS directory to monitor for new file
* @param filter Function to filter paths to process
* @param newFilesOnly Should process only new files and ignore existing files in the directory
* @param depth Search depth within `directory`; values greater than 1 also cover subdirectories
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
* @tparam F Input format for reading HDFS file
*/
def fileStream[
K: ClassTag,
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
] (directory: String,
filter: Path => Boolean,
newFilesOnly: Boolean,
depth: Int): InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory, depth, filter, newFilesOnly)
}
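A sketch of the filter-plus-depth variant above, again assuming an existing `StreamingContext` `ssc`; the directory name, filter, and depth value are illustrative only:

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Process only files whose names end in ".log", ignore files that already existed
// when the stream started, and search two directory levels under dataDirectory.
val logs = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "dataDirectory",
  (path: Path) => path.getName.endsWith(".log"),
  newFilesOnly = true,
  depth = 2)
val logLines = logs.map(_._2.toString)
```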

/**
@@ -417,7 +464,35 @@ class StreamingContext private[streaming] (
filter: Path => Boolean,
newFilesOnly: Boolean,
conf: Configuration): InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly, Option(conf))
new FileInputDStream[K, V, F](this, directory, 1, filter, newFilesOnly, Option(conf))
}

/**
* Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* Files must be written to the monitored directory by "moving" them from another
* location within the same file system. File names starting with . are ignored.
* It can also monitor files in subdirectories by setting the optional `depth`
* parameter to a value greater than 1.
* @param directory HDFS directory to monitor for new file
* @param filter Function to filter paths to process
* @param newFilesOnly Should process only new files and ignore existing files in the directory
* @param conf Hadoop configuration
* @param depth Search depth within `directory`; values greater than 1 also cover subdirectories
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
* @tparam F Input format for reading HDFS file
*/
def fileStream[
K: ClassTag,
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
] (directory: String,
filter: Path => Boolean,
newFilesOnly: Boolean,
conf: Configuration,
depth: Int): InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory, depth, filter, newFilesOnly, Option(conf))
}
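And a sketch of the variant that also takes a Hadoop `Configuration`, under the same assumptions (`ssc` exists, names are placeholders); the filter shown simply reproduces the usual skip-dot-files behaviour:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Start from the SparkContext's Hadoop configuration; any job-specific Hadoop
// settings could be applied to it before creating the stream.
val hadoopConf: Configuration = ssc.sparkContext.hadoopConfiguration
val stream = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "dataDirectory",
  (path: Path) => !path.getName.startsWith("."),
  newFilesOnly = true,
  conf = hadoopConf,
  depth = 3)
```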

/**
@@ -428,10 +503,27 @@ class StreamingContext private[streaming] (
* file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
*/
def textFileStream(directory: String): DStream[String] = withNamedScope("text file stream") {
def textFileStream(directory: String): DStream[String] = withNamedScope("text file stream") {
fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}

/**
* Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as text files (using key as LongWritable, value
* as Text and input format as TextInputFormat). Files must be written to the
* monitored directory by "moving" them from another location within the same
* file system. File names starting with . are ignored.
* It can also monitor files in subdirectories by setting the optional `depth`
* parameter to a value greater than 1.
* @param directory HDFS directory to monitor for new file
* @param depth Search depth within `directory`; values greater than 1 also cover subdirectories
*/
def textFileStream(
directory: String,
depth: Int): DStream[String] = withNamedScope("text file stream") {
fileStream[LongWritable, Text, TextInputFormat](directory, depth).map(_._2.toString)
}

/**
* :: Experimental ::
*
@@ -440,21 +532,26 @@ class StreamingContext private[streaming] (
* generating one byte array per record. Files must be written to the monitored directory
* by "moving" them from another location within the same file system. File names
* starting with . are ignored.
* It can also monitor files in subdirectories by setting the optional `depth`
* parameter to a value greater than 1.
*
* '''Note:''' We ensure that the byte array for each record in the
* resulting RDDs of the DStream has the provided record length.
*
* @param directory HDFS directory to monitor for new file
* @param recordLength length of each record in bytes
* @param depth Search depth within `directory`; values greater than 1 also cover subdirectories
*/
@Experimental
def binaryRecordsStream(
directory: String,
recordLength: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") {
recordLength: Int,
depth: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") {
val conf = sc_.hadoopConfiguration
conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
directory, FileInputDStream.defaultFilter : Path => Boolean, newFilesOnly=true, conf)
directory, FileInputDStream.defaultFilter : Path => Boolean,
newFilesOnly=true, conf, depth)
val data = br.map { case (k, v) =>
val bytes = v.getBytes
assert(bytes.length == recordLength, "Byte array does not have correct length")
@@ -463,6 +560,28 @@ class StreamingContext private[streaming] (
data
}

/**
* :: Experimental ::
*
* Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as flat binary files, assuming a fixed length per record,
* generating one byte array per record. Files must be written to the monitored directory
* by "moving" them from another location within the same file system. File names
* starting with . are ignored.
*
* '''Note:''' We ensure that the byte array for each record in the
* resulting RDDs of the DStream has the provided record length.
*
* @param directory HDFS directory to monitor for new file
* @param recordLength length of each record in bytes
*/
@Experimental
def binaryRecordsStream(
directory: String,
recordLength: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") {
binaryRecordsStream(directory, recordLength, 1)
}
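For completeness, a sketch of the depth-aware `binaryRecordsStream` added above, with the record length and directory chosen purely for illustration and `ssc` assumed to exist:

```scala
// Each file is a sequence of fixed 512-byte records; with depth = 2 the stream is
// assumed to also notice files atomically moved into subdirectories of binaryDir.
val records = ssc.binaryRecordsStream("binaryDir", 512, 2)
records.map(_.length).print()   // each element should report a length of 512
```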

/**
* Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
@@ -211,6 +211,15 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
ssc.textFileStream(directory)
}

/**
* Create an input stream that monitors a directory and its subdirectories for new files
* and reads them as text files.
* @param directory HDFS directory to monitor for new file
* @param depth Search depth within `directory`; values greater than 1 also cover subdirectories
*/
def textFileStream(directory: String, depth: Int): JavaDStream[String] = {
ssc.textFileStream(directory, depth)
}

/**
* :: Experimental ::
*
@@ -289,6 +298,34 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
ssc.fileStream[K, V, F](directory)
}

/**
* Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* Files must be written to the monitored directory by "moving" them from another
* location within the same file system. File names starting with . are ignored.
* It can also monitor files in subdirectories by setting the optional `depth`
* parameter to a value greater than 1.
* @param directory HDFS directory to monitor for new file
* @param depth Search depth within `directory`; values greater than 1 also cover subdirectories
* @param kClass class of key for reading HDFS file
* @param vClass class of value for reading HDFS file
* @param fClass class of input format for reading HDFS file
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
* @tparam F Input format for reading HDFS file
*/
def fileStream[K, V, F <: NewInputFormat[K, V]](
directory: String,
depth: Int,
[Review comment from a project member] This looks out of order, but, I question the need for this overload anyway. Maybe @tdas has an opinion. I'd like to keep the scope of this change much more limited to adding a depth param to 1 version of each method (maybe 2 if it really made sense) in each language.

kClass: Class[K],
vClass: Class[V],
fClass: Class[F]): JavaPairInputDStream[K, V] = {
implicit val cmk: ClassTag[K] = ClassTag(kClass)
implicit val cmv: ClassTag[V] = ClassTag(vClass)
implicit val cmf: ClassTag[F] = ClassTag(fClass)
ssc.fileStream[K, V, F](directory, depth)
}

/**
* Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
@@ -318,6 +355,39 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
ssc.fileStream[K, V, F](directory, fn, newFilesOnly)
}

/**
* Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* Files must be written to the monitored directory by "moving" them from another
* location within the same file system. File names starting with . are ignored.
* It can also monitor files in subdirectories by setting the optional `depth`
* parameter to a value greater than 1.
* @param directory HDFS directory to monitor for new file
* @param kClass class of key for reading HDFS file
* @param vClass class of value for reading HDFS file
* @param fClass class of input format for reading HDFS file
* @param filter Function to filter paths to process
* @param newFilesOnly Should process only new files and ignore existing files in the directory
* @param depth Search depth within `directory`; values greater than 1 also cover subdirectories
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
* @tparam F Input format for reading HDFS file
*/
def fileStream[K, V, F <: NewInputFormat[K, V]](
directory: String,
kClass: Class[K],
vClass: Class[V],
fClass: Class[F],
filter: JFunction[Path, JBoolean],
newFilesOnly: Boolean,
depth: Int): JavaPairInputDStream[K, V] = {
implicit val cmk: ClassTag[K] = ClassTag(kClass)
implicit val cmv: ClassTag[V] = ClassTag(vClass)
implicit val cmf: ClassTag[F] = ClassTag(fClass)
def fn: (Path) => Boolean = (x: Path) => filter.call(x).booleanValue()
ssc.fileStream[K, V, F](directory, fn, newFilesOnly, depth)
}

/**
* Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
@@ -349,6 +419,41 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
ssc.fileStream[K, V, F](directory, fn, newFilesOnly, conf)
}

/**
* Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* Files must be written to the monitored directory by "moving" them from another
* location within the same file system. File names starting with . are ignored.
* It can also monitor files in subdirectories by setting the optional `depth`
* parameter to a value greater than 1.
* @param directory HDFS directory to monitor for new file
* @param kClass class of key for reading HDFS file
* @param vClass class of value for reading HDFS file
* @param fClass class of input format for reading HDFS file
* @param filter Function to filter paths to process
* @param newFilesOnly Should process only new files and ignore existing files in the directory
* @param conf Hadoop configuration
* @param depth Search depth within `directory`; values greater than 1 also cover subdirectories
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
* @tparam F Input format for reading HDFS file
*/
def fileStream[K, V, F <: NewInputFormat[K, V]](
directory: String,
kClass: Class[K],
vClass: Class[V],
fClass: Class[F],
filter: JFunction[Path, JBoolean],
newFilesOnly: Boolean,
conf: Configuration,
depth: Int): JavaPairInputDStream[K, V] = {
implicit val cmk: ClassTag[K] = ClassTag(kClass)
implicit val cmv: ClassTag[V] = ClassTag(vClass)
implicit val cmf: ClassTag[F] = ClassTag(fClass)
def fn: (Path) => Boolean = (x: Path) => filter.call(x).booleanValue()
ssc.fileStream[K, V, F](directory, fn, newFilesOnly, conf, depth)
}

/**
* Create an input stream with any arbitrary user implemented actor receiver.
* @param props Props object defining creation of the actor