diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index bd863d48d53e3..2df1ed77b86ae 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -653,8 +653,7 @@ methods for creating DStreams from files and Akka actors as input sources.
 
-    Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory (files written in nested directories not supported). Note that
-
+    Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory. Files written in nested directories are not processed by default, but they can be monitored as well by setting the optional `depth` parameter to a value greater than 1. Note that
 
     + The files must have the same data format.
     + The files must be created in the `dataDirectory` by atomically *moving* or *renaming* them into the data directory.
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index ac5ba69e8dbbb..867fdab00d447 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -260,7 +260,13 @@ def textFileStream(self, directory):
         monitored directory by "moving" them from another location within the same
         file system. File names starting with . are ignored.
         """
-        return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())
+        return self.textFileStream(directory, 1)
+
+    def textFileStream(self, directory, depth=1):
+        """
+        Create an input stream that monitors text files up to `depth` directory levels deep.
+        """
+        return DStream(self._jssc.textFileStream(directory, depth), self, UTF8Deserializer())
 
     def binaryRecordsStream(self, directory, recordLength):
         """
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 7f181bcecd4bf..894ae676565c1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -376,6 +376,27 @@ class StreamingContext private[streaming] (
     new FileInputDStream[K, V, F](this, directory)
   }
 
+  /**
+   * Create an input stream that monitors a Hadoop-compatible filesystem
+   * for new files and reads them using the given key-value types and input format.
+   * Files must be written to the monitored directory by "moving" them from another
+   * location within the same file system. File names starting with . are ignored.
+   * Files in subdirectories are also monitored when `depth` is set to a value
+   * greater than 1.
+   * @param directory HDFS directory to monitor for new file
+   * @param depth Search depth of the directory tree (1 monitors only `directory` itself)
+   * @tparam K Key type for reading HDFS file
+   * @tparam V Value type for reading HDFS file
+   * @tparam F Input format for reading HDFS file
+   */
+  def fileStream[
+      K: ClassTag,
+      V: ClassTag,
+      F <: NewInputFormat[K, V]: ClassTag
+    ] (directory: String, depth: Int): InputDStream[(K, V)] = {
+    new FileInputDStream[K, V, F](this, directory, depth)
+  }
+
   /**
    * Create a input stream that monitors a Hadoop-compatible filesystem
    * for new files and reads them using the given key-value types and input format.
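As a usage sketch (not part of the patch itself), a Scala driver written against the overload added above might look like the following; the directory path, application name, and batch interval are invented for illustration:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NestedDirectoryExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("NestedDirectoryExample")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Monitor /data/logs and its immediate subdirectories (depth = 2).
    // With the default depth of 1 only files created directly in /data/logs
    // would be picked up, which matches the behaviour before this patch.
    val lines = ssc
      .fileStream[LongWritable, Text, TextInputFormat]("/data/logs", 2)
      .map(_._2.toString)

    lines.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Because every new overload defaults to `depth = 1`, existing applications keep their current behaviour.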
@@ -393,7 +414,33 @@ class StreamingContext private[streaming] (
       V: ClassTag,
       F <: NewInputFormat[K, V]: ClassTag
     ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = {
-    new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
+    new FileInputDStream[K, V, F](this, directory, 1, filter, newFilesOnly)
+  }
+
+  /**
+   * Create an input stream that monitors a Hadoop-compatible filesystem
+   * for new files and reads them using the given key-value types and input format.
+   * Files must be written to the monitored directory by "moving" them from another
+   * location within the same file system.
+   * Files in subdirectories are also monitored when `depth` is set to a value
+   * greater than 1.
+   * @param directory HDFS directory to monitor for new file
+   * @param filter Function to filter paths to process
+   * @param newFilesOnly Should process only new files and ignore existing files in the directory
+   * @param depth Search depth of the directory tree (1 monitors only `directory` itself)
+   * @tparam K Key type for reading HDFS file
+   * @tparam V Value type for reading HDFS file
+   * @tparam F Input format for reading HDFS file
+   */
+  def fileStream[
+      K: ClassTag,
+      V: ClassTag,
+      F <: NewInputFormat[K, V]: ClassTag
+    ] (directory: String,
+      filter: Path => Boolean,
+      newFilesOnly: Boolean,
+      depth: Int): InputDStream[(K, V)] = {
+    new FileInputDStream[K, V, F](this, directory, depth, filter, newFilesOnly)
   }
 
   /**
@@ -417,7 +464,35 @@ class StreamingContext private[streaming] (
       filter: Path => Boolean,
       newFilesOnly: Boolean,
       conf: Configuration): InputDStream[(K, V)] = {
-    new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly, Option(conf))
+    new FileInputDStream[K, V, F](this, directory, 1, filter, newFilesOnly, Option(conf))
+  }
+
+  /**
+   * Create an input stream that monitors a Hadoop-compatible filesystem
+   * for new files and reads them using the given key-value types and input format.
+   * Files must be written to the monitored directory by "moving" them from another
+   * location within the same file system. File names starting with . are ignored.
+   * Files in subdirectories are also monitored when `depth` is set to a value
+   * greater than 1.
+   * @param directory HDFS directory to monitor for new file
+   * @param filter Function to filter paths to process
+   * @param newFilesOnly Should process only new files and ignore existing files in the directory
+   * @param conf Hadoop configuration
+   * @param depth Search depth of the directory tree (1 monitors only `directory` itself)
+   * @tparam K Key type for reading HDFS file
+   * @tparam V Value type for reading HDFS file
+   * @tparam F Input format for reading HDFS file
+   */
+  def fileStream[
+      K: ClassTag,
+      V: ClassTag,
+      F <: NewInputFormat[K, V]: ClassTag
+    ] (directory: String,
+      filter: Path => Boolean,
+      newFilesOnly: Boolean,
+      conf: Configuration,
+      depth: Int): InputDStream[(K, V)] = {
+    new FileInputDStream[K, V, F](this, directory, depth, filter, newFilesOnly, Option(conf))
   }
 
   /**
@@ -428,10 +503,27 @@ class StreamingContext private[streaming] (
    * file system. File names starting with . are ignored.
   * @param directory HDFS directory to monitor for new file
   */
-  def textFileStream(directory: String): DStream[String] = withNamedScope("text file stream") {
+  def textFileStream(directory: String): DStream[String] = withNamedScope("text file stream") {
     fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
   }
 
+  /**
+   * Create an input stream that monitors a Hadoop-compatible filesystem
+   * for new files and reads them as text files (using key as LongWritable, value
+   * as Text and input format as TextInputFormat). Files must be written to the
+   * monitored directory by "moving" them from another location within the same
+   * file system. File names starting with . are ignored.
+   * Files in subdirectories are also monitored when `depth` is set to a value
+   * greater than 1.
+   * @param directory HDFS directory to monitor for new file
+   * @param depth Search depth of the directory tree (1 monitors only `directory` itself)
+   */
+  def textFileStream(
+      directory: String,
+      depth: Int): DStream[String] = withNamedScope("text file stream") {
+    fileStream[LongWritable, Text, TextInputFormat](directory, depth).map(_._2.toString)
+  }
+
   /**
    * :: Experimental ::
    *
@@ -440,21 +532,26 @@ class StreamingContext private[streaming] (
    * generating one byte array per record. Files must be written to the monitored directory
    * by "moving" them from another location within the same file system. File names
    * starting with . are ignored.
+   * Files in subdirectories are also monitored when `depth` is set to a value
+   * greater than 1.
    *
    * '''Note:''' We ensure that the byte array for each record in the
    * resulting RDDs of the DStream has the provided record length.
    *
    * @param directory HDFS directory to monitor for new file
   * @param recordLength length of each record in bytes
+   * @param depth Search depth of the directory tree (1 monitors only `directory` itself)
    */
   @Experimental
   def binaryRecordsStream(
       directory: String,
-      recordLength: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") {
+      recordLength: Int,
+      depth: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") {
     val conf = sc_.hadoopConfiguration
     conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
     val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
-      directory, FileInputDStream.defaultFilter : Path => Boolean, newFilesOnly=true, conf)
+      directory, FileInputDStream.defaultFilter : Path => Boolean,
+      newFilesOnly=true, conf, depth)
     val data = br.map { case (k, v) =>
       val bytes = v.getBytes
       assert(bytes.length == recordLength, "Byte array does not have correct length")
@@ -463,6 +560,28 @@ class StreamingContext private[streaming] (
     data
   }
 
+  /**
+   * :: Experimental ::
+   *
+   * Create an input stream that monitors a Hadoop-compatible filesystem
+   * for new files and reads them as flat binary files, assuming a fixed length per record,
+   * generating one byte array per record. Files must be written to the monitored directory
+   * by "moving" them from another location within the same file system. File names
+   * starting with . are ignored.
+   *
+   * '''Note:''' We ensure that the byte array for each record in the
+   * resulting RDDs of the DStream has the provided record length.
+   *
+   * @param directory HDFS directory to monitor for new file
+   * @param recordLength length of each record in bytes
+   */
+  @Experimental
+  def binaryRecordsStream(
+      directory: String,
+      recordLength: Int): DStream[Array[Byte]] = withNamedScope("binary records stream") {
+    binaryRecordsStream(directory, recordLength, 1)
+  }
+
   /**
    * Create an input stream from a queue of RDDs. In each batch,
    * it will process either one or all of the RDDs returned by the queue.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index b639b94d5ca47..1a89b6bbc7e66 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -211,6 +211,15 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
     ssc.textFileStream(directory)
   }
 
+  /**
+   * Create an input stream that monitors the given directory and its subdirectories
+   * (up to `depth`) for new files and reads them as text files.
+   * @param directory HDFS directory to monitor for new file
+   * @param depth Search depth of the directory tree (1 monitors only `directory` itself)
+   */
+  def textFileStream(directory: String, depth: Int): JavaDStream[String] = {
+    ssc.textFileStream(directory, depth)
+  }
   /**
    * :: Experimental ::
    *
@@ -289,6 +298,34 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
     ssc.fileStream[K, V, F](directory)
   }
 
+  /**
+   * Create an input stream that monitors a Hadoop-compatible filesystem
+   * for new files and reads them using the given key-value types and input format.
+   * Files must be written to the monitored directory by "moving" them from another
+   * location within the same file system. File names starting with . are ignored.
+   * Files in subdirectories are also monitored when `depth` is set to a value
+   * greater than 1.
+   * @param directory HDFS directory to monitor for new file
+   * @param depth Search depth of the directory tree (1 monitors only `directory` itself)
+   * @param kClass class of key for reading HDFS file
+   * @param vClass class of value for reading HDFS file
+   * @param fClass class of input format for reading HDFS file
+   * @tparam K Key type for reading HDFS file
+   * @tparam V Value type for reading HDFS file
+   * @tparam F Input format for reading HDFS file
+   */
+  def fileStream[K, V, F <: NewInputFormat[K, V]](
+      directory: String,
+      depth: Int,
+      kClass: Class[K],
+      vClass: Class[V],
+      fClass: Class[F]): JavaPairInputDStream[K, V] = {
+    implicit val cmk: ClassTag[K] = ClassTag(kClass)
+    implicit val cmv: ClassTag[V] = ClassTag(vClass)
+    implicit val cmf: ClassTag[F] = ClassTag(fClass)
+    ssc.fileStream[K, V, F](directory, depth)
+  }
+
   /**
    * Create an input stream that monitors a Hadoop-compatible filesystem
    * for new files and reads them using the given key-value types and input format.
@@ -318,6 +355,39 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
     ssc.fileStream[K, V, F](directory, fn, newFilesOnly)
   }
 
+  /**
+   * Create an input stream that monitors a Hadoop-compatible filesystem
+   * for new files and reads them using the given key-value types and input format.
+   * Files must be written to the monitored directory by "moving" them from another
+   * location within the same file system. File names starting with . are ignored.
+   * Files in subdirectories are also monitored when `depth` is set to a value
+   * greater than 1.
+   * @param directory HDFS directory to monitor for new file
+   * @param kClass class of key for reading HDFS file
+   * @param vClass class of value for reading HDFS file
+   * @param fClass class of input format for reading HDFS file
+   * @param filter Function to filter paths to process
+   * @param newFilesOnly Should process only new files and ignore existing files in the directory
+   * @param depth Search depth of the directory tree (1 monitors only `directory` itself)
+   * @tparam K Key type for reading HDFS file
+   * @tparam V Value type for reading HDFS file
+   * @tparam F Input format for reading HDFS file
+   */
+  def fileStream[K, V, F <: NewInputFormat[K, V]](
+      directory: String,
+      kClass: Class[K],
+      vClass: Class[V],
+      fClass: Class[F],
+      filter: JFunction[Path, JBoolean],
+      newFilesOnly: Boolean,
+      depth: Int): JavaPairInputDStream[K, V] = {
+    implicit val cmk: ClassTag[K] = ClassTag(kClass)
+    implicit val cmv: ClassTag[V] = ClassTag(vClass)
+    implicit val cmf: ClassTag[F] = ClassTag(fClass)
+    def fn: (Path) => Boolean = (x: Path) => filter.call(x).booleanValue()
+    ssc.fileStream[K, V, F](directory, fn, newFilesOnly, depth)
+  }
+
   /**
    * Create an input stream that monitors a Hadoop-compatible filesystem
    * for new files and reads them using the given key-value types and input format.
@@ -349,6 +419,41 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
     ssc.fileStream[K, V, F](directory, fn, newFilesOnly, conf)
   }
 
+  /**
+   * Create an input stream that monitors a Hadoop-compatible filesystem
+   * for new files and reads them using the given key-value types and input format.
+   * Files must be written to the monitored directory by "moving" them from another
+   * location within the same file system. File names starting with . are ignored.
+   * Files in subdirectories are also monitored when `depth` is set to a value
+   * greater than 1.
+   * @param directory HDFS directory to monitor for new file
+   * @param kClass class of key for reading HDFS file
+   * @param vClass class of value for reading HDFS file
+   * @param fClass class of input format for reading HDFS file
+   * @param filter Function to filter paths to process
+   * @param newFilesOnly Should process only new files and ignore existing files in the directory
+   * @param conf Hadoop configuration
+   * @param depth Search depth of the directory tree (1 monitors only `directory` itself)
+   * @tparam K Key type for reading HDFS file
+   * @tparam V Value type for reading HDFS file
+   * @tparam F Input format for reading HDFS file
+   */
+  def fileStream[K, V, F <: NewInputFormat[K, V]](
+      directory: String,
+      kClass: Class[K],
+      vClass: Class[V],
+      fClass: Class[F],
+      filter: JFunction[Path, JBoolean],
+      newFilesOnly: Boolean,
+      conf: Configuration,
+      depth: Int): JavaPairInputDStream[K, V] = {
+    implicit val cmk: ClassTag[K] = ClassTag(kClass)
+    implicit val cmv: ClassTag[V] = ClassTag(vClass)
+    implicit val cmf: ClassTag[F] = ClassTag(fClass)
+    def fn: (Path) => Boolean = (x: Path) => filter.call(x).booleanValue()
+    ssc.fileStream[K, V, F](directory, fn, newFilesOnly, conf, depth)
+  }
+
   /**
    * Create an input stream with any arbitrary user implemented actor receiver.
    * @param props Props object defining creation of the actor
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index eca69f00188e4..c4e8ccb4d794f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -17,27 +17,27 @@ package org.apache.spark.streaming.dstream
 
-import java.io.{IOException, ObjectInputStream}
-
-import scala.collection.mutable
-import scala.reflect.ClassTag
+import java.io.{FileNotFoundException, IOException, ObjectInputStream}
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-
-import org.apache.spark.{SparkConf, SerializableWritable}
+import org.apache.spark.SerializableWritable
 import org.apache.spark.rdd.{RDD, UnionRDD}
 import org.apache.spark.streaming._
 import org.apache.spark.util.{TimeStampedHashMap, Utils}
 
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
 /**
  * This class represents an input stream that monitors a Hadoop-compatible filesystem for new
  * files and creates a stream out of them. The way it works as follows.
  *
  * At each batch interval, the file system is queried for files in the given directory and
- * detected new files are selected for that batch. In this case "new" means files that
- * became visible to readers during that time period. Some extra care is needed to deal
+ * detected new files are selected for that batch. Files in subdirectories are also monitored
+ * when the `depth` parameter is set to a value greater than 1. In this case "new" means
+ * files that became visible to readers during that time period. Some extra care is needed to deal
  * with the fact that files may become visible after they are created. For this purpose, this
  * class remembers the information about the files selected in past batches for
 * a certain duration (say, "remember window") as shown in the figure below.
@@ -72,6 +72,7 @@ private[streaming]
 class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
     @transient ssc_ : StreamingContext,
     directory: String,
+    depth: Int = 1,
     filter: Path => Boolean = FileInputDStream.defaultFilter,
     newFilesOnly: Boolean = true,
     conf: Option[Configuration] = None)
@@ -92,6 +93,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
 
   // This is a def so that it works during checkpoint recovery:
   private def clock = ssc.scheduler.clock
+  require(depth >= 1, "the depth of nested directories must be >= 1")
 
   // Data to be saved as part of the streaming checkpoints
   protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
@@ -117,6 +119,9 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
   // Set of files that were selected in the remembered batches
   @transient private var recentlySelectedFiles = new mutable.HashSet[String]()
 
+  // Set of directories that have been discovered so far
+  @transient private var lastFoundDirs = new mutable.HashSet[Path]()
+
   // Read-through cache of file mod times, used to speed up mod time lookups
   @transient private var fileToModTime = new TimeStampedHashMap[String, Long](true)
 
@@ -163,7 +168,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
   }
 
   /**
-   * Find new files for the batch of `currentTime`. This is done by first calculating the
+   * Find new files for the batch of `currentTime`, searching nested directories up to `depth`
+   * levels. This is done by first calculating the
    * ignore threshold for file mod times, and then getting a list of files filtered based on
    * the current batch time and the ignore threshold. The ignore threshold is the max of
    * initial ignore threshold and the trailing end of the remember window (that is, which ever
@@ -183,8 +189,46 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
       val filter = new PathFilter {
         def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
       }
-      val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
-      val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
+      val directoryDepth = fs.getFileStatus(directoryPath).getPath.depth()
+
+      // Depth-limited search of nested directories for new files.
+      def dfs(status: FileStatus): List[FileStatus] = {
+        val path = status.getPath
+        val remainingDepth = depth + directoryDepth - path.depth()
+        if (status.isDir) {
+          if (remainingDepth >= 1) {
+            if (lastFoundDirs.contains(path)) {
+              if (status.getModificationTime > modTimeIgnoreThreshold) {
+                fs.listStatus(path).toList.flatMap(dfs(_))
+              } else Nil
+            } else {
+              lastFoundDirs += path
+              fs.listStatus(path).toList.flatMap(dfs(_))
+            }
+          } else Nil
+        } else {
+          if (filter.accept(path)) status :: Nil else Nil
+        }
+      }
+
+      val scanRoots = if (lastFoundDirs.isEmpty) Seq(fs.getFileStatus(directoryPath))
+      else {
+        lastFoundDirs.toSeq.filter { path =>
+          // Directories not modified since the ignore threshold cannot contain new files.
+          try {
+            val status = fs.getFileStatus(path)
+            status != null && status.getModificationTime > modTimeIgnoreThreshold
+          } catch {
+            // If the directory no longer exists, remove it from `lastFoundDirs`
+            case e: FileNotFoundException =>
+              lastFoundDirs.remove(path)
+              false
+          }
+        }
+      }.flatMap(fs.listStatus(_)).toSeq
+
+      val newFiles = scanRoots.flatMap(dfs(_)).map(_.getPath.toString).toArray
+      val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
       logInfo("Finding new files took " + timeTaken + " ms")
       logDebug("# cached file times = " + fileToModTime.size)
       if (timeTaken > slideDuration.milliseconds) {
@@ -202,7 +246,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
         Array.empty
       }
   }
-
   /**
    * Identify whether the given `path` is a new file for the batch of `currentTime`. For it to be
    * accepted, it has to pass the following criteria.
@@ -223,6 +266,11 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
    */
   private def isNewFile(path: Path, currentTime: Long, modTimeIgnoreThreshold: Long): Boolean = {
     val pathStr = path.toString
+    // Reject the file if its name starts with _
+    if (path.getName().startsWith("_")) {
+      logDebug(s"$pathStr rejected because its name starts with _")
+      return false
+    }
     // Reject file if it does not satisfy filter
     if (!filter(path)) {
       logDebug(s"$pathStr rejected by filter")
@@ -299,6 +347,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
       new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
     recentlySelectedFiles = new mutable.HashSet[String]()
     fileToModTime = new TimeStampedHashMap[String, Long](true)
+    lastFoundDirs = new mutable.HashSet[Path]()
   }
 
   /**
@@ -341,7 +390,7 @@ private[streaming] object FileInputDStream {
 
   def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
-
+
   /**
    * Calculate the number of last batches to remember, such that all the files selected in
    * at least last minRememberDurationS duration can be remembered.
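For readers following the traversal above, here is a minimal, self-contained sketch of the same depth-limited listing idea. It is only an illustration under simplifying assumptions, not the patch's code: it omits the `lastFoundDirs` cache, the modification-time pruning, and the `PathFilter`, and the path used in `main` is invented.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

object DepthLimitedListing {

  /**
   * List files under `root`, descending into directories only while the
   * remaining depth is positive: depth = 1 returns files directly under
   * `root`, depth = 2 also searches its immediate subdirectories, and so on.
   */
  def listFiles(fs: FileSystem, root: Path, depth: Int): Seq[FileStatus] = {
    require(depth >= 1, "depth must be >= 1")
    def visit(status: FileStatus, remaining: Int): Seq[FileStatus] = {
      if (status.isDirectory) {
        // A directory is listed only if we are still allowed to go deeper.
        if (remaining >= 1) fs.listStatus(status.getPath).toSeq.flatMap(visit(_, remaining - 1))
        else Nil
      } else {
        Seq(status)
      }
    }
    visit(fs.getFileStatus(root), depth)
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    listFiles(fs, new Path("/tmp/streaming-input"), 2).foreach(s => println(s.getPath))
  }
}
```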
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 93e6b0cd7c661..d69b568db16f7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -157,14 +157,30 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     }
   }
 
-  test("file input stream - newFilesOnly = true") {
+  test("file input stream - newFilesOnly = true and depth = 1") {
     testFileStream(newFilesOnly = true)
   }
 
-  test("file input stream - newFilesOnly = false") {
+  test("file input stream - newFilesOnly = false and depth = 1") {
     testFileStream(newFilesOnly = false)
   }
 
+  test("file input stream - newFilesOnly = true and depth = 2") {
+    testFileStream(newFilesOnly = true, 2)
+  }
+
+  test("file input stream - newFilesOnly = false and depth = 2") {
+    testFileStream(newFilesOnly = false, 2)
+  }
+
+  test("file input stream - newFilesOnly = true and depth = 3") {
+    testFileStream(newFilesOnly = true, 3)
+  }
+
+  test("file input stream - newFilesOnly = false and depth = 3") {
+    testFileStream(newFilesOnly = false, 3)
+  }
+
   test("multi-thread receiver") {
     // set up the test receiver
     val numThreads = 10
@@ -293,6 +309,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     }
   }
 
+
   test("test track the number of input stream") {
     val ssc = new StreamingContext(conf, batchDuration)
 
@@ -317,11 +334,17 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     assert(receiverInputStreams.map(_.id) === Array(0, 1))
   }
 
-  def testFileStream(newFilesOnly: Boolean) {
+
+  def testFileStream(newFilesOnly: Boolean, depth: Int = 1) {
     val testDir: File = null
     try {
       val batchDuration = Seconds(2)
-      val testDir = Utils.createTempDir()
+      // Nest `testDir` `depth - 1` levels below the monitored `rootDir`
+      val rootDir = Utils.createTempDir()
+      var testDir = rootDir
+      for (i <- 2 to depth) {
+        testDir = Utils.createTempDir(testDir.toString)
+      }
       // Create a file that exists before the StreamingContext is created:
       val existingFile = new File(testDir, "0")
       Files.write("0\n", existingFile, Charset.forName("UTF-8"))
@@ -334,7 +357,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
       clock.setTime(existingFile.lastModified + batchDuration.milliseconds)
       val batchCounter = new BatchCounter(ssc)
       val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
-        testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString)
+        rootDir.toString, (x: Path) => true,
+        newFilesOnly = newFilesOnly, depth).map(_._2.toString)
       val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
       val outputStream = new TestOutputStream(fileStream, outputBuffer)
       outputStream.register()
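Finally, a small sketch of the producer side, since the guide text above requires files to appear in the monitored tree atomically. All paths and file names here are invented; the only point is that data is written elsewhere on the same file system and then moved into the watched subdirectory in a single step.

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths, StandardCopyOption}

object AtomicDropIntoNestedDir {
  def main(args: Array[String]): Unit = {
    // A subdirectory inside the tree monitored with depth > 1.
    val watchedSubDir = new File("/data/logs/2015-06-01")
    watchedSubDir.mkdirs()

    // Write the data to a staging location on the same file system first ...
    val staged = Paths.get("/data/staging/part-0000")
    Files.createDirectories(staged.getParent)
    Files.write(staged, "hello\nworld\n".getBytes(StandardCharsets.UTF_8))

    // ... then make it visible to the stream with a single atomic move.
    Files.move(
      staged,
      Paths.get(watchedSubDir.getAbsolutePath, "part-0000"),
      StandardCopyOption.ATOMIC_MOVE)
  }
}
```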