diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index b784d59666fe..256e4ddd6e43 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -653,8 +653,7 @@ methods for creating DStreams from files and Akka actors as input sources. - Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory (files written in nested directories not supported). Note that - + Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory. It won't search the nested directories by default. You can set the optional `depth` parameter to a value greater than 1 to monitor files in subdirectories. Note that + The files must have the same data format. + The files must be created in the `dataDirectory` by atomically *moving* or *renaming* them into the data directory. diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index ac5ba69e8dbb..81de2c0b8276 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -253,14 +253,18 @@ def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_ return DStream(self._jssc.socketTextStream(hostname, port, jlevel), self, UTF8Deserializer()) - def textFileStream(self, directory): + def textFileStream(self, directory, depth=1): """ Create an input stream that monitors a Hadoop-compatible file system for new files and reads them as text files. Files must be wrriten to the monitored directory by "moving" them from another location within the same file system. File names starting with . are ignored. + + @param directory: The directory to monitor + @param depth: The max depth to search in the directory. The default + value 1 means only searching files in the current directory """ - return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer()) + return DStream(self._jssc.textFileStream(directory, depth), self, UTF8Deserializer()) def binaryRecordsStream(self, directory, recordLength): """ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index ec49d0f42d12..17674f2683e5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -386,6 +386,27 @@ class StreamingContext private[streaming] ( new FileInputDStream[K, V, F](this, directory) } + /** + * Create a input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them using the given key-value types and input format. + * Files must be written to the monitored directory by "moving" them from another + * location within the same file system. File names starting with . are ignored. + * It can also monitor files in subdirectories by setting the optional `depth` + * parameter to a value greater than 1. 
+ * @param directory HDFS directory to monitor for new file + * @param depth Searching depth of HDFS directory + * @tparam K Key type for reading HDFS file + * @tparam V Value type for reading HDFS file + * @tparam F Input format for reading HDFS file + */ + def fileStream[ + K: ClassTag, + V: ClassTag, + F <: NewInputFormat[K, V]: ClassTag + ] (directory: String, depth: Int): InputDStream[(K, V)] = { + new FileInputDStream[K, V, F](this, directory, depth) + } + /** * Create a input stream that monitors a Hadoop-compatible filesystem * for new files and reads them using the given key-value types and input format. @@ -403,7 +424,33 @@ class StreamingContext private[streaming] ( V: ClassTag, F <: NewInputFormat[K, V]: ClassTag ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = { - new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly) + new FileInputDStream[K, V, F](this, directory, 1, filter, newFilesOnly) + } + + /** + * Create a input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them using the given key-value types and input format. + * Files must be written to the monitored directory by "moving" them from another + * location within the same file system. + * It can also monitor files in subdirectories by setting the optional `depth` + * parameter to a value greater than 1. + * @param directory HDFS directory to monitor for new file + * @param filter Function to filter paths to process + * @param newFilesOnly Should process only new files and ignore existing files in the directory + * @param depth Searching depth of HDFS directory + * @tparam K Key type for reading HDFS file + * @tparam V Value type for reading HDFS file + * @tparam F Input format for reading HDFS file + */ + def fileStream[ + K: ClassTag, + V: ClassTag, + F <: NewInputFormat[K, V]: ClassTag + ] (directory: String, + filter: Path => Boolean, + newFilesOnly: Boolean, + depth: Int): InputDStream[(K, V)] = { + new FileInputDStream[K, V, F](this, directory, depth, filter, newFilesOnly) } /** @@ -427,7 +474,35 @@ class StreamingContext private[streaming] ( filter: Path => Boolean, newFilesOnly: Boolean, conf: Configuration): InputDStream[(K, V)] = { - new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly, Option(conf)) + new FileInputDStream[K, V, F](this, directory, 1, filter, newFilesOnly, Option(conf)) + } + + /** + * Create a input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them using the given key-value types and input format. + * Files must be written to the monitored directory by "moving" them from another + * location within the same file system. File names starting with . are ignored. + * It can also monitor files in subdirectories by setting the optional `depth` + * parameter to a value greater than 1. 
+ * @param directory HDFS directory to monitor for new file + * @param filter Function to filter paths to process + * @param newFilesOnly Should process only new files and ignore existing files in the directory + * @param conf Hadoop configuration + * @param depth Searching depth of HDFS directory + * @tparam K Key type for reading HDFS file + * @tparam V Value type for reading HDFS file + * @tparam F Input format for reading HDFS file + */ + def fileStream[ + K: ClassTag, + V: ClassTag, + F <: NewInputFormat[K, V]: ClassTag + ] (directory: String, + filter: Path => Boolean, + newFilesOnly: Boolean, + conf: Configuration, + depth: Int): InputDStream[(K, V)] = { + new FileInputDStream[K, V, F](this, directory, depth, filter, newFilesOnly, Option(conf)) } /** @@ -442,6 +517,23 @@ class StreamingContext private[streaming] ( fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString) } + /** + * Create a input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them as text files (using key as LongWritable, value + * as Text and input format as TextInputFormat). Files must be written to the + * monitored directory by "moving" them from another location within the same + * file system. File names starting with . are ignored. + * It can also monitor files in subdirectories by setting the optional `depth` + * parameter to a value greater than 1. + * @param directory HDFS directory to monitor for new file + * @param depth Searching depth of HDFS directory + */ + def textFileStream( + directory: String, + depth: Int): DStream[String] = withNamedScope("text file stream") { + fileStream[LongWritable, Text, TextInputFormat](directory, depth).map(_._2.toString) + } + /** * :: Experimental :: * @@ -450,21 +542,25 @@ class StreamingContext private[streaming] ( * generating one byte array per record. Files must be written to the monitored directory * by "moving" them from another location within the same file system. File names * starting with . are ignored. + * It can also monitor files in subdirectories by setting the optional `depth` + * parameter to a value greater than 1. * * '''Note:''' We ensure that the byte array for each record in the * resulting RDDs of the DStream has the provided record length. * * @param directory HDFS directory to monitor for new file * @param recordLength length of each record in bytes + * @param depth Searching depth of HDFS directory */ @Experimental def binaryRecordsStream( directory: String, - recordLength: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") { + recordLength: Int, + depth: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") { val conf = sc_.hadoopConfiguration conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength) val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat]( - directory, FileInputDStream.defaultFilter: Path => Boolean, newFilesOnly = true, conf) + directory, FileInputDStream.defaultFilter: Path => Boolean, newFilesOnly = true, conf, depth) val data = br.map { case (k, v) => val bytes = v.getBytes require(bytes.length == recordLength, "Byte array does not have correct length. " + @@ -474,6 +570,28 @@ class StreamingContext private[streaming] ( data } + /** + * :: Experimental :: + * + * Create an input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them as flat binary files, assuming a fixed length per record, + * generating one byte array per record. 
Files must be written to the monitored directory + * by "moving" them from another location within the same file system. File names + * starting with . are ignored. + * + * '''Note:''' We ensure that the byte array for each record in the + * resulting RDDs of the DStream has the provided record length. + * + * @param directory HDFS directory to monitor for new file + * @param recordLength length of each record in bytes + */ + @Experimental + def binaryRecordsStream( + directory: String, + recordLength: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") { + binaryRecordsStream(directory, recordLength, 1) + } + /** * Create an input stream from a queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 40deb6d7ea79..23ddf1f210bb 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -214,6 +214,15 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { ssc.textFileStream(directory) } + /** + * Create an input stream that monitor files in subdirectories for new files + * and reads them as text files. + * @param directory HDFS directory to monitor for new file + * @param depth Searching depth of HDFS directory + */ + def textFileStream(directory: String, depth: Int): JavaDStream[String] = { + ssc.textFileStream(directory, depth) + } /** * :: Experimental :: * @@ -292,6 +301,34 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { ssc.fileStream[K, V, F](directory) } + /** + * Create an input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them using the given key-value types and input format. + * Files must be written to the monitored directory by "moving" them from another + * location within the same file system. File names starting with . are ignored. + * It can also monitor files in subdirectories by setting the optional `depth` + * parameter to a value greater than 1. + * @param directory HDFS directory to monitor for new file + * @param depth Searching depth of HDFS directory + * @param kClass class of key for reading HDFS file + * @param vClass class of value for reading HDFS file + * @param fClass class of input format for reading HDFS file + * @tparam K Key type for reading HDFS file + * @tparam V Value type for reading HDFS file + * @tparam F Input format for reading HDFS file + */ + def fileStream[K, V, F <: NewInputFormat[K, V]]( + directory: String, + depth: Int, + kClass: Class[K], + vClass: Class[V], + fClass: Class[F]): JavaPairInputDStream[K, V] = { + implicit val cmk: ClassTag[K] = ClassTag(kClass) + implicit val cmv: ClassTag[V] = ClassTag(vClass) + implicit val cmf: ClassTag[F] = ClassTag(fClass) + ssc.fileStream[K, V, F](directory, depth) + } + /** * Create an input stream that monitors a Hadoop-compatible filesystem * for new files and reads them using the given key-value types and input format. @@ -321,6 +358,39 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { ssc.fileStream[K, V, F](directory, fn, newFilesOnly) } + /** + * Create an input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them using the given key-value types and input format. 
+ * Files must be written to the monitored directory by "moving" them from another + * location within the same file system. File names starting with . are ignored. + * It can also monitor files in subdirectories by setting the optional `depth` + * parameter to a value greater than 1. + * @param directory HDFS directory to monitor for new file + * @param kClass class of key for reading HDFS file + * @param vClass class of value for reading HDFS file + * @param fClass class of input format for reading HDFS file + * @param filter Function to filter paths to process + * @param newFilesOnly Should process only new files and ignore existing files in the directory + * @param depth Searching depth of HDFS directory + * @tparam K Key type for reading HDFS file + * @tparam V Value type for reading HDFS file + * @tparam F Input format for reading HDFS file + */ + def fileStream[K, V, F <: NewInputFormat[K, V]]( + directory: String, + kClass: Class[K], + vClass: Class[V], + fClass: Class[F], + filter: JFunction[Path, JBoolean], + newFilesOnly: Boolean, + depth: Int): JavaPairInputDStream[K, V] = { + implicit val cmk: ClassTag[K] = ClassTag(kClass) + implicit val cmv: ClassTag[V] = ClassTag(vClass) + implicit val cmf: ClassTag[F] = ClassTag(fClass) + def fn: (Path) => Boolean = (x: Path) => filter.call(x).booleanValue() + ssc.fileStream[K, V, F](directory, fn, newFilesOnly, depth) + } + /** * Create an input stream that monitors a Hadoop-compatible filesystem * for new files and reads them using the given key-value types and input format. @@ -352,6 +422,41 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { ssc.fileStream[K, V, F](directory, fn, newFilesOnly, conf) } + /** + * Create an input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them using the given key-value types and input format. + * Files must be written to the monitored directory by "moving" them from another + * location within the same file system. File names starting with . are ignored. + * It can also monitor files in subdirectories by setting the optional `depth` + * parameter to a value greater than 1. + * @param directory HDFS directory to monitor for new file + * @param kClass class of key for reading HDFS file + * @param vClass class of value for reading HDFS file + * @param fClass class of input format for reading HDFS file + * @param filter Function to filter paths to process + * @param newFilesOnly Should process only new files and ignore existing files in the directory + * @param conf Hadoop configuration + * @param depth Searching depth of HDFS directory + * @tparam K Key type for reading HDFS file + * @tparam V Value type for reading HDFS file + * @tparam F Input format for reading HDFS file + */ + def fileStream[K, V, F <: NewInputFormat[K, V]]( + directory: String, + kClass: Class[K], + vClass: Class[V], + fClass: Class[F], + filter: JFunction[Path, JBoolean], + newFilesOnly: Boolean, + conf: Configuration, + depth: Int): JavaPairInputDStream[K, V] = { + implicit val cmk: ClassTag[K] = ClassTag(kClass) + implicit val cmv: ClassTag[V] = ClassTag(vClass) + implicit val cmf: ClassTag[F] = ClassTag(fClass) + def fn: (Path) => Boolean = (x: Path) => filter.call(x).booleanValue() + ssc.fileStream[K, V, F](directory, fn, newFilesOnly, conf, depth) + } + /** * Create an input stream with any arbitrary user implemented actor receiver. 
* @param props Props object defining creation of the actor diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 86a8e2beff57..8ce20c1c140a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -17,13 +17,13 @@ package org.apache.spark.streaming.dstream -import java.io.{IOException, ObjectInputStream} +import java.io.{FileNotFoundException, IOException, ObjectInputStream} import scala.collection.mutable import scala.reflect.ClassTag import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.spark.rdd.{RDD, UnionRDD} @@ -35,8 +35,9 @@ import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Uti * files and creates a stream out of them. The way it works as follows. * * At each batch interval, the file system is queried for files in the given directory and - * detected new files are selected for that batch. In this case "new" means files that - * became visible to readers during that time period. Some extra care is needed to deal + * detected new files are selected for that batch. It can also monitor files in subdirectories by + * setting the optional `depth` parameter to a value greater than 1. In this case "new" means + * files that became visible to readers during that time period. Some extra care is needed to deal * with the fact that files may become visible after they are created. For this purpose, this * class remembers the information about the files selected in past batches for * a certain duration (say, "remember window") as shown in the figure below. 
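A minimal standalone sketch (not part of the patch) of the depth check the class comment above describes in prose: whether a directory is descended into is decided by comparing Hadoop `Path.depth()` values against the monitored root. The helper name `withinDepth` and the example paths are illustrative only.

```scala
import org.apache.hadoop.fs.Path

// Illustrative only: a directory is scanned when its depth relative to the
// monitored root is still below the configured `depth`. With depth = 1 only
// the root directory itself is scanned (the pre-existing behaviour).
def withinDepth(root: Path, dir: Path, depth: Int): Boolean =
  depth > dir.depth() - root.depth()

// withinDepth(new Path("/data"), new Path("/data/2015/07"), 3)     // true
// withinDepth(new Path("/data"), new Path("/data/2015/07/21"), 3)  // false
```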
@@ -71,6 +72,7 @@ private[streaming] class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( @transient ssc_ : StreamingContext, directory: String, + depth: Int = 1, filter: Path => Boolean = FileInputDStream.defaultFilter, newFilesOnly: Boolean = true, conf: Option[Configuration] = None) @@ -91,6 +93,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( // This is a def so that it works during checkpoint recovery: private def clock = ssc.scheduler.clock + require(depth >= 1, "The depth of nested directories must be >= 1") // Data to be saved as part of the streaming checkpoints protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData @@ -116,6 +119,9 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( // Set of files that were selected in the remembered batches @transient private var recentlySelectedFiles = new mutable.HashSet[String]() + // Set of directories discovered so far, across all batches + @transient private var lastFoundDirs = new mutable.HashSet[Path]() + // Read-through cache of file mod times, used to speed up mod time lookups @transient private var fileToModTime = new TimeStampedHashMap[String, Long](true) @@ -123,6 +129,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( @transient private var lastNewFileFindingTime = 0L @transient private var path_ : Path = null + @transient private var directoryDepth_ : Int = -1 @transient private var fs_ : FileSystem = null override def start() { } @@ -162,7 +169,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( } /** - * Find new files for the batch of `currentTime`. This is done by first calculating the + * Find new files for the batch of `currentTime` in nested directories. + * This is done by first calculating the * ignore threshold for file mod times, and then getting a list of files filtered based on * the current batch time and the ignore threshold. The ignore threshold is the max of * initial ignore threshold and the trailing end of the remember window (that is, which ever @@ -179,10 +187,55 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( ) logDebug(s"Getting new files for time $currentTime, " + s"ignoring files older than $modTimeIgnoreThreshold") - val filter = new PathFilter { + val newFileFilter = new PathFilter { def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold) } - val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString) + val rootDirectoryDepth = directoryDepth + + // Search nested directories to find new files. + def searchFilesRecursively(status: FileStatus, files: mutable.ArrayBuffer[String]): Unit = { + val path = status.getPath + if (status.isDir) { + // Note: A user may set depth = Int.MaxValue to search all nested directories.
+ if (depth > path.depth() - rootDirectoryDepth) { + if (lastFoundDirs.contains(path)) { + if (status.getModificationTime > modTimeIgnoreThreshold) { + fs.listStatus(path).foreach(searchFilesRecursively(_, files)) + } + } else { + lastFoundDirs += path + fs.listStatus(path).foreach(searchFilesRecursively(_, files)) + } + } + } else { + if (newFileFilter.accept(path)) { + files += path.toString + } + } + } + + val validDirs: Iterable[Path] = + if (lastFoundDirs.isEmpty) { + Seq(directoryPath) + } + else { + lastFoundDirs.filter { path => + // Keep only directories modified after the ignore threshold; older ones cannot hold new files + try { + val status = fs.getFileStatus(path) + status != null && status.getModificationTime > modTimeIgnoreThreshold + } catch { + // If the directory no longer exists, remove it from `lastFoundDirs` + case e: FileNotFoundException => + lastFoundDirs.remove(path) + false + } + } + } + + val newFiles = mutable.ArrayBuffer[String]() + validDirs.flatMap(fs.listStatus(_)). // Get sub dirs and files + foreach(searchFilesRecursively(_, newFiles)) val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime logInfo("Finding new files took " + timeTaken + " ms") logDebug("# cached file times = " + fileToModTime.size) @@ -193,7 +246,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( "files in the monitored directory." ) } - newFiles + newFiles.toArray } catch { case e: Exception => logWarning("Error finding new files", e) @@ -276,17 +329,32 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( } private def directoryPath: Path = { - if (path_ == null) path_ = new Path(directory) + if (fs_ == null) init() path_ } + private def directoryDepth: Int = { + if (fs_ == null) init() + directoryDepth_ + } + private def fs: FileSystem = { - if (fs_ == null) fs_ = directoryPath.getFileSystem(ssc.sparkContext.hadoopConfiguration) + if (fs_ == null) init() fs_ } - private def reset() { + private def init(): Unit = { + val originPath = new Path(directory) + fs_ = originPath.getFileSystem(ssc.sparkContext.hadoopConfiguration) + // Get the absolute path + path_ = fs_.getFileStatus(originPath).getPath + directoryDepth_ = path_.depth() + } + + private def reset() { fs_ = null + path_ = null + directoryDepth_ = -1 } @throws(classOf[IOException]) @@ -298,6 +366,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]] recentlySelectedFiles = new mutable.HashSet[String]() fileToModTime = new TimeStampedHashMap[String, Long](true) + lastFoundDirs = new mutable.HashSet[Path]() } /** diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index b74d67c63a78..9ef2fa7a14df 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -188,14 +188,42 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } } - test("file input stream - newFilesOnly = true") { + test("file input stream - newFilesOnly = true and depth = 1") { testFileStream(newFilesOnly = true) } - test("file input stream - newFilesOnly = false") { + test("file input stream - newFilesOnly = false and depth = 1") { testFileStream(newFilesOnly = false) } + test("file input stream - newFilesOnly = true and depth = 2") { + testFileStream(newFilesOnly = true, 2) + } + + test("file input 
stream - newFilesOnly = false and depth = 2") { + testFileStream(newFilesOnly = false, 2) + } + + test("file input stream - newFilesOnly = true and depth = 3") { + testFileStream(newFilesOnly = true, 3) + } + + test("file input stream - newFilesOnly = false and depth = 3") { + testFileStream(newFilesOnly = false, 3) + } + + test("file input stream - newFilesOnly = false and depth is too small") { + testFileStream(newFilesOnly = false, 3, 2) + } + + test("file input stream - newFilesOnly = true and depth = Int.MaxValue") { + testFileStream(newFilesOnly = true, 3, Int.MaxValue) + } + + test("file input stream - newFilesOnly = false and depth = Int.MaxValue") { + testFileStream(newFilesOnly = false, 3, Int.MaxValue) + } + test("multi-thread receiver") { // set up the test receiver val numThreads = 10 @@ -348,11 +376,18 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { assert(receiverInputStreams.map(_.id) === Array(0, 1)) } - def testFileStream(newFilesOnly: Boolean) { - val testDir: File = null + def testFileStream(newFilesOnly: Boolean, depth: Int = 1): Unit = { + testFileStream(newFilesOnly, depth, depth) + } + + def testFileStream(newFilesOnly: Boolean, createDepth: Int, searchDepth: Int) { + val rootDir = Utils.createTempDir() try { val batchDuration = Seconds(2) - val testDir = Utils.createTempDir() + var testDir = rootDir + for (i <- 1 until createDepth) { + testDir = Utils.createTempDir(testDir.toString) + } // Create a file that exists before the StreamingContext is created: val existingFile = new File(testDir, "0") Files.write("0\n", existingFile, Charset.forName("UTF-8")) @@ -365,7 +400,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { clock.setTime(existingFile.lastModified + batchDuration.milliseconds) val batchCounter = new BatchCounter(ssc) val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat]( - testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString) + rootDir.toString, (x: Path) => true, + newFilesOnly = newFilesOnly, searchDepth).map(_._2.toString) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] val outputStream = new TestOutputStream(fileStream, outputBuffer) outputStream.register() @@ -392,15 +428,18 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { } // Verify that all the files have been read - val expectedOutput = if (newFilesOnly) { - input.map(_.toString).toSet - } else { - (Seq(0) ++ input).map(_.toString).toSet - } + val expectedOutput = + if (createDepth > searchDepth) { + Set() + } else if (newFilesOnly) { + input.map(_.toString).toSet + } else { + (Seq(0) ++ input).map(_.toString).toSet + } assert(outputBuffer.flatten.toSet === expectedOutput) } } finally { - if (testDir != null) Utils.deleteRecursively(testDir) + Utils.deleteRecursively(rootDir) } } }
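Taken together, a driver program would call the new `depth` overloads roughly as follows. This is a minimal sketch assuming the patch above is applied; the application name, master, directory path, and batch interval are placeholders.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NestedDirWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("NestedDirWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // depth = 1 preserves the old behaviour (top-level directory only);
    // depth = 3 also picks up files up to two directory levels below /data/incoming.
    // Files must still be moved or renamed atomically into the monitored tree.
    val lines = ssc.textFileStream("/data/incoming", 3)
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```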