@@ -65,19 +65,20 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter

val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
val result = new mutable.HashMap[String, Long]() with mutable.SynchronizedMap[String, Long]
val result = new mutable.HashMap[String, Long]()
stream.map(_._2).countByValue().foreachRDD { r =>
val ret = r.collect()
ret.toMap.foreach { kv =>
val count = result.getOrElseUpdate(kv._1, 0) + kv._2
result.put(kv._1, count)
r.collect().foreach { kv =>
result.synchronized {
val count = result.getOrElseUpdate(kv._1, 0) + kv._2
Contributor

I think putIfAbsent on the underlying Java type might do what you are looking for here. https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ConcurrentHashMap.html#putIfAbsent(K,%20V)
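
For reference, a minimal standalone sketch of the putIfAbsent semantics being suggested, using a plain ConcurrentHashMap with boxed Long values (the object name, key and values are illustrative, not part of the patch):

    import java.util.concurrent.ConcurrentHashMap

    object PutIfAbsentSketch {
      def main(args: Array[String]): Unit = {
        val counts = new ConcurrentHashMap[String, java.lang.Long]()

        // putIfAbsent atomically stores the value only if the key is missing and
        // returns the previous value, or null if there was no mapping.
        val first = counts.putIfAbsent("topic", java.lang.Long.valueOf(0L))   // null; 0 is now stored
        val second = counts.putIfAbsent("topic", java.lang.Long.valueOf(42L)) // 0; existing value kept
        val stored = counts.get("topic")                                      // 0

        println(s"first=$first second=$second stored=$stored")
      }
    }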

Contributor Author

@holdenk
Thanks for your quick reply.
I initially changed
val count = result.getOrElseUpdate(kv._1, 0) + kv._2
to
result.putIfAbsent(kv._1, 0)
val count = result.get(kv._1) + kv._2
but the test failed for me. I guess a different thread can come in between the two lines, so the update is no longer atomic. So I used a synchronized block instead.
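
A standalone sketch of the race described above, assuming a ConcurrentHashMap and two threads running the putIfAbsent / get / put sequence (the key, iteration counts and thread setup are illustrative):

    import java.util.concurrent.ConcurrentHashMap

    object LostUpdateSketch {
      def main(args: Array[String]): Unit = {
        val result = new ConcurrentHashMap[String, java.lang.Long]()

        def addNonAtomic(key: String, delta: Long): Unit = {
          result.putIfAbsent(key, java.lang.Long.valueOf(0L))
          // Another thread can update the key between this get and the put below,
          // so one of the concurrent increments can be silently lost.
          val count = result.get(key) + delta
          result.put(key, java.lang.Long.valueOf(count))
        }

        val threads = (1 to 2).map { _ =>
          new Thread(new Runnable {
            override def run(): Unit = (1 to 100000).foreach(_ => addNonAtomic("word", 1L))
          })
        }
        threads.foreach(_.start())
        threads.foreach(_.join())

        // 200000 if the whole update were atomic; typically prints less.
        println(result.get("word"))
      }
    }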

Contributor

So you would probably want to try val count = result.putIfAbsent(kv._1, 0) + kv._2 - although looking at the original code, it had a race condition. If we're going to put a synchronized block around the update, we could just use a regular mutable.HashMap.
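
For context, a sketch of why the pre-existing pattern races even with per-operation locking, which is roughly what mutable.SynchronizedMap provided (the wrapper class below is illustrative, not Spark code):

    import scala.collection.mutable

    // Roughly what mutable.SynchronizedMap gave you: every individual call is
    // synchronized, but a sequence of calls is not.
    class LockedCounts {
      private val underlying = new mutable.HashMap[String, Long]()
      def getOrElseUpdate(k: String, v: Long): Long = synchronized(underlying.getOrElseUpdate(k, v))
      def put(k: String, v: Long): Unit = synchronized { underlying.put(k, v) }
    }

    // The original update is a check-then-act pair. Another thread can run between
    // the two locked calls, so concurrent increments can overwrite each other:
    //   val count = result.getOrElseUpdate(kv._1, 0) + kv._2   // locked read
    //   result.put(kv._1, count)                               // locked write of a possibly stale count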

Member

+1 for using synchronized + mutable.HashMap. In addition, toMap in ret.toMap.foreach can be removed. Hence I would recommend changing the code to:

    val result = new mutable.HashMap[String, Long]()
    stream.map(_._2).countByValue().foreachRDD { r =>
      r.collect().foreach { kv =>
        result.synchronized {
          val count = result.getOrElseUpdate(kv._1, 0) + kv._2
          result.put(kv._1, count)
        }
      }
    }

Member

And also change assert to assert(result.synchronized { sent === result })

Contributor Author

@holdenk @zsxwing
I tried val count = result.putIfAbsent(kv._1, 0) + kv._2, but the test failed for me. So I will change to mutable.HashMap and use a synchronized block.
Is it OK to use mutable.HashMap and a synchronized block in this file only, but use java.util.concurrent.ConcurrentHashMap in the other files (StreamingListenerSuite, KinesisStreamTests and FileInputDStream)? Or is it better to use mutable.HashMap and a synchronized block for all the files that have SynchronizedMap?

Member

Using ConcurrentHashMap in other files looks fine. I don't see any potential issues.
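
As a reference for the diffs that follow, a minimal sketch of the pattern they apply: write through the Java ConcurrentHashMap API and go through JavaConverters.asScala where Scala collection operations are wanted (the Long key stands in for Time; names are simplified placeholders):

    import java.util.concurrent.ConcurrentHashMap

    import scala.collection.JavaConverters._

    object ConcurrentMapPatternSketch {
      def main(args: Array[String]): Unit = {
        val batchTimeToSelectedFiles = new ConcurrentHashMap[Long, Array[String]]()

        // Writes go through the Java API directly.
        batchTimeToSelectedFiles.put(1000L, Array("file-a", "file-b"))
        batchTimeToSelectedFiles.put(2000L, Array.empty[String])

        // Reads that want Scala collection methods use asScala, which wraps the
        // underlying map without copying it.
        val nonEmptyBatches = batchTimeToSelectedFiles.asScala.count(_._2.nonEmpty)
        val oldKeys = batchTimeToSelectedFiles.asScala.keySet.filter(_ < 1500L)
        oldKeys.foreach(key => batchTimeToSelectedFiles.remove(key))

        println(s"nonEmptyBatches=$nonEmptyBatches remaining=${batchTimeToSelectedFiles.size()}")
      }
    }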

result.put(kv._1, count)
}
}
}

ssc.start()

eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
assert(sent === result)
assert(result.synchronized { sent === result })
}
}
}
@@ -17,6 +17,9 @@

package org.apache.spark.streaming.kinesis

import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -229,8 +232,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
ssc.checkpoint(checkpointDir)

val awsCredentials = KinesisTestUtils.getAWSCredentials()
val collectedData = new mutable.HashMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
with mutable.SynchronizedMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
val collectedData = new ConcurrentHashMap[Time, (Array[SequenceNumberRanges], Seq[Int])]

val kinesisStream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
@@ -241,13 +243,13 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD[Array[Byte]]]
val data = rdd.map { bytes => new String(bytes).toInt }.collect().toSeq
collectedData(time) = (kRdd.arrayOfseqNumberRanges, data)
collectedData.put(time, (kRdd.arrayOfseqNumberRanges, data))
})

ssc.remember(Minutes(60)) // remember all the batches so that they are all saved in checkpoint
ssc.start()

def numBatchesWithData: Int = collectedData.count(_._2._2.nonEmpty)
def numBatchesWithData: Int = collectedData.asScala.count(_._2._2.nonEmpty)

def isCheckpointPresent: Boolean = Checkpoint.getCheckpointFiles(checkpointDir).nonEmpty

@@ -268,9 +270,9 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun

// Verify that the recomputed RDDs are KinesisBackedBlockRDDs with the same sequence ranges
// and return the same data
val times = collectedData.keySet
val times = collectedData.asScala.keySet
times.foreach { time =>
val (arrayOfSeqNumRanges, data) = collectedData(time)
val (arrayOfSeqNumRanges, data) = collectedData.get(time)
val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]]
rdd shouldBe a [KinesisBackedBlockRDD[_]]

@@ -18,7 +18,9 @@
package org.apache.spark.streaming.dstream

import java.io.{IOException, ObjectInputStream}
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.reflect.ClassTag

@@ -117,7 +119,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
// Map of batch-time to selected file info for the remembered batches
// This is a concurrent map because it's also accessed in unit tests
@transient private[streaming] var batchTimeToSelectedFiles =
new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
new ConcurrentHashMap[Time, Array[String]]

// Set of files that were selected in the remembered batches
@transient private var recentlySelectedFiles = new mutable.HashSet[String]()
@@ -148,7 +150,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
// Find new files
val newFiles = findNewFiles(validTime.milliseconds)
logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
batchTimeToSelectedFiles += ((validTime, newFiles))
batchTimeToSelectedFiles.put(validTime, newFiles)
recentlySelectedFiles ++= newFiles
val rdds = Some(filesToRDD(newFiles))
// Copy newFiles to immutable.List to prevent from being modified by the user
@@ -163,8 +165,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
/** Clear the old time-to-files mappings along with old RDDs */
protected[streaming] override def clearMetadata(time: Time) {
super.clearMetadata(time)
val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration))
batchTimeToSelectedFiles --= oldFiles.keys
val oldFiles = batchTimeToSelectedFiles.asScala.filter(_._1 < (time - rememberDuration))
oldFiles.keys.foreach(key => batchTimeToSelectedFiles.remove(key))
recentlySelectedFiles --= oldFiles.values.flatten
logInfo("Cleared " + oldFiles.size + " old files that were older than " +
(time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
@@ -307,8 +309,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
logDebug(this.getClass().getSimpleName + ".readObject used")
ois.defaultReadObject()
generatedRDDs = new mutable.HashMap[Time, RDD[(K, V)]]()
batchTimeToSelectedFiles =
new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
batchTimeToSelectedFiles = new ConcurrentHashMap[Time, Array[String]]
recentlySelectedFiles = new mutable.HashSet[String]()
fileToModTime = new TimeStampedHashMap[String, Long](true)
}
@@ -324,7 +325,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](

override def update(time: Time) {
hadoopFiles.clear()
hadoopFiles ++= batchTimeToSelectedFiles
hadoopFiles ++= batchTimeToSelectedFiles.asScala
}

override def cleanup(time: Time) { }
Expand All @@ -335,7 +336,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
// Restore the metadata in both files and generatedRDDs
logInfo("Restoring files for time " + t + " - " +
f.mkString("[", ", ", "]") )
batchTimeToSelectedFiles += ((t, f))
batchTimeToSelectedFiles.put(t, f)
recentlySelectedFiles ++= f
generatedRDDs += ((t, filesToRDD(f)))
}
@@ -19,6 +19,7 @@ package org.apache.spark.streaming

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputStream}

import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.reflect.ClassTag

@@ -612,7 +613,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
def recordedFiles(ssc: StreamingContext): Seq[Int] = {
val fileInputDStream =
ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
val filenames = fileInputDStream.batchTimeToSelectedFiles.values.flatten
val filenames = fileInputDStream.batchTimeToSelectedFiles.asScala.values.flatten
filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
}

@@ -17,7 +17,10 @@

package org.apache.spark.streaming

import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedBuffer, SynchronizedMap}
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

@@ -267,7 +270,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
}
}
_ssc.stop()
failureReasonsCollector.failureReasons.toMap
failureReasonsCollector.failureReasons.asScala.toMap
}

/** Check if a sequence of numbers is in increasing order */
@@ -357,12 +360,12 @@ class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_O
*/
class FailureReasonsCollector extends StreamingListener {

val failureReasons = new HashMap[Int, String] with SynchronizedMap[Int, String]
val failureReasons = new ConcurrentHashMap[Int, String]

override def onOutputOperationCompleted(
outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {
outputOperationCompleted.outputOperationInfo.failureReason.foreach { f =>
failureReasons(outputOperationCompleted.outputOperationInfo.id) = f
failureReasons.put(outputOperationCompleted.outputOperationInfo.id, f)
}
}
}