From 24e75ae7070f82ec847c144a5ba4940736d95503 Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Mon, 1 Feb 2016 23:13:50 -0800
Subject: [PATCH 1/5] [SPARK-13186][Streaming]Migrate away from SynchronizedMap
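
scala.collection.mutable.SynchronizedMap is deprecated as of Scala 2.11, so this
series replaces the "HashMap with SynchronizedMap" mixins with
java.util.concurrent.ConcurrentHashMap viewed as a Scala mutable.Map. A minimal
sketch of the replacement pattern follows; the object and method names are made
up for illustration and are not part of the diff:

    import java.util.concurrent.ConcurrentHashMap

    import scala.collection.convert.decorateAsScala._
    import scala.collection.mutable

    object SynchronizedMapMigrationSketch {
      // Old, deprecated style:
      //   new mutable.HashMap[String, Long]() with mutable.SynchronizedMap[String, Long]
      // Replacement: a ConcurrentHashMap exposed as a Scala mutable.Map.
      val counts: mutable.Map[String, Long] = new ConcurrentHashMap[String, Long]().asScala

      def record(key: String): Unit = {
        // put/get are individually thread-safe; a compound read-then-write like
        // this still needs external locking (see PATCH 2).
        counts.put(key, counts.getOrElse(key, 0L) + 1L)
      }
    }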
---
 .../apache/spark/streaming/kafka/KafkaStreamSuite.scala   | 5 ++++-
 .../spark/streaming/kinesis/KinesisStreamSuite.scala      | 5 +++--
 .../apache/spark/streaming/dstream/FileInputDStream.scala | 8 ++++----
 .../apache/spark/streaming/StreamingListenerSuite.scala   | 6 ++++--
 4 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 797b07f80d8ee..c2bb3e760a83c 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -30,6 +30,9 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 
+import java.util.concurrent.ConcurrentHashMap
+import scala.collection.convert.decorateAsScala._
+
 class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfterAll {
   private var ssc: StreamingContext = _
   private var kafkaTestUtils: KafkaTestUtils = _
@@ -65,7 +68,7 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
     val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
       ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
 
-    val result = new mutable.HashMap[String, Long]() with mutable.SynchronizedMap[String, Long]
+    val result = new ConcurrentHashMap[String, Long].asScala
     stream.map(_._2).countByValue().foreachRDD { r =>
       val ret = r.collect()
       ret.toMap.foreach { kv =>
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index ee6a5f0390d04..f7dc701f91321 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -39,6 +39,8 @@ import org.apache.spark.streaming.kinesis.KinesisTestUtils._
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
 import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
 import org.apache.spark.util.Utils
+import java.util.concurrent.ConcurrentHashMap
+import scala.collection.convert.decorateAsScala._
 
 abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFunSuite
   with Eventually with BeforeAndAfter with BeforeAndAfterAll {
@@ -229,8 +231,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     ssc.checkpoint(checkpointDir)
 
     val awsCredentials = KinesisTestUtils.getAWSCredentials()
-    val collectedData = new mutable.HashMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
-      with mutable.SynchronizedMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
+    val collectedData = new ConcurrentHashMap[Time, (Array[SequenceNumberRanges], Seq[Int])].asScala
 
     val kinesisStream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
       testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 1c2325409b53e..8fdfc121ea382 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -30,6 +30,8 @@ import org.apache.spark.rdd.{RDD, UnionRDD}
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.scheduler.StreamInputInfo
 import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Utils}
+import java.util.concurrent.ConcurrentHashMap
+import scala.collection.convert.decorateAsScala._
 
 /**
  * This class represents an input stream that monitors a Hadoop-compatible filesystem for new
@@ -116,8 +118,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
 
   // Map of batch-time to selected file info for the remembered batches
   // This is a concurrent map because it's also accessed in unit tests
-  @transient private[streaming] var batchTimeToSelectedFiles =
-    new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
+  @transient private[streaming] var batchTimeToSelectedFiles = new ConcurrentHashMap[Time, Array[String]].asScala
 
   // Set of files that were selected in the remembered batches
   @transient private var recentlySelectedFiles = new mutable.HashSet[String]()
@@ -307,8 +308,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
     logDebug(this.getClass().getSimpleName + ".readObject used")
     ois.defaultReadObject()
     generatedRDDs = new mutable.HashMap[Time, RDD[(K, V)]]()
-    batchTimeToSelectedFiles =
-      new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
+    batchTimeToSelectedFiles = new ConcurrentHashMap[Time, Array[String]].asScala
     recentlySelectedFiles = new mutable.HashSet[String]()
     fileToModTime = new TimeStampedHashMap[String, Long](true)
   }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 1ed68c74db9fd..340a00e7e87f4 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming
 
-import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedBuffer, SynchronizedMap}
+import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedBuffer}
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
 
@@ -32,6 +32,8 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.receiver.Receiver
 import org.apache.spark.streaming.scheduler._
+import java.util.concurrent.ConcurrentHashMap
+import scala.collection.convert.decorateAsScala._
 
 class StreamingListenerSuite extends TestSuiteBase with Matchers {
 
@@ -357,7 +359,7 @@ class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_O
  */
 class FailureReasonsCollector extends StreamingListener {
 
-  val failureReasons = new HashMap[Int, String] with SynchronizedMap[Int, String]
+  val failureReasons = new java.util.concurrent.ConcurrentHashMap[Int, String].asScala
 
   override def onOutputOperationCompleted(
       outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {

From 782020b2f9809f31a47b3a27851ea33ad3ad2634 Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Sun, 7 Feb 2016 23:11:14 -0800
Subject: [PATCH 2/5] fix import ordering and concurrency issue
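
Follow-up to the previous commit: the new imports are moved into their proper
groups (java.* before scala.*) to match the Spark import-ordering style, and
the read-modify-write in KafkaStreamSuite is wrapped in result.synchronized,
since getOrElseUpdate followed by put is not atomic even on a concurrent map.
A minimal sketch of the idea, with hypothetical names rather than code taken
from the diff:

    import java.util.concurrent.ConcurrentHashMap

    import scala.collection.convert.decorateAsScala._

    object AtomicUpdateSketch {
      private val counts = new ConcurrentHashMap[String, Long]().asScala

      // Without the lock, two threads can both read the same old value and one
      // increment is lost, even though each individual map operation is thread-safe.
      def add(key: String, delta: Long): Unit = counts.synchronized {
        val count = counts.getOrElseUpdate(key, 0L) + delta
        counts.put(key, count)
      }
    }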
---
 .../streaming/kafka/KafkaStreamSuite.scala         | 13 +++++++------
 .../streaming/kinesis/KinesisStreamSuite.scala     | 15 ++++++++-------
 .../streaming/dstream/FileInputDStream.scala       | 18 +++++++++---------
 .../spark/streaming/CheckpointSuite.scala          |  3 ++-
 .../streaming/StreamingListenerSuite.scala         | 13 +++++++------
 5 files changed, 33 insertions(+), 29 deletions(-)

diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index c2bb3e760a83c..16a6d3eb3a489 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -17,7 +17,9 @@
 
 package org.apache.spark.streaming.kafka
 
-import scala.collection.mutable
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.convert.decorateAsScala._
 import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.util.Random
@@ -30,9 +32,6 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 
-import java.util.concurrent.ConcurrentHashMap
-import scala.collection.convert.decorateAsScala._
-
 class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfterAll {
   private var ssc: StreamingContext = _
   private var kafkaTestUtils: KafkaTestUtils = _
@@ -72,8 +71,10 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
     stream.map(_._2).countByValue().foreachRDD { r =>
       val ret = r.collect()
       ret.toMap.foreach { kv =>
-        val count = result.getOrElseUpdate(kv._1, 0) + kv._2
-        result.put(kv._1, count)
+        result.synchronized {
+          val count = result.getOrElseUpdate(kv._1, 0) + kv._2
+          result.put(kv._1, count)
+        }
       }
     }
 
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index f7dc701f91321..2348941a3ad19 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.streaming.kinesis
 
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.convert.decorateAsScala._
 import scala.collection.mutable
 import scala.concurrent.duration._
 import scala.language.postfixOps
@@ -39,8 +42,6 @@ import org.apache.spark.streaming.kinesis.KinesisTestUtils._
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
 import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
 import org.apache.spark.util.Utils
-import java.util.concurrent.ConcurrentHashMap
-import scala.collection.convert.decorateAsScala._
 
 abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFunSuite
   with Eventually with BeforeAndAfter with BeforeAndAfterAll {
@@ -231,7 +232,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     ssc.checkpoint(checkpointDir)
 
     val awsCredentials = KinesisTestUtils.getAWSCredentials()
-    val collectedData = new ConcurrentHashMap[Time, (Array[SequenceNumberRanges], Seq[Int])].asScala
+    val collectedData = new ConcurrentHashMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
 
     val kinesisStream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
       testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
@@ -242,13 +243,13 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
       val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
       val data = rdd.map { bytes => new String(bytes).toInt }.collect().toSeq
-      collectedData(time) = (kRdd.arrayOfseqNumberRanges, data)
+      collectedData.put(time,(kRdd.arrayOfseqNumberRanges, data))
     })
 
     ssc.remember(Minutes(60)) // remember all the batches so that they are all saved in checkpoint
     ssc.start()
 
-    def numBatchesWithData: Int = collectedData.count(_._2._2.nonEmpty)
+    def numBatchesWithData: Int = collectedData.asScala.count(_._2._2.nonEmpty)
 
     def isCheckpointPresent: Boolean = Checkpoint.getCheckpointFiles(checkpointDir).nonEmpty
 
@@ -269,9 +270,9 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
 
     // Verify that the recomputed RDDs are KinesisBackedBlockRDDs with the same sequence ranges
     // and return the same data
-    val times = collectedData.keySet
+    val times = collectedData.asScala.keySet
     times.foreach { time =>
-      val (arrayOfSeqNumRanges, data) = collectedData(time)
+      val (arrayOfSeqNumRanges, data) = collectedData.get(time)
       val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]]
       rdd shouldBe a [KinesisBackedBlockRDD[_]]
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 8fdfc121ea382..239a866278657 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.streaming.dstream
 
 import java.io.{IOException, ObjectInputStream}
+import java.util.concurrent.ConcurrentHashMap
 
+import scala.collection.convert.decorateAsScala._
 import scala.collection.mutable
 import scala.reflect.ClassTag
 
@@ -30,8 +32,6 @@ import org.apache.spark.rdd.{RDD, UnionRDD}
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.scheduler.StreamInputInfo
 import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Utils}
-import java.util.concurrent.ConcurrentHashMap
-import scala.collection.convert.decorateAsScala._
 
 /**
  * This class represents an input stream that monitors a Hadoop-compatible filesystem for new
@@ -116,7 +118,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
 
   // Map of batch-time to selected file info for the remembered batches
   // This is a concurrent map because it's also accessed in unit tests
-  @transient private[streaming] var batchTimeToSelectedFiles = new ConcurrentHashMap[Time, Array[String]].asScala
+  @transient private[streaming] var batchTimeToSelectedFiles = new ConcurrentHashMap[Time, Array[String]]
 
   // Set of files that were selected in the remembered batches
   @transient private var recentlySelectedFiles = new mutable.HashSet[String]()
@@ -149,7 +149,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
     // Find new files
     val newFiles = findNewFiles(validTime.milliseconds)
     logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
-    batchTimeToSelectedFiles += ((validTime, newFiles))
+    batchTimeToSelectedFiles.put(validTime, newFiles)
     recentlySelectedFiles ++= newFiles
     val rdds = Some(filesToRDD(newFiles))
     // Copy newFiles to immutable.List to prevent from being modified by the user
@@ -164,8 +164,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
   /** Clear the old time-to-files mappings along with old RDDs */
   protected[streaming] override def clearMetadata(time: Time) {
     super.clearMetadata(time)
-    val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration))
-    batchTimeToSelectedFiles --= oldFiles.keys
+    val oldFiles = batchTimeToSelectedFiles.asScala.filter(_._1 < (time - rememberDuration))
+    oldFiles.keys.foreach(key => batchTimeToSelectedFiles.remove(key))
     recentlySelectedFiles --= oldFiles.values.flatten
     logInfo("Cleared " + oldFiles.size + " old files that were older than " +
       (time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
@@ -308,7 +308,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
     logDebug(this.getClass().getSimpleName + ".readObject used")
     ois.defaultReadObject()
     generatedRDDs = new mutable.HashMap[Time, RDD[(K, V)]]()
-    batchTimeToSelectedFiles = new ConcurrentHashMap[Time, Array[String]].asScala
+    batchTimeToSelectedFiles = new ConcurrentHashMap[Time, Array[String]]
     recentlySelectedFiles = new mutable.HashSet[String]()
     fileToModTime = new TimeStampedHashMap[String, Long](true)
   }
@@ -324,7 +324,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
 
     override def update(time: Time) {
       hadoopFiles.clear()
-      hadoopFiles ++= batchTimeToSelectedFiles
+      hadoopFiles ++= batchTimeToSelectedFiles.asScala
     }
 
     override def cleanup(time: Time) { }
@@ -335,7 +335,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
           // Restore the metadata in both files and generatedRDDs
           logInfo("Restoring files for time " + t + " - " +
             f.mkString("[", ", ", "]") )
-          batchTimeToSelectedFiles += ((t, f))
+          batchTimeToSelectedFiles.put(t, f)
           recentlySelectedFiles ++= f
           generatedRDDs += ((t, filesToRDD(f)))
         }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 786703eb9a84e..c805c926842db 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.streaming
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputStream}
 
+import scala.collection.convert.decorateAsScala._
 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 import scala.reflect.ClassTag
 
@@ -612,7 +613,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
   def recordedFiles(ssc: StreamingContext): Seq[Int] = {
     val fileInputDStream =
       ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
-    val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
+    val filenames = fileInputDStream.batchTimeToSelectedFiles.asScala.values.flatten
     filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
   }
 
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 340a00e7e87f4..27ee1d980b9c0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -17,7 +17,10 @@
 
 package org.apache.spark.streaming
 
-import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedBuffer}
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.convert.decorateAsScala._
+import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
 
@@ -32,8 +35,6 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.receiver.Receiver
 import org.apache.spark.streaming.scheduler._
-import java.util.concurrent.ConcurrentHashMap
-import scala.collection.convert.decorateAsScala._
 
 class StreamingListenerSuite extends TestSuiteBase with Matchers {
 
@@ -269,7 +270,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
       }
     }
     _ssc.stop()
-    failureReasonsCollector.failureReasons.toMap
+    failureReasonsCollector.failureReasons.asScala.toMap
   }
 
   /** Check if a sequence of numbers is in increasing order */
@@ -359,12 +360,12 @@ class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_O
  */
 class FailureReasonsCollector extends StreamingListener {
 
-  val failureReasons = new java.util.concurrent.ConcurrentHashMap[Int, String].asScala
+  val failureReasons = new ConcurrentHashMap[Int, String]
 
   override def onOutputOperationCompleted(
       outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {
     outputOperationCompleted.outputOperationInfo.failureReason.foreach { f =>
-      failureReasons(outputOperationCompleted.outputOperationInfo.id) = f
+      failureReasons.put(outputOperationCompleted.outputOperationInfo.id, f)
     }
   }
 }

From a56f280fdc7367c2cfdd2a6085320a4112b8a461 Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Mon, 8 Feb 2016 03:54:43 -0800
Subject: [PATCH 3/5] fix import and concurrency issue
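
This switches the conversions to scala.collection.JavaConverters._, which
exposes the same asScala decorators as scala.collection.convert.decorateAsScala._
and is the entry point preferred by the Spark style. It also moves
KafkaStreamSuite back to a plain mutable.HashMap guarded by result.synchronized
for both the writes and the read inside eventually. A minimal sketch of that
locking discipline, with hypothetical names rather than code taken from the diff:

    import scala.collection.mutable

    object GuardedHashMapSketch {
      private val result = new mutable.HashMap[String, Long]()

      def add(key: String, delta: Long): Unit = result.synchronized {
        result.put(key, result.getOrElse(key, 0L) + delta)
      }

      // Readers take the same lock that writers use, so an assertion inside
      // eventually { ... } sees a consistent snapshot.
      def snapshot(): Map[String, Long] = result.synchronized { result.toMap }
    }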
---
 .../spark/streaming/kafka/KafkaStreamSuite.scala     | 11 ++++-------
 .../spark/streaming/kinesis/KinesisStreamSuite.scala |  2 +-
 .../spark/streaming/dstream/FileInputDStream.scala   |  2 +-
 .../org/apache/spark/streaming/CheckpointSuite.scala |  2 +-
 .../spark/streaming/StreamingListenerSuite.scala     |  2 +-
 5 files changed, 8 insertions(+), 11 deletions(-)

diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 16a6d3eb3a489..6a35ac14a8f6f 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -17,9 +17,7 @@
 
 package org.apache.spark.streaming.kafka
 
-import java.util.concurrent.ConcurrentHashMap
-
-import scala.collection.convert.decorateAsScala._
+import scala.collection.mutable
 import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.util.Random
@@ -67,10 +65,9 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
     val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
       ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
 
-    val result = new ConcurrentHashMap[String, Long].asScala
+    val result = new mutable.HashMap[String, Long]()
    stream.map(_._2).countByValue().foreachRDD { r =>
-      val ret = r.collect()
-      ret.toMap.foreach { kv =>
+      r.collect().foreach { kv =>
         result.synchronized {
           val count = result.getOrElseUpdate(kv._1, 0) + kv._2
           result.put(kv._1, count)
@@ -81,7 +78,7 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
 
     ssc.start()
 
     eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
-      assert(sent === result)
+      assert(result.synchronized { sent === result })
     }
   }
 }
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index 2348941a3ad19..22d3fc9b5225f 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.kinesis
 
 import java.util.concurrent.ConcurrentHashMap
 
-import scala.collection.convert.decorateAsScala._
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent.duration._
 import scala.language.postfixOps
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 239a866278657..b801fb58d864f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.dstream
 import java.io.{IOException, ObjectInputStream}
 import java.util.concurrent.ConcurrentHashMap
 
-import scala.collection.convert.decorateAsScala._
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.reflect.ClassTag
 
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index c805c926842db..bc2ad9b0fc186 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputStream}
 
-import scala.collection.convert.decorateAsScala._
+import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 import scala.reflect.ClassTag
 
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 27ee1d980b9c0..dad7c16ebfbae 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming
 
 import java.util.concurrent.ConcurrentHashMap
 
-import scala.collection.convert.decorateAsScala._
+import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future

From 5668a79bcb72c2de34a1d2aa7d95984e7d8bf0d2 Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Mon, 8 Feb 2016 04:58:17 -0800
Subject: [PATCH 4/5] fix file line length problem

---
 .../org/apache/spark/streaming/dstream/FileInputDStream.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index b801fb58d864f..5e97973a1b0c5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -118,7 +118,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
 
   // Map of batch-time to selected file info for the remembered batches
   // This is a concurrent map because it's also accessed in unit tests
-  @transient private[streaming] var batchTimeToSelectedFiles = new ConcurrentHashMap[Time, Array[String]]
+  @transient private[streaming] var batchTimeToSelectedFiles =
+    new ConcurrentHashMap[Time, Array[String]]
 
   // Set of files that were selected in the remembered batches
   @transient private var recentlySelectedFiles = new mutable.HashSet[String]()

From cde889e6bea3ee30c4b6108471048a6bec3624e2 Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Tue, 9 Feb 2016 07:04:21 -0800
Subject: [PATCH 5/5] fix style error

---
 .../org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index 22d3fc9b5225f..e873fb60434e7 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -243,7 +243,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
       val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
       val data = rdd.map { bytes => new String(bytes).toInt }.collect().toSeq
-      collectedData.put(time,(kRdd.arrayOfseqNumberRanges, data))
+      collectedData.put(time, (kRdd.arrayOfseqNumberRanges, data))
     })
 
     ssc.remember(Minutes(60)) // remember all the batches so that they are all saved in checkpoint