diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 797b07f80d8ee..6a35ac14a8f6f 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -65,19 +65,20 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
     val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
       ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)

-    val result = new mutable.HashMap[String, Long]() with mutable.SynchronizedMap[String, Long]
+    val result = new mutable.HashMap[String, Long]()
     stream.map(_._2).countByValue().foreachRDD { r =>
-      val ret = r.collect()
-      ret.toMap.foreach { kv =>
-        val count = result.getOrElseUpdate(kv._1, 0) + kv._2
-        result.put(kv._1, count)
+      r.collect().foreach { kv =>
+        result.synchronized {
+          val count = result.getOrElseUpdate(kv._1, 0) + kv._2
+          result.put(kv._1, count)
+        }
       }
     }

     ssc.start()

     eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
-      assert(sent === result)
+      assert(result.synchronized { sent === result })
     }
   }
 }
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index ee6a5f0390d04..e873fb60434e7 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -17,6 +17,9 @@

 package org.apache.spark.streaming.kinesis

+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent.duration._
 import scala.language.postfixOps
@@ -229,8 +232,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     ssc.checkpoint(checkpointDir)

     val awsCredentials = KinesisTestUtils.getAWSCredentials()
-    val collectedData = new mutable.HashMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
-      with mutable.SynchronizedMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
+    val collectedData = new ConcurrentHashMap[Time, (Array[SequenceNumberRanges], Seq[Int])]

     val kinesisStream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
       testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
@@ -241,13 +243,13 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
       val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
       val data = rdd.map { bytes => new String(bytes).toInt }.collect().toSeq
-      collectedData(time) = (kRdd.arrayOfseqNumberRanges, data)
+      collectedData.put(time, (kRdd.arrayOfseqNumberRanges, data))
     })

     ssc.remember(Minutes(60)) // remember all the batches so that they are all saved in checkpoint
     ssc.start()

-    def numBatchesWithData: Int = collectedData.count(_._2._2.nonEmpty)
+    def numBatchesWithData: Int = collectedData.asScala.count(_._2._2.nonEmpty)

     def isCheckpointPresent: Boolean = Checkpoint.getCheckpointFiles(checkpointDir).nonEmpty

@@ -268,9 +270,9 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun

     // Verify that the recomputed RDDs are KinesisBackedBlockRDDs with the same sequence ranges
     // and return the same data
-    val times = collectedData.keySet
+    val times = collectedData.asScala.keySet
     times.foreach { time =>
-      val (arrayOfSeqNumRanges, data) = collectedData(time)
+      val (arrayOfSeqNumRanges, data) = collectedData.get(time)
       val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]]
       rdd shouldBe a [KinesisBackedBlockRDD[_]]

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 1c2325409b53e..5e97973a1b0c5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.streaming.dstream

 import java.io.{IOException, ObjectInputStream}
+import java.util.concurrent.ConcurrentHashMap

+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.reflect.ClassTag

@@ -117,7 +119,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
   // Map of batch-time to selected file info for the remembered batches
   // This is a concurrent map because it's also accessed in unit tests
   @transient private[streaming] var batchTimeToSelectedFiles =
-    new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
+    new ConcurrentHashMap[Time, Array[String]]

   // Set of files that were selected in the remembered batches
   @transient private var recentlySelectedFiles = new mutable.HashSet[String]()
@@ -148,7 +150,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
     // Find new files
     val newFiles = findNewFiles(validTime.milliseconds)
     logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
-    batchTimeToSelectedFiles += ((validTime, newFiles))
+    batchTimeToSelectedFiles.put(validTime, newFiles)
     recentlySelectedFiles ++= newFiles
     val rdds = Some(filesToRDD(newFiles))
     // Copy newFiles to immutable.List to prevent from being modified by the user
@@ -163,8 +165,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
   /** Clear the old time-to-files mappings along with old RDDs */
   protected[streaming] override def clearMetadata(time: Time) {
     super.clearMetadata(time)
-    val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration))
-    batchTimeToSelectedFiles --= oldFiles.keys
+    val oldFiles = batchTimeToSelectedFiles.asScala.filter(_._1 < (time - rememberDuration))
+    oldFiles.keys.foreach(key => batchTimeToSelectedFiles.remove(key))
     recentlySelectedFiles --= oldFiles.values.flatten
     logInfo("Cleared " + oldFiles.size + " old files that were older than " +
       (time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
@@ -307,8 +309,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
     logDebug(this.getClass().getSimpleName + ".readObject used")
     ois.defaultReadObject()
     generatedRDDs = new mutable.HashMap[Time, RDD[(K, V)]]()
-    batchTimeToSelectedFiles =
-      new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
+    batchTimeToSelectedFiles = new ConcurrentHashMap[Time, Array[String]]
     recentlySelectedFiles = new mutable.HashSet[String]()
     fileToModTime = new TimeStampedHashMap[String, Long](true)
   }
@@ -324,7 +325,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](

     override def update(time: Time) {
       hadoopFiles.clear()
-      hadoopFiles ++= batchTimeToSelectedFiles
+      hadoopFiles ++= batchTimeToSelectedFiles.asScala
     }

     override def cleanup(time: Time) { }
@@ -335,7 +336,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
         // Restore the metadata in both files and generatedRDDs
         logInfo("Restoring files for time " + t + " - " +
           f.mkString("[", ", ", "]") )
-        batchTimeToSelectedFiles += ((t, f))
+        batchTimeToSelectedFiles.put(t, f)
         recentlySelectedFiles ++= f
         generatedRDDs += ((t, filesToRDD(f)))
       }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 786703eb9a84e..bc2ad9b0fc186 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.streaming

 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputStream}

+import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 import scala.reflect.ClassTag

@@ -612,7 +613,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
   def recordedFiles(ssc: StreamingContext): Seq[Int] = {
     val fileInputDStream =
       ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
-    val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
+    val filenames = fileInputDStream.batchTimeToSelectedFiles.asScala.values.flatten
     filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
   }

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 1ed68c74db9fd..dad7c16ebfbae 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -17,7 +17,10 @@

 package org.apache.spark.streaming

-import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedBuffer, SynchronizedMap}
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future

@@ -267,7 +270,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
       }
     }
     _ssc.stop()
-    failureReasonsCollector.failureReasons.toMap
+    failureReasonsCollector.failureReasons.asScala.toMap
   }

   /** Check if a sequence of numbers is in increasing order */
@@ -357,12 +360,12 @@ class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_O
  */
 class FailureReasonsCollector extends StreamingListener {

-  val failureReasons = new HashMap[Int, String] with SynchronizedMap[Int, String]
+  val failureReasons = new ConcurrentHashMap[Int, String]

   override def onOutputOperationCompleted(
       outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {
     outputOperationCompleted.outputOperationInfo.failureReason.foreach { f =>
-      failureReasons(outputOperationCompleted.outputOperationInfo.id) = f
+      failureReasons.put(outputOperationCompleted.outputOperationInfo.id, f)
     }
   }
 }
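
The pattern applied throughout this patch is the same in every file: the deprecated mutable.SynchronizedMap mixin is dropped in favour of either a java.util.concurrent.ConcurrentHashMap read back through scala.collection.JavaConverters, or explicit synchronized blocks around a plain mutable.HashMap (as in the Kafka test). The standalone Scala sketch below illustrates the ConcurrentHashMap variant; it is not part of the patch, and the names (SynchronizedMapMigrationSketch, selectedFiles) are illustrative only.

import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._

object SynchronizedMapMigrationSketch {
  def main(args: Array[String]): Unit = {
    // Before: new mutable.HashMap[...] with mutable.SynchronizedMap[...] (deprecated mixin).
    // After: a ConcurrentHashMap written through put()/remove(), as FileInputDStream now does.
    val selectedFiles = new ConcurrentHashMap[Long, Array[String]]()

    selectedFiles.put(1000L, Array("file-1", "file-2"))
    selectedFiles.put(2000L, Array("file-3"))

    // Reads that need Scala collection operations go through asScala, mirroring
    // batchTimeToSelectedFiles.asScala.filter(...) and collectedData.asScala.count(...) above.
    val oldEntries = selectedFiles.asScala.filter { case (time, _) => time < 1500L }
    oldEntries.keys.foreach(key => selectedFiles.remove(key))

    assert(selectedFiles.asScala.keySet == Set(2000L))
    println(selectedFiles.asScala.map { case (time, files) => time -> files.toSeq })
  }
}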