From 70805d117632de5bb3b9f478d479be3771bb581e Mon Sep 17 00:00:00 2001 From: sachin aggarwal Date: Tue, 12 Apr 2016 16:42:02 +0530 Subject: [PATCH 01/15] approach 2 for extension of listener --- .../kafka/DirectKafkaStreamSuite.scala | 169 +++++++++++++++--- .../spark/streaming/scheduler/BatchInfo.scala | 2 + .../streaming/scheduler/JobGenerator.scala | 13 +- .../spark/streaming/scheduler/JobSet.scala | 2 + .../JavaStreamingListenerWrapperSuite.scala | 6 +- .../StreamingJobProgressListenerSuite.scala | 20 +-- 6 files changed, 171 insertions(+), 41 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index f14ff6705fd9..9fdbd893ceaa 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -19,28 +19,26 @@ package org.apache.spark.streaming.kafka import java.io.File import java.util.Arrays -import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.ConcurrentLinkedQueue - -import scala.collection.JavaConverters._ -import scala.concurrent.duration._ -import scala.language.postfixOps +import java.util.concurrent.atomic.AtomicLong import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.StringDecoder -import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} -import org.scalatest.concurrent.Eventually - -import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time} import org.apache.spark.streaming.dstream.DStream -import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset import org.apache.spark.streaming.scheduler._ import org.apache.spark.streaming.scheduler.rate.RateEstimator +import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time} import org.apache.spark.util.Utils +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import org.scalatest.concurrent.Eventually +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} + +import scala.collection.JavaConverters._ +import scala.concurrent.duration._ +import scala.language.postfixOps class DirectKafkaStreamSuite extends SparkFunSuite @@ -117,8 +115,8 @@ class DirectKafkaStreamSuite logInfo(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") } val collected = rdd.mapPartitionsWithIndex { (i, iter) => - // For each partition, get size of the range in the partition, - // and the number of items in the partition + // For each partition, get size of the range in the partition, + // and the number of items in the partition val off = offsetRanges(i) val all = iter.toSeq val partSize = all.size @@ -413,7 +411,7 @@ class DirectKafkaStreamSuite .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset)) new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)]( - ssc, kafkaParams, m, messageHandler) { + ssc, kafkaParams, m, messageHandler) { override protected[streaming] val rateController = Some(new DirectKafkaRateController(id, estimator)) } @@ -438,8 +436,8 @@ class DirectKafkaStreamSuite Seq(100, 50, 20).foreach { rate => collectedData.clear() // Empty this buffer on each pass. estimator.updateRate(rate) // Set a new rate. 
- // Expect blocks of data equal to "rate", scaled by the interval length in secs. - val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001) + // Expect blocks of data equal to "rate", scaled by the interval length in secs. + val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001) eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) { // Assert that rate estimator values are used to determine maxMessagesPerPartition. // Funky "-" in message makes the complete assertion message read better. @@ -453,7 +451,7 @@ class DirectKafkaStreamSuite /** Get the generated offset ranges from the DirectKafkaStream */ private def getOffsetRanges[K, V]( - kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = { + kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = { kafkaStream.generatedRDDs.mapValues { rdd => rdd.asInstanceOf[KafkaRDD[K, V, _, _, (K, V)]].offsetRanges }.toSeq.sortBy { _._1 } @@ -479,6 +477,133 @@ class DirectKafkaStreamSuite } } + +object DirectKafkaWordCount { + + import org.apache.spark.streaming._ + + def main(args: Array[String]) { + + // Create context with 2 second batch interval + val sparkConf = new SparkConf().setMaster("local[*]").setAppName("DirectKafkaWordCount") + val ssc = new StreamingContext(sparkConf, Seconds(6)) + val listener = new LatencyListener(ssc) + ssc.addStreamingListener(listener) + + val lines = ssc.socketTextStream("localhost", 9998) + + val words = lines.flatMap(_.split(" ")) + + val pairs = words.map(word => (word, 1)) + + val wordCounts = pairs.reduceByKey(_ + _) + + wordCounts.print() + + ssc.start() + ssc.awaitTermination() + } +} +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.scheduler._ + +class StopContextThread(ssc: StreamingContext) extends Runnable { + def run { + ssc.stop(true, true) + } +} + + +class LatencyListener(ssc: StreamingContext) extends StreamingListener { + + var metricMap: scala.collection.mutable.Map[String, Object] = _ + var startTime = 0L + var startTime1 = 0L + var endTime = 0L + var endTime1 = 0L + var totalDelay = 0L + var hasStarted = false + var batchCount = 0 + var totalRecords = 0L + val thread: Thread = new Thread(new StopContextThread(ssc)) + + + def getMap(): scala.collection.mutable.Map[String, Object] = synchronized { + if (metricMap == null) metricMap = scala.collection.mutable.Map() + metricMap + } + + def setMap(metricMap: scala.collection.mutable.Map[String, Object]) = synchronized { + this.metricMap = metricMap + } + + override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = { + val batchInfo = batchCompleted.batchInfo + println("job generate delay ="+batchCompleted.batchInfo.batchJobSetCreationDelay) + + val prevCount = totalRecords + var recordThisBatch = batchInfo.numRecords + if (!thread.isAlive) { + totalRecords += recordThisBatch + val imap = getMap + imap(batchInfo.batchTime.toString()) = "batchTime," + batchInfo.batchTime + + ", batch Count so far," + batchCount + + ", total Records so far," + totalRecords + + ", record This Batch," + recordThisBatch + + ", submission Time," + batchInfo.submissionTime + + ", processing Start Time," + batchInfo.processingStartTime.getOrElse(0L) + + ", processing End Time," + batchInfo.processingEndTime.getOrElse(0L) + + ", scheduling Delay," + batchInfo.schedulingDelay.getOrElse(0L) + + ", processing Delay," + batchInfo.processingDelay.getOrElse(0L) + + setMap(imap) + } + + if (totalRecords >= 100) { + if (hasStarted && 
!thread.isAlive) { + //not receiving any data more, finish + endTime = System.currentTimeMillis() + endTime1 = batchInfo.processingEndTime.getOrElse(0L) + var warning="" + val totalTime = (endTime - startTime).toDouble / 1000 + //This is weighted avg of every batch process time. The weight is records processed int the batch + val recordThroughput = totalRecords / totalTime + + val imap = getMap + + imap("Final Metric") = " Total Batch count," + batchCount+ + ", startTime based on submissionTime,"+startTime + + ", startTime based on System,"+startTime1 + + ", endTime based on System,"+endTime + + ", endTime based on processingEndTime,"+endTime1 + + ", Total Records,"+totalRecords+ + // ", Total processing delay = " + totalDelay + " ms "+ + ", Total Consumed time in sec," + totalTime + + ", Avg records/sec," + recordThroughput + + imap.foreach {case (key, value) => println(key + "-->" + value)} + + thread.start + } + } else if (!hasStarted) { + if (batchInfo.numRecords>0) { + startTime = batchCompleted.batchInfo.submissionTime + startTime1 = System.currentTimeMillis() + hasStarted = true + } + } + + if (hasStarted) { + // println("This delay:"+batchCompleted.batchInfo.processingDelay+"ms") + batchCompleted.batchInfo.processingDelay match { + case Some(value) => totalDelay += value * recordThisBatch + case None => //Nothing + } + batchCount = batchCount + 1 + } + } + +} object DirectKafkaStreamSuite { val collectedData = new ConcurrentLinkedQueue[String]() @volatile var total = -1L @@ -510,14 +635,14 @@ private[streaming] class ConstantEstimator(@volatile private var rate: Long) } def compute( - time: Long, - elements: Long, - processingDelay: Long, - schedulingDelay: Long): Option[Double] = Some(rate) + time: Long, + elements: Long, + processingDelay: Long, + schedulingDelay: Long): Option[Double] = Some(rate) } private[streaming] class ConstantRateController(id: Int, estimator: RateEstimator, rate: Long) extends RateController(id, estimator) { override def publish(rate: Long): Unit = () override def getLatestRate(): Long = rate -} +} \ No newline at end of file diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala index 5b2b959f8138..b46925557c4e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala @@ -34,6 +34,7 @@ import org.apache.spark.streaming.Time @DeveloperApi case class BatchInfo( batchTime: Time, + jobSetCreationDelay: Option[Long], streamIdToInputInfo: Map[Int, StreamInputInfo], submissionTime: Long, processingStartTime: Option[Long], @@ -41,6 +42,7 @@ case class BatchInfo( outputOperationInfos: Map[Int, OutputOperationInfo] ) { + def batchJobSetCreationDelay = jobSetCreationDelay.getOrElse(0L) /** * Time taken for the first job of this batch to start processing from the time this batch * was submitted to the streaming scheduler. 
Essentially, it is diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 307ff1f7ec23..5dfba6795f70 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -17,15 +17,14 @@ package org.apache.spark.streaming.scheduler -import scala.util.{Failure, Success, Try} - -import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time} import org.apache.spark.streaming.util.RecurringTimer +import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time} import org.apache.spark.util.{Clock, EventLoop, ManualClock, Utils} +import scala.util.{Failure, Success, Try} + /** Event classes for JobGenerator */ private[scheduler] sealed trait JobGeneratorEvent private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent @@ -231,7 +230,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { // added but not allocated, are dangling in the queue after recovering, we have to allocate // those blocks to the next batch, which is the batch they were supposed to go. jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch - jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time))) + jobScheduler.submitJobSet(JobSet(time, None, graph.generateJobs(time))) } // Restart the timer @@ -241,6 +240,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { /** Generate jobs and perform checkpoint for the given `time`. */ private def generateJobs(time: Time) { + val jobSetCreationStartTime=clock.getTimeMillis() // Checkpoint all RDDs marked for checkpointing to ensure their lineages are // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847). 
ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true") @@ -249,8 +249,9 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { graph.generateJobs(time) // generate jobs using allocated block } match { case Success(jobs) => + val jobSetCreationEndTime=clock.getTimeMillis() val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time) - jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) + jobScheduler.submitJobSet(JobSet(time, Option(jobSetCreationEndTime-jobSetCreationStartTime), jobs, streamIdToInputInfos)) case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala index 0baedaf275d6..7b28c21b9113 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala @@ -28,6 +28,7 @@ import org.apache.spark.streaming.Time private[streaming] case class JobSet( time: Time, + jobSetCreationDelay: Option[Long], jobs: Seq[Job], streamIdToInputInfo: Map[Int, StreamInputInfo] = Map.empty) { @@ -63,6 +64,7 @@ case class JobSet( def toBatchInfo: BatchInfo = { BatchInfo( time, + jobSetCreationDelay, streamIdToInputInfo, submissionTime, if (hasStarted) Some(processingStartTime) else None, diff --git a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala index 0295e059f7bc..b538918db749 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala @@ -63,7 +63,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite { assertReceiverInfo(listener.receiverError.receiverInfo, receiverError.receiverInfo) val batchSubmitted = StreamingListenerBatchSubmitted(BatchInfo( - batchTime = Time(1000L), + batchTime = Time(1000L), None, streamIdToInputInfo = Map( 0 -> StreamInputInfo( inputStreamId = 0, @@ -98,7 +98,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite { assertBatchInfo(listener.batchSubmitted.batchInfo, batchSubmitted.batchInfo) val batchStarted = StreamingListenerBatchStarted(BatchInfo( - batchTime = Time(1000L), + batchTime = Time(1000L), None, streamIdToInputInfo = Map( 0 -> StreamInputInfo( inputStreamId = 0, @@ -133,7 +133,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite { assertBatchInfo(listener.batchStarted.batchInfo, batchStarted.batchInfo) val batchCompleted = StreamingListenerBatchCompleted(BatchInfo( - batchTime = Time(1000L), + batchTime = Time(1000L), None, streamIdToInputInfo = Map( 0 -> StreamInputInfo( inputStreamId = 0, diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala index 26b757cc2d53..5f83a9e698ab 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala @@ -63,7 +63,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { 1 -> StreamInputInfo(1, 300L, 
Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "test"))) // onBatchSubmitted - val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None, Map.empty) + val batchInfoSubmitted = BatchInfo(Time(1000), None, streamIdToInputInfo, 1000, None, None, Map.empty) listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted)) listener.waitingBatches should be (List(BatchUIData(batchInfoSubmitted))) listener.runningBatches should be (Nil) @@ -76,7 +76,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { // onBatchStarted val batchInfoStarted = - BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty) + BatchInfo(Time(1000), None, streamIdToInputInfo, 1000, Some(2000), None, Map.empty) listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted)) listener.waitingBatches should be (Nil) listener.runningBatches should be (List(BatchUIData(batchInfoStarted))) @@ -118,7 +118,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { // onBatchCompleted val batchInfoCompleted = - BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty) + BatchInfo(Time(1000), None, streamIdToInputInfo, 1000, Some(2000), None, Map.empty) listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted)) listener.waitingBatches should be (Nil) listener.runningBatches should be (Nil) @@ -159,7 +159,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { val streamIdToInputInfo = Map(0 -> StreamInputInfo(0, 300L), 1 -> StreamInputInfo(1, 300L)) val batchInfoCompleted = - BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty) + BatchInfo(Time(1000), None, streamIdToInputInfo, 1000, Some(2000), None, Map.empty) for(_ <- 0 until (limit + 10)) { listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted)) @@ -177,7 +177,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { // fulfill completedBatchInfos for(i <- 0 until limit) { val batchInfoCompleted = BatchInfo( - Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None, Map.empty) + Time(1000 + i * 100), None, Map.empty, 1000 + i * 100, Some(2000 + i * 100), None, Map.empty) listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted)) val jobStart = createJobStart(Time(1000 + i * 100), outputOpId = 0, jobId = 1) listener.onJobStart(jobStart) @@ -188,7 +188,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { listener.onJobStart(jobStart) val batchInfoSubmitted = - BatchInfo(Time(1000 + limit * 100), Map.empty, (1000 + limit * 100), None, None, Map.empty) + BatchInfo(Time(1000 + limit * 100), None, Map.empty, (1000 + limit * 100), None, None, Map.empty) listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted)) // We still can see the info retrieved from onJobStart @@ -205,7 +205,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { // A lot of "onBatchCompleted"s happen before "onJobStart" for(i <- limit + 1 to limit * 2) { val batchInfoCompleted = BatchInfo( - Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None, Map.empty) + Time(1000 + i * 100), None, Map.empty, 1000 + i * 100, Some(2000 + i * 100), None, Map.empty) listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted)) } @@ -231,12 +231,12 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with 
Matchers { // onBatchSubmitted val batchInfoSubmitted = - BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None, Map.empty) + BatchInfo(Time(1000), None, streamIdToInputInfo, 1000, None, None, Map.empty) listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted)) // onBatchStarted val batchInfoStarted = - BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty) + BatchInfo(Time(1000), None, streamIdToInputInfo, 1000, Some(2000), None, Map.empty) listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted)) // onJobStart @@ -254,7 +254,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { // onBatchCompleted val batchInfoCompleted = - BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty) + BatchInfo(Time(1000), None, streamIdToInputInfo, 1000, Some(2000), None, Map.empty) listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted)) } From d174b980a72dfa6ca77cee8d6096c42aed493d96 Mon Sep 17 00:00:00 2001 From: sachin aggarwal Date: Wed, 13 Apr 2016 14:26:03 +0530 Subject: [PATCH 02/15] fixed the example --- .../kafka/DirectKafkaStreamSuite.scala | 33 +++++++++++++++---- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 9fdbd893ceaa..86b792c1ad98 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -483,20 +483,39 @@ object DirectKafkaWordCount { import org.apache.spark.streaming._ def main(args: Array[String]) { - - // Create context with 2 second batch interval val sparkConf = new SparkConf().setMaster("local[*]").setAppName("DirectKafkaWordCount") - val ssc = new StreamingContext(sparkConf, Seconds(6)) + + val ssc = new StreamingContext(sparkConf, Seconds(2)) + //ssc.checkpoint(checkPointPath) + val listener = new LatencyListener(ssc) ssc.addStreamingListener(listener) + val kafkaBrokers = "localhost" + val kafkaPort ="9092" + val topic="test" + + val topicsSet = Set(topic) + + val brokerListString = new StringBuilder(); + + brokerListString.append(kafkaBrokers).append(":").append(kafkaPort) + + + val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerListString.toString()) + System.err.println( + "Trying to connect to Kafka at " + brokerListString.toString()) + val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( + ssc, kafkaParams, topicsSet) + ssc.checkpoint("/tmp/checkPoint/") + // Create context with 2 second batch interval - val lines = ssc.socketTextStream("localhost", 9998) + //val lines = ssc.socketTextStream("localhost", 9998) - val words = lines.flatMap(_.split(" ")) + val words = messages.map(x => x._2).flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) - val wordCounts = pairs.reduceByKey(_ + _) + val wordCounts = pairs.reduceByKeyAndWindow(_+_, _-_, Seconds(60),Seconds(10)) wordCounts.print() @@ -559,7 +578,7 @@ class LatencyListener(ssc: StreamingContext) extends StreamingListener { setMap(imap) } - if (totalRecords >= 100) { + if (totalRecords >= 10000) { if (hasStarted && !thread.isAlive) { //not receiving any data more, finish endTime = System.currentTimeMillis() From a6b6ffbc448a990fbacdf7c895db4cebeb8375fe Mon Sep 17 00:00:00 
2001 From: sachin aggarwal Date: Wed, 13 Apr 2016 14:43:11 +0530 Subject: [PATCH 03/15] fixed the example --- .../kafka/DirectKafkaStreamSuite.scala | 31 ++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 86b792c1ad98..773a727b667b 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -478,7 +478,36 @@ class DirectKafkaStreamSuite } -object DirectKafkaWordCount { +object DirectKafkaWordCountLocal { + + import org.apache.spark.streaming._ + + case class Tick(symbol: String, price: Int, ts: Long) + + def main(args: Array[String]) { + + // Create context with 2 second batch interval + val sparkConf = new SparkConf().setMaster("local[*]").setAppName("DirectKafkaWordCount") + val ssc = new StreamingContext(sparkConf, Seconds(2)) + ssc.checkpoint("/tmp/checkpoint") + val listener = new LatencyListener(ssc) + ssc.addStreamingListener(listener) + val lines = ssc.socketTextStream("localhost", 9998) + + val words = lines.flatMap(_.split(" ")) + + val pairs = words.map(word => (word, 1)) + + val wordCounts = pairs.reduceByKeyAndWindow(_+_, _-_, Seconds(60),Seconds(10)) + + wordCounts.print() + + ssc.start() + ssc.awaitTermination() + } +} + +object DirectKafkaWordCountKafka { import org.apache.spark.streaming._ From a441df2a26a98ddca17a4420b52c771d852d7312 Mon Sep 17 00:00:00 2001 From: sachin aggarwal Date: Wed, 13 Apr 2016 14:48:29 +0530 Subject: [PATCH 04/15] fixed the example --- .../spark/streaming/kafka/DirectKafkaStreamSuite.scala | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 773a727b667b..33d6071ed92b 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -482,8 +482,6 @@ object DirectKafkaWordCountLocal { import org.apache.spark.streaming._ - case class Tick(symbol: String, price: Int, ts: Long) - def main(args: Array[String]) { // Create context with 2 second batch interval val sparkConf = new SparkConf().setMaster("local[*]").setAppName("DirectKafkaWordCount") val ssc = new StreamingContext(sparkConf, Seconds(2)) @@ -525,10 +523,7 @@ object DirectKafkaWordCountKafka { val topicsSet = Set(topic) - val brokerListString = new StringBuilder(); - - brokerListString.append(kafkaBrokers).append(":").append(kafkaPort) - + val brokerListString = kafkaBrokers+":"+kafkaPort val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerListString.toString()) System.err.println( "Trying to connect to Kafka at " + brokerListString.toString()) val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topicsSet) ssc.checkpoint("/tmp/checkPoint/") - // Create context with 2 second batch interval - - //val lines = ssc.socketTextStream("localhost", 9998) val words = messages.map(x => x._2).flatMap(_.split(" ")) From 186d5af6417f8cc568577ccf36944af549b572eb Mon Sep 17 00:00:00 2001 From: sachin aggarwal Date: Wed, 13 Apr 2016 14:48:41 +0530 Subject: [PATCH 05/15] fixed the example --- .../apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala | 1 - 1 file changed,
1 deletion(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 33d6071ed92b..0924e0717957 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -595,7 +595,6 @@ class LatencyListener(ssc: StreamingContext) extends StreamingListener { ", processing End Time," + batchInfo.processingEndTime.getOrElse(0L) + ", scheduling Delay," + batchInfo.schedulingDelay.getOrElse(0L) + ", processing Delay," + batchInfo.processingDelay.getOrElse(0L) - setMap(imap) } From 79c2c633546995aa27996da364792ba41e64ff19 Mon Sep 17 00:00:00 2001 From: sachin aggarwal Date: Wed, 13 Apr 2016 17:53:12 +0530 Subject: [PATCH 06/15] removed test code --- .../kafka/DirectKafkaStreamSuite.scala | 208 ++---------------- 1 file changed, 22 insertions(+), 186 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 0924e0717957..f14ff6705fd9 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -19,26 +19,28 @@ package org.apache.spark.streaming.kafka import java.io.File import java.util.Arrays -import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.ConcurrentLinkedQueue + +import scala.collection.JavaConverters._ +import scala.concurrent.duration._ +import scala.language.postfixOps import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.StringDecoder +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} +import org.scalatest.concurrent.Eventually + +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time} import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset import org.apache.spark.streaming.scheduler._ import org.apache.spark.streaming.scheduler.rate.RateEstimator -import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time} import org.apache.spark.util.Utils -import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} -import org.scalatest.concurrent.Eventually -import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} - -import scala.collection.JavaConverters._ -import scala.concurrent.duration._ -import scala.language.postfixOps class DirectKafkaStreamSuite extends SparkFunSuite @@ -115,8 +117,8 @@ class DirectKafkaStreamSuite logInfo(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") } val collected = rdd.mapPartitionsWithIndex { (i, iter) => - // For each partition, get size of the range in the partition, - // and the number of items in the partition + // For each partition, get size of the range in the partition, + // and the number of items in the partition val off = offsetRanges(i) val all = iter.toSeq val partSize = all.size @@ -411,7 +413,7 @@ class DirectKafkaStreamSuite .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => 
lo.offset)) new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)]( - ssc, kafkaParams, m, messageHandler) { + ssc, kafkaParams, m, messageHandler) { override protected[streaming] val rateController = Some(new DirectKafkaRateController(id, estimator)) } @@ -436,8 +438,8 @@ class DirectKafkaStreamSuite Seq(100, 50, 20).foreach { rate => collectedData.clear() // Empty this buffer on each pass. estimator.updateRate(rate) // Set a new rate. - // Expect blocks of data equal to "rate", scaled by the interval length in secs. - val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001) + // Expect blocks of data equal to "rate", scaled by the interval length in secs. + val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001) eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) { // Assert that rate estimator values are used to determine maxMessagesPerPartition. // Funky "-" in message makes the complete assertion message read better. @@ -451,7 +453,7 @@ class DirectKafkaStreamSuite /** Get the generated offset ranges from the DirectKafkaStream */ private def getOffsetRanges[K, V]( - kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = { + kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = { kafkaStream.generatedRDDs.mapValues { rdd => rdd.asInstanceOf[KafkaRDD[K, V, _, _, (K, V)]].offsetRanges }.toSeq.sortBy { _._1 } @@ -477,172 +479,6 @@ class DirectKafkaStreamSuite } } - -object DirectKafkaWordCountLocal { - - import org.apache.spark.streaming._ - - def main(args: Array[String]) { - - // Create context with 2 second batch interval - val sparkConf = new SparkConf().setMaster("local[*]").setAppName("DirectKafkaWordCount") - val ssc = new StreamingContext(sparkConf, Seconds(2)) - ssc.checkpoint("/tmp/checkpoint") - val listener = new LatencyListener(ssc) - ssc.addStreamingListener(listener) - val lines = ssc.socketTextStream("localhost", 9998) - - val words = lines.flatMap(_.split(" ")) - - val pairs = words.map(word => (word, 1)) - - val wordCounts = pairs.reduceByKeyAndWindow(_+_, _-_, Seconds(60),Seconds(10)) - - wordCounts.print() - - ssc.start() - ssc.awaitTermination() - } -} - -object DirectKafkaWordCountKafka { - - import org.apache.spark.streaming._ - - def main(args: Array[String]) { - val sparkConf = new SparkConf().setMaster("local[*]").setAppName("DirectKafkaWordCount") - - val ssc = new StreamingContext(sparkConf, Seconds(2)) - //ssc.checkpoint(checkPointPath) - - val listener = new LatencyListener(ssc) - ssc.addStreamingListener(listener) - val kafkaBrokers = "localhost" - val kafkaPort ="9092" - val topic="test" - - val topicsSet = Set(topic) - - val brokerListString = kafkaBrokers+":"+kafkaPort - - val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerListString.toString()) - System.err.println( - "Trying to connect to Kafka at " + brokerListString.toString()) - val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( - ssc, kafkaParams, topicsSet) - ssc.checkpoint("/tmp/checkPoint/") - - val words = messages.map(x => x._2).flatMap(_.split(" ")) - - val pairs = words.map(word => (word, 1)) - - val wordCounts = pairs.reduceByKeyAndWindow(_+_, _-_, Seconds(60),Seconds(10)) - - wordCounts.print() - - ssc.start() - ssc.awaitTermination() - } -} -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.scheduler._ - -class StopContextThread(ssc: StreamingContext) extends Runnable { - def run 
{ - ssc.stop(true, true) - } -} - - -class LatencyListener(ssc: StreamingContext) extends StreamingListener { - - var metricMap: scala.collection.mutable.Map[String, Object] = _ - var startTime = 0L - var startTime1 = 0L - var endTime = 0L - var endTime1 = 0L - var totalDelay = 0L - var hasStarted = false - var batchCount = 0 - var totalRecords = 0L - val thread: Thread = new Thread(new StopContextThread(ssc)) - - - def getMap(): scala.collection.mutable.Map[String, Object] = synchronized { - if (metricMap == null) metricMap = scala.collection.mutable.Map() - metricMap - } - - def setMap(metricMap: scala.collection.mutable.Map[String, Object]) = synchronized { - this.metricMap = metricMap - } - - override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = { - val batchInfo = batchCompleted.batchInfo - println("job generate delay ="+batchCompleted.batchInfo.batchJobSetCreationDelay) - - val prevCount = totalRecords - var recordThisBatch = batchInfo.numRecords - if (!thread.isAlive) { - totalRecords += recordThisBatch - val imap = getMap - imap(batchInfo.batchTime.toString()) = "batchTime," + batchInfo.batchTime + - ", batch Count so far," + batchCount + - ", total Records so far," + totalRecords + - ", record This Batch," + recordThisBatch + - ", submission Time," + batchInfo.submissionTime + - ", processing Start Time," + batchInfo.processingStartTime.getOrElse(0L) + - ", processing End Time," + batchInfo.processingEndTime.getOrElse(0L) + - ", scheduling Delay," + batchInfo.schedulingDelay.getOrElse(0L) + - ", processing Delay," + batchInfo.processingDelay.getOrElse(0L) - setMap(imap) - } - - if (totalRecords >= 10000) { - if (hasStarted && !thread.isAlive) { - //not receiving any data more, finish - endTime = System.currentTimeMillis() - endTime1 = batchInfo.processingEndTime.getOrElse(0L) - var warning="" - val totalTime = (endTime - startTime).toDouble / 1000 - //This is weighted avg of every batch process time. 
The weight is records processed int the batch - val recordThroughput = totalRecords / totalTime - - val imap = getMap - - imap("Final Metric") = " Total Batch count," + batchCount+ - ", startTime based on submissionTime,"+startTime + - ", startTime based on System,"+startTime1 + - ", endTime based on System,"+endTime + - ", endTime based on processingEndTime,"+endTime1 + - ", Total Records,"+totalRecords+ - // ", Total processing delay = " + totalDelay + " ms "+ - ", Total Consumed time in sec," + totalTime + - ", Avg records/sec," + recordThroughput - - imap.foreach {case (key, value) => println(key + "-->" + value)} - - thread.start - } - } else if (!hasStarted) { - if (batchInfo.numRecords>0) { - startTime = batchCompleted.batchInfo.submissionTime - startTime1 = System.currentTimeMillis() - hasStarted = true - } - } - - if (hasStarted) { - // println("This delay:"+batchCompleted.batchInfo.processingDelay+"ms") - batchCompleted.batchInfo.processingDelay match { - case Some(value) => totalDelay += value * recordThisBatch - case None => //Nothing - } - batchCount = batchCount + 1 - } - } - -} object DirectKafkaStreamSuite { val collectedData = new ConcurrentLinkedQueue[String]() @volatile var total = -1L @@ -674,14 +510,14 @@ private[streaming] class ConstantEstimator(@volatile private var rate: Long) } def compute( - time: Long, - elements: Long, - processingDelay: Long, - schedulingDelay: Long): Option[Double] = Some(rate) + time: Long, + elements: Long, + processingDelay: Long, + schedulingDelay: Long): Option[Double] = Some(rate) } private[streaming] class ConstantRateController(id: Int, estimator: RateEstimator, rate: Long) extends RateController(id, estimator) { override def publish(rate: Long): Unit = () override def getLatestRate(): Long = rate -} \ No newline at end of file +} From 420cb221fc6c92392cd4ffad8727db9ef346d7c7 Mon Sep 17 00:00:00 2001 From: sachin aggarwal Date: Thu, 21 Apr 2016 01:56:23 +0530 Subject: [PATCH 07/15] made changes to capture job level jobCreateDelay metric --- .../kafka/DirectKafkaStreamSuite.scala | 215 +++++++++++++++++- .../apache/spark/streaming/DStreamGraph.scala | 2 + .../api/java/JavaStreamingListener.scala | 1 + .../java/JavaStreamingListenerWrapper.scala | 1 + .../spark/streaming/scheduler/Job.scala | 11 +- .../scheduler/OutputOperationInfo.scala | 1 + .../spark/streaming/ui/BatchUIData.scala | 2 + .../JavaStreamingListenerWrapperSuite.scala | 9 + 8 files changed, 230 insertions(+), 12 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index f14ff6705fd9..289bd4c6e9f1 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -19,28 +19,26 @@ package org.apache.spark.streaming.kafka import java.io.File import java.util.Arrays -import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.ConcurrentLinkedQueue - -import scala.collection.JavaConverters._ -import scala.concurrent.duration._ -import scala.language.postfixOps +import java.util.concurrent.atomic.AtomicLong import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.StringDecoder -import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} -import org.scalatest.concurrent.Eventually - -import org.apache.spark.{SparkConf, 
SparkContext, SparkFunSuite} import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time} import org.apache.spark.streaming.dstream.DStream -import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset import org.apache.spark.streaming.scheduler._ import org.apache.spark.streaming.scheduler.rate.RateEstimator +import org.apache.spark.streaming.{Seconds, Milliseconds, StreamingContext, Time} import org.apache.spark.util.Utils +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import org.scalatest.concurrent.Eventually +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} + +import scala.collection.JavaConverters._ +import scala.concurrent.duration._ +import scala.language.postfixOps class DirectKafkaStreamSuite extends SparkFunSuite @@ -479,6 +477,201 @@ class DirectKafkaStreamSuite } } +object DirectKafkaWordCountLocal { + + import org.apache.spark.streaming._ + + case class Tick(symbol: String, price: Int, ts: Long) + + def main(args: Array[String]) { + + // Create context with 2 second batch interval + val sparkConf = new SparkConf().setMaster("local[*]").setAppName("DirectKafkaWordCount") + val ssc = new StreamingContext(sparkConf, Seconds(2)) + ssc.checkpoint("/tmp/checkpoint") + val listener = new LatencyListener(ssc) + ssc.addStreamingListener(listener) + val lines = ssc.socketTextStream("localhost", 8888) + + val words = lines.flatMap(_.split(" ")) + + val pairs = words.map(word => (word, 1)) + + val wordCounts = pairs.reduceByKeyAndWindow(_+_, _-_, Seconds(60),Seconds(10)) + + + val wordCountNew = wordCounts.filter(_._1.startsWith("sac")).reduceByKeyAndWindow(_+_, _-_, Seconds(60),Seconds(10)) + wordCountNew.print() + + ssc.start() + ssc.awaitTermination() + } +} +object DirectKafkaWordCountKafka { + + import org.apache.spark.streaming._ + + def main(args: Array[String]) { + val sparkConf = new SparkConf().setMaster("local[*]").setAppName("DirectKafkaWordCount") + + val ssc = new StreamingContext(sparkConf, Seconds(2)) + //ssc.checkpoint(checkPointPath) + + val listener = new LatencyListener(ssc) + ssc.addStreamingListener(listener) + val kafkaBrokers = "localhost" + val kafkaPort ="9092" + val topic="test" + + val topicsSet = Set(topic) + + val brokerListString = new StringBuilder(); + + brokerListString.append(kafkaBrokers).append(":").append(kafkaPort) + + + val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerListString.toString()) + System.err.println( + "Trying to connect to Kafka at " + brokerListString.toString()) + val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( + ssc, kafkaParams, topicsSet) + ssc.checkpoint("/tmp/checkPoint/") + // Create context with 2 second batch interval + + //val lines = ssc.socketTextStream("localhost", 9998) + + val words = messages.map(x => x._2).flatMap(_.split(" ")) + + val pairs = words.map(word => (word, 1)) + pairs.print() + + val wordCounts = pairs.reduceByKeyAndWindow(_+_, _-_, Seconds(60),Seconds(10)) + + wordCounts.print() + + ssc.start() + ssc.awaitTermination() + } +} + + + +class StopContextThread(ssc: StreamingContext) extends Runnable { + def run { + ssc.stop(true, true) + } +} + + +class LatencyListener(ssc: StreamingContext) extends StreamingListener { + + var metricMap: scala.collection.mutable.Map[String, Object] = _ + var startTime = 0L + var startTime1 = 0L + var endTime = 0L + var endTime1 = 0L + var totalDelay = 0L + var hasStarted = false + var batchCount 
= 0 + var totalRecords = 0L + val thread: Thread = new Thread(new StopContextThread(ssc)) + + + def getMap(): scala.collection.mutable.Map[String, Object] = synchronized { + if (metricMap == null) metricMap = scala.collection.mutable.Map() + metricMap + } + + def setMap(metricMap: scala.collection.mutable.Map[String, Object]) = synchronized { + this.metricMap = metricMap + } + + /** Called when processing of a job of a batch has started. */ + override def onOutputOperationStarted(outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = { + println("job creation delay reported in onOutputOperationStarted ==>"+outputOperationStarted.outputOperationInfo.batchTime+"==>"+outputOperationStarted.outputOperationInfo.id+"==>"+outputOperationStarted.outputOperationInfo.jobGenTime) + } + + val batchSize = Seconds(2).toString + val recordLimitPerThread = 1000 + val loaderThreads = 10 + + val recordLimit = loaderThreads * recordLimitPerThread + + override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = { + val batchInfo = batchCompleted.batchInfo + val prevCount = totalRecords + var recordThisBatch = batchInfo.numRecords + + println("job creation delay reported in onBatchCompleted ==>" +batchInfo.batchTime+"==>"+batchInfo.batchJobSetCreationDelay ) + + if (!thread.isAlive) { + totalRecords += recordThisBatch + // val imap = getMap + // imap(batchInfo.batchTime.toString()) = "batchTime," + batchInfo.batchTime + + // ", batch Count so far," + batchCount + + // ", total Records so far," + totalRecords + + // ", record This Batch," + recordThisBatch + + // ", submission Time," + batchInfo.submissionTime + + // ", processing Start Time," + batchInfo.processingStartTime.getOrElse(0L) + + // ", processing End Time," + batchInfo.processingEndTime.getOrElse(0L) + + // ", scheduling Delay," + batchInfo.schedulingDelay.getOrElse(0L) + + // ", processing Delay," + batchInfo.processingDelay.getOrElse(0L) + // + // setMap(imap) + } + + if (totalRecords >= recordLimit) { + if (hasStarted && !thread.isAlive) { + //not receiving any more data, finish + endTime = System.currentTimeMillis() + endTime1 = batchInfo.processingEndTime.getOrElse(0L) + var warning = "" + val totalTime = (endTime - startTime).toDouble / 1000 + //This is weighted avg of every batch process time. The weight is records processed in the batch + val avgLatency = totalDelay.toDouble / totalRecords + if (avgLatency > batchSize.toDouble) + warning = "WARNING:SPARK CLUSTER IN UNSTABLE STATE.
TRY REDUCE INPUT SPEED" + + val avgLatencyAdjust = avgLatency + batchSize.toDouble + val recordThroughput = totalRecords / totalTime + + val imap = getMap + + imap("Final Metric") = " Total Batch count," + batchCount + + ", startTime based on submissionTime," + startTime + + ", startTime based on System," + startTime1 + + ", endTime based on System," + endTime + + ", endTime based on processingEndTime," + endTime1 + + ", Total Records," + totalRecords + + // ", Total processing delay = " + totalDelay + " ms "+ + ", Total Consumed time in sec," + totalTime + + ", Avg latency/batchInterval in ms," + avgLatencyAdjust + + ", Avg records/sec," + recordThroughput + + ", WARNING," + warning + + imap.foreach { case (key, value) => println(key + "-->" + value) } + + thread.start + } + } else if (!hasStarted) { + if (batchInfo.numRecords > 0) { + startTime = batchCompleted.batchInfo.submissionTime + startTime1 = System.currentTimeMillis() + hasStarted = true + } + } + + if (hasStarted) { + // println("This delay:"+batchCompleted.batchInfo.processingDelay+"ms") + batchCompleted.batchInfo.processingDelay match { + case Some(value) => totalDelay += value * recordThisBatch + case None => //Nothing + } + batchCount = batchCount + 1 + } + } +} + object DirectKafkaStreamSuite { val collectedData = new ConcurrentLinkedQueue[String]() @volatile var total = -1L diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index 54d736ee5101..5ab90c096d03 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -114,8 +114,10 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { logDebug("Generating jobs for time " + time) val jobs = this.synchronized { outputStreams.flatMap { outputStream => + val genStartTime=System.currentTimeMillis() val jobOption = outputStream.generateJob(time) jobOption.foreach(_.setCallSite(outputStream.creationSite)) + jobOption.foreach(_.setGenDelay(System.currentTimeMillis()-genStartTime)) jobOption } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala index db0bae9958d6..d75f84c62826 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala @@ -241,4 +241,5 @@ private[streaming] case class JavaOutputOperationInfo( description: String, startTime: Long, endTime: Long, + jobGenTime: Long, failureReason: String) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala index b109b9f1cbea..3cbd5e1ff334 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala @@ -58,6 +58,7 @@ private[streaming] class JavaStreamingListenerWrapper(javaStreamingListener: Jav outputOperationInfo.description: String, outputOperationInfo.startTime.getOrElse(-1), outputOperationInfo.endTime.getOrElse(-1), + outputOperationInfo.jobGenTime.getOrElse(-1), outputOperationInfo.failureReason.orNull ) } diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala index 7050d7ef4524..2f0dcabb9d7e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala @@ -34,6 +34,7 @@ class Job(val time: Time, func: () => _) { private var _callSite: CallSite = null private var _startTime: Option[Long] = None private var _endTime: Option[Long] = None + private var _jobGenTime: Option[Long] = None def run() { _result = Try(func()) @@ -85,6 +86,14 @@ class Job(val time: Time, func: () => _) { _startTime = Some(startTime) } + def setGenDelay(jobGenTime: Long): Unit = { + _jobGenTime = Some(jobGenTime) + } + + def getGenDelay(): Option[Long] = { + _jobGenTime + } + def setEndTime(endTime: Long): Unit = { _endTime = Some(endTime) } @@ -96,7 +105,7 @@ class Job(val time: Time, func: () => _) { None } OutputOperationInfo( - time, outputOpId, callSite.shortForm, callSite.longForm, _startTime, _endTime, failureReason) + time, outputOpId, callSite.shortForm, callSite.longForm, _startTime, _endTime, _jobGenTime, failureReason) } override def toString: String = id diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala index 137e512a670d..8262370de9e0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala @@ -39,6 +39,7 @@ case class OutputOperationInfo( description: String, startTime: Option[Long], endTime: Option[Long], + jobGenTime: Option[Long], failureReason: Option[String]) { /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala index 1af60857bc77..dfc89d500b5c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala @@ -116,6 +116,7 @@ private[ui] case class OutputOperationUIData( description: String, startTime: Option[Long], endTime: Option[Long], + jobGenTime: Option[Long], failureReason: Option[String]) { def duration: Option[Long] = for (s <- startTime; e <- endTime) yield e - s @@ -130,6 +131,7 @@ private[ui] object OutputOperationUIData { outputOperationInfo.description, outputOperationInfo.startTime, outputOperationInfo.endTime, + outputOperationInfo.jobGenTime, outputOperationInfo.failureReason ) } diff --git a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala index b538918db749..8b9268f24c75 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala @@ -84,6 +84,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite { description = "operation1", startTime = None, endTime = None, + jobGenTime = None, failureReason = None), 1 -> OutputOperationInfo( batchTime = Time(1000L), @@ -92,6 +93,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite { description = "operation2", startTime = None, endTime = 
None, + jobGenTime = None, failureReason = None)) )) listenerWrapper.onBatchSubmitted(batchSubmitted) @@ -119,6 +121,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite { description = "operation1", startTime = Some(1003L), endTime = None, + jobGenTime = None, failureReason = None), 1 -> OutputOperationInfo( batchTime = Time(1000L), @@ -127,6 +130,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite { description = "operation2", startTime = Some(1005L), endTime = None, + jobGenTime = None, failureReason = None)) )) listenerWrapper.onBatchStarted(batchStarted) @@ -154,6 +158,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite { description = "operation1", startTime = Some(1003L), endTime = Some(1004L), + jobGenTime = None, failureReason = None), 1 -> OutputOperationInfo( batchTime = Time(1000L), @@ -162,6 +167,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite { description = "operation2", startTime = Some(1005L), endTime = Some(1010L), + jobGenTime = None, failureReason = None)) )) listenerWrapper.onBatchCompleted(batchCompleted) @@ -174,6 +180,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite { description = "operation1", startTime = Some(1003L), endTime = None, + jobGenTime = None, failureReason = None )) listenerWrapper.onOutputOperationStarted(outputOperationStarted) @@ -187,6 +194,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite { description = "operation1", startTime = Some(1003L), endTime = Some(1004L), + jobGenTime = None, failureReason = None )) listenerWrapper.onOutputOperationCompleted(outputOperationCompleted) @@ -243,6 +251,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite { assert(javaOutputOperationInfo.description === outputOperationInfo.description) assert(javaOutputOperationInfo.startTime === outputOperationInfo.startTime.getOrElse(-1)) assert(javaOutputOperationInfo.endTime === outputOperationInfo.endTime.getOrElse(-1)) + assert(javaOutputOperationInfo.jobGenTime === outputOperationInfo.jobGenTime.getOrElse(-1)) assert(javaOutputOperationInfo.failureReason === outputOperationInfo.failureReason.orNull) } } From 36975a978ad76ec033b09f23125ff61f5a67822b Mon Sep 17 00:00:00 2001 From: sachin aggarwal Date: Tue, 12 Apr 2016 16:42:02 +0530 Subject: [PATCH 08/15] approach 2 for extension of listener --- .../kafka/DirectKafkaStreamSuite.scala | 166 ++++++------------ 1 file changed, 49 insertions(+), 117 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 289bd4c6e9f1..9fdbd893ceaa 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.scheduler._ import org.apache.spark.streaming.scheduler.rate.RateEstimator -import org.apache.spark.streaming.{Seconds, Milliseconds, StreamingContext, Time} +import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time} import org.apache.spark.util.Utils import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} import org.scalatest.concurrent.Eventually @@ -115,8 +115,8 @@ class DirectKafkaStreamSuite logInfo(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") } val 
collected = rdd.mapPartitionsWithIndex { (i, iter) => - // For each partition, get size of the range in the partition, - // and the number of items in the partition + // For each partition, get size of the range in the partition, + // and the number of items in the partition val off = offsetRanges(i) val all = iter.toSeq val partSize = all.size @@ -411,7 +411,7 @@ class DirectKafkaStreamSuite .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset)) new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)]( - ssc, kafkaParams, m, messageHandler) { + ssc, kafkaParams, m, messageHandler) { override protected[streaming] val rateController = Some(new DirectKafkaRateController(id, estimator)) } @@ -436,8 +436,8 @@ class DirectKafkaStreamSuite Seq(100, 50, 20).foreach { rate => collectedData.clear() // Empty this buffer on each pass. estimator.updateRate(rate) // Set a new rate. - // Expect blocks of data equal to "rate", scaled by the interval length in secs. - val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001) + // Expect blocks of data equal to "rate", scaled by the interval length in secs. + val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001) eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) { // Assert that rate estimator values are used to determine maxMessagesPerPartition. // Funky "-" in message makes the complete assertion message read better. @@ -451,7 +451,7 @@ class DirectKafkaStreamSuite /** Get the generated offset ranges from the DirectKafkaStream */ private def getOffsetRanges[K, V]( - kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = { + kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = { kafkaStream.generatedRDDs.mapValues { rdd => rdd.asInstanceOf[KafkaRDD[K, V, _, _, (K, V)]].offsetRanges }.toSeq.sortBy { _._1 } @@ -477,75 +477,26 @@ class DirectKafkaStreamSuite } } -object DirectKafkaWordCountLocal { - import org.apache.spark.streaming._ +object DirectKafkaWordCount { - case class Tick(symbol: String, price: Int, ts: Long) + import org.apache.spark.streaming._ def main(args: Array[String]) { // Create context with 2 second batch interval val sparkConf = new SparkConf().setMaster("local[*]").setAppName("DirectKafkaWordCount") - val ssc = new StreamingContext(sparkConf, Seconds(2)) - ssc.checkpoint("/tmp/checkpoint") - val listener = new LatencyListener(ssc) - ssc.addStreamingListener(listener) - val lines = ssc.socketTextStream("localhost", 8888) - - val words = lines.flatMap(_.split(" ")) - - val pairs = words.map(word => (word, 1)) - - val wordCounts = pairs.reduceByKeyAndWindow(_+_, _-_, Seconds(60),Seconds(10)) - - - val wordCountNew = wordCounts.filter(_._1.startsWith("sac")).reduceByKeyAndWindow(_+_, _-_, Seconds(60),Seconds(10)) - wordCountNew.print() - - ssc.start() - ssc.awaitTermination() - } -} -object DirectKafkaWordCountKafka { - - import org.apache.spark.streaming._ - - def main(args: Array[String]) { - val sparkConf = new SparkConf().setMaster("local[*]").setAppName("DirectKafkaWordCount") - - val ssc = new StreamingContext(sparkConf, Seconds(2)) - //ssc.checkpoint(checkPointPath) - + val ssc = new StreamingContext(sparkConf, Seconds(6)) val listener = new LatencyListener(ssc) ssc.addStreamingListener(listener) - val kafkaBrokers = "localhost" - val kafkaPort ="9092" - val topic="test" - - val topicsSet = Set(topic) - val brokerListString = new StringBuilder(); + val lines = 
ssc.socketTextStream("localhost", 9998) - brokerListString.append(kafkaBrokers).append(":").append(kafkaPort) - - - val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerListString.toString()) - System.err.println( - "Trying to connect to Kafka at " + brokerListString.toString()) - val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( - ssc, kafkaParams, topicsSet) - ssc.checkpoint("/tmp/checkPoint/") - // Create context with 2 second batch interval - - //val lines = ssc.socketTextStream("localhost", 9998) - - val words = messages.map(x => x._2).flatMap(_.split(" ")) + val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) - pairs.print() - val wordCounts = pairs.reduceByKeyAndWindow(_+_, _-_, Seconds(60),Seconds(10)) + val wordCounts = pairs.reduceByKey(_ + _) wordCounts.print() @@ -553,8 +504,8 @@ object DirectKafkaWordCountKafka { ssc.awaitTermination() } } - - +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.scheduler._ class StopContextThread(ssc: StreamingContext) extends Runnable { def run { @@ -586,77 +537,58 @@ class LatencyListener(ssc: StreamingContext) extends StreamingListener { this.metricMap = metricMap } - /** Called when processing of a job of a batch has started. */ - override def onOutputOperationStarted(outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = { - println("job creation delay repoted in onOutputOperationStarted ==>"+outputOperationStarted.outputOperationInfo.batchTime+"==>"+outputOperationStarted.outputOperationInfo.id+"==>"+outputOperationStarted.outputOperationInfo.jobGenTime) - } - - val batchSize = Seconds(2).toString - val recordLimitPerThread = 1000 - val loaderThreads = 10 - - val recordLimit = loaderThreads * recordLimitPerThread - override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = { val batchInfo = batchCompleted.batchInfo + println("job generate delay ="+batchCompleted.batchInfo.batchJobSetCreationDelay) + val prevCount = totalRecords var recordThisBatch = batchInfo.numRecords - - println("job creation delay repoted in onBatchCompleted ==>" +batchInfo.batchTime+"==>"+batchInfo.batchJobSetCreationDelay ) - if (!thread.isAlive) { totalRecords += recordThisBatch - // val imap = getMap - // imap(batchInfo.batchTime.toString()) = "batchTime," + batchInfo.batchTime + - // ", batch Count so far," + batchCount + - // ", total Records so far," + totalRecords + - // ", record This Batch," + recordThisBatch + - // ", submission Time," + batchInfo.submissionTime + - // ", processing Start Time," + batchInfo.processingStartTime.getOrElse(0L) + - // ", processing End Time," + batchInfo.processingEndTime.getOrElse(0L) + - // ", scheduling Delay," + batchInfo.schedulingDelay.getOrElse(0L) + - // ", processing Delay," + batchInfo.processingDelay.getOrElse(0L) - // - // setMap(imap) - } - - if (totalRecords >= recordLimit) { + val imap = getMap + imap(batchInfo.batchTime.toString()) = "batchTime," + batchInfo.batchTime + + ", batch Count so far," + batchCount + + ", total Records so far," + totalRecords + + ", record This Batch," + recordThisBatch + + ", submission Time," + batchInfo.submissionTime + + ", processing Start Time," + batchInfo.processingStartTime.getOrElse(0L) + + ", processing End Time," + batchInfo.processingEndTime.getOrElse(0L) + + ", scheduling Delay," + batchInfo.schedulingDelay.getOrElse(0L) + + ", processing Delay," + batchInfo.processingDelay.getOrElse(0L) + + setMap(imap) 
+ } + + if (totalRecords >= 100) { if (hasStarted && !thread.isAlive) { //not receiving any data more, finish endTime = System.currentTimeMillis() endTime1 = batchInfo.processingEndTime.getOrElse(0L) - var warning = "" + var warning="" val totalTime = (endTime - startTime).toDouble / 1000 //This is weighted avg of every batch process time. The weight is records processed int the batch - val avgLatency = totalDelay.toDouble / totalRecords - if (avgLatency > batchSize.toDouble) - warning = "WARNING:SPARK CLUSTER IN UNSTABLE STATE. TRY REDUCE INPUT SPEED" - - val avgLatencyAdjust = avgLatency + batchSize.toDouble val recordThroughput = totalRecords / totalTime val imap = getMap - imap("Final Metric") = " Total Batch count," + batchCount + - ", startTime based on submissionTime," + startTime + - ", startTime based on System," + startTime1 + - ", endTime based on System," + endTime + - ", endTime based on processingEndTime," + endTime1 + - ", Total Records," + totalRecords + + imap("Final Metric") = " Total Batch count," + batchCount+ + ", startTime based on submissionTime,"+startTime + + ", startTime based on System,"+startTime1 + + ", endTime based on System,"+endTime + + ", endTime based on processingEndTime,"+endTime1 + + ", Total Records,"+totalRecords+ // ", Total processing delay = " + totalDelay + " ms "+ ", Total Consumed time in sec," + totalTime + - ", Avg latency/batchInterval in ms," + avgLatencyAdjust + - ", Avg records/sec," + recordThroughput + - ", WARNING," + warning + ", Avg records/sec," + recordThroughput - imap.foreach { case (key, value) => println(key + "-->" + value) } + imap.foreach {case (key, value) => println(key + "-->" + value)} thread.start } } else if (!hasStarted) { - if (batchInfo.numRecords > 0) { + if (batchInfo.numRecords>0) { startTime = batchCompleted.batchInfo.submissionTime - startTime1 = System.currentTimeMillis() + startTime1 = System.currentTimeMillis() hasStarted = true } } @@ -670,8 +602,8 @@ class LatencyListener(ssc: StreamingContext) extends StreamingListener { batchCount = batchCount + 1 } } -} +} object DirectKafkaStreamSuite { val collectedData = new ConcurrentLinkedQueue[String]() @volatile var total = -1L @@ -703,14 +635,14 @@ private[streaming] class ConstantEstimator(@volatile private var rate: Long) } def compute( - time: Long, - elements: Long, - processingDelay: Long, - schedulingDelay: Long): Option[Double] = Some(rate) + time: Long, + elements: Long, + processingDelay: Long, + schedulingDelay: Long): Option[Double] = Some(rate) } private[streaming] class ConstantRateController(id: Int, estimator: RateEstimator, rate: Long) extends RateController(id, estimator) { override def publish(rate: Long): Unit = () override def getLatestRate(): Long = rate -} +} \ No newline at end of file From 96e33b8330ce10105e713bc13f69bbcddad57279 Mon Sep 17 00:00:00 2001 From: sachin aggarwal Date: Wed, 13 Apr 2016 14:26:03 +0530 Subject: [PATCH 09/15] fixed the example --- .../kafka/DirectKafkaStreamSuite.scala | 33 +++++++++++++++---- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 9fdbd893ceaa..86b792c1ad98 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -483,20 +483,39 @@ object 
DirectKafkaWordCount { import org.apache.spark.streaming._ def main(args: Array[String]) { - - // Create context with 2 second batch interval val sparkConf = new SparkConf().setMaster("local[*]").setAppName("DirectKafkaWordCount") - val ssc = new StreamingContext(sparkConf, Seconds(6)) + + val ssc = new StreamingContext(sparkConf, Seconds(2)) + //ssc.checkpoint(checkPointPath) + val listener = new LatencyListener(ssc) ssc.addStreamingListener(listener) + val kafkaBrokers = "localhost" + val kafkaPort ="9092" + val topic="test" + + val topicsSet = Set(topic) + + val brokerListString = new StringBuilder(); + + brokerListString.append(kafkaBrokers).append(":").append(kafkaPort) + + + val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerListString.toString()) + System.err.println( + "Trying to connect to Kafka at " + brokerListString.toString()) + val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( + ssc, kafkaParams, topicsSet) + ssc.checkpoint("/tmp/checkPoint/") + // Create context with 2 second batch interval - val lines = ssc.socketTextStream("localhost", 9998) + //val lines = ssc.socketTextStream("localhost", 9998) - val words = lines.flatMap(_.split(" ")) + val words = messages.map(x => x._2).flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) - val wordCounts = pairs.reduceByKey(_ + _) + val wordCounts = pairs.reduceByKeyAndWindow(_+_, _-_, Seconds(60),Seconds(10)) wordCounts.print() @@ -559,7 +578,7 @@ class LatencyListener(ssc: StreamingContext) extends StreamingListener { setMap(imap) } - if (totalRecords >= 100) { + if (totalRecords >= 10000) { if (hasStarted && !thread.isAlive) { //not receiving any data more, finish endTime = System.currentTimeMillis() From 9f4875102550ff760ab66d38f4f76d3e1583d869 Mon Sep 17 00:00:00 2001 From: sachin aggarwal Date: Wed, 13 Apr 2016 14:43:11 +0530 Subject: [PATCH 10/15] fixed the eaxmple --- .../kafka/DirectKafkaStreamSuite.scala | 31 ++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 86b792c1ad98..773a727b667b 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -478,7 +478,36 @@ class DirectKafkaStreamSuite } -object DirectKafkaWordCount { +object DirectKafkaWordCountLocal { + + import org.apache.spark.streaming._ + + case class Tick(symbol: String, price: Int, ts: Long) + + def main(args: Array[String]) { + + // Create context with 2 second batch interval + val sparkConf = new SparkConf().setMaster("local[*]").setAppName("DirectKafkaWordCount") + val ssc = new StreamingContext(sparkConf, Seconds(2)) + ssc.checkpoint("/tmp/checkpoint") + val listener = new LatencyListener(ssc) + ssc.addStreamingListener(listener) + val lines = ssc.socketTextStream("localhost", 9998) + + val words = lines.flatMap(_.split(" ")) + + val pairs = words.map(word => (word, 1)) + + val wordCounts = pairs.reduceByKeyAndWindow(_+_, _-_, Seconds(60),Seconds(10)) + + wordCounts.print() + + ssc.start() + ssc.awaitTermination() + } +} + +object DirectKafkaWordCountKafka { import org.apache.spark.streaming._ From 06164da2d41ffee47b4406caea1b4c8695e4b86b Mon Sep 17 00:00:00 2001 From: sachin aggarwal Date: Wed, 13 Apr 2016 14:48:29 +0530 
Subject: [PATCH 11/15] fixed the eaxmple --- .../spark/streaming/kafka/DirectKafkaStreamSuite.scala | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 773a727b667b..33d6071ed92b 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -482,8 +482,6 @@ object DirectKafkaWordCountLocal { import org.apache.spark.streaming._ - case class Tick(symbol: String, price: Int, ts: Long) - def main(args: Array[String]) { // Create context with 2 second batch interval @@ -525,10 +523,7 @@ object DirectKafkaWordCountKafka { val topicsSet = Set(topic) - val brokerListString = new StringBuilder(); - - brokerListString.append(kafkaBrokers).append(":").append(kafkaPort) - + val brokerListString = kafkaBrokers+":"+kafkaPort val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerListString.toString()) System.err.println( @@ -536,9 +531,6 @@ object DirectKafkaWordCountKafka { val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topicsSet) ssc.checkpoint("/tmp/checkPoint/") - // Create context with 2 second batch interval - - //val lines = ssc.socketTextStream("localhost", 9998) val words = messages.map(x => x._2).flatMap(_.split(" ")) From 605e15aa0f2091b2b21ff259ebd574b6a02c994c Mon Sep 17 00:00:00 2001 From: sachin aggarwal Date: Wed, 13 Apr 2016 14:48:41 +0530 Subject: [PATCH 12/15] fixed the example --- .../apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 33d6071ed92b..0924e0717957 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -595,7 +595,6 @@ class LatencyListener(ssc: StreamingContext) extends StreamingListener { ", processing End Time," + batchInfo.processingEndTime.getOrElse(0L) + ", scheduling Delay," + batchInfo.schedulingDelay.getOrElse(0L) + ", processing Delay," + batchInfo.processingDelay.getOrElse(0L) - setMap(imap) } From 7f906d4de20d37960b994d701dc27ff5290eec47 Mon Sep 17 00:00:00 2001 From: sachin aggarwal Date: Wed, 13 Apr 2016 17:53:12 +0530 Subject: [PATCH 13/15] removed test code --- .../kafka/DirectKafkaStreamSuite.scala | 208 ++---------------- 1 file changed, 22 insertions(+), 186 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 0924e0717957..f14ff6705fd9 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -19,26 +19,28 @@ package org.apache.spark.streaming.kafka import java.io.File import java.util.Arrays -import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicLong +import 
java.util.concurrent.ConcurrentLinkedQueue + +import scala.collection.JavaConverters._ +import scala.concurrent.duration._ +import scala.language.postfixOps import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.StringDecoder +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} +import org.scalatest.concurrent.Eventually + +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time} import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset import org.apache.spark.streaming.scheduler._ import org.apache.spark.streaming.scheduler.rate.RateEstimator -import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time} import org.apache.spark.util.Utils -import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} -import org.scalatest.concurrent.Eventually -import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} - -import scala.collection.JavaConverters._ -import scala.concurrent.duration._ -import scala.language.postfixOps class DirectKafkaStreamSuite extends SparkFunSuite @@ -115,8 +117,8 @@ class DirectKafkaStreamSuite logInfo(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") } val collected = rdd.mapPartitionsWithIndex { (i, iter) => - // For each partition, get size of the range in the partition, - // and the number of items in the partition + // For each partition, get size of the range in the partition, + // and the number of items in the partition val off = offsetRanges(i) val all = iter.toSeq val partSize = all.size @@ -411,7 +413,7 @@ class DirectKafkaStreamSuite .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset)) new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)]( - ssc, kafkaParams, m, messageHandler) { + ssc, kafkaParams, m, messageHandler) { override protected[streaming] val rateController = Some(new DirectKafkaRateController(id, estimator)) } @@ -436,8 +438,8 @@ class DirectKafkaStreamSuite Seq(100, 50, 20).foreach { rate => collectedData.clear() // Empty this buffer on each pass. estimator.updateRate(rate) // Set a new rate. - // Expect blocks of data equal to "rate", scaled by the interval length in secs. - val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001) + // Expect blocks of data equal to "rate", scaled by the interval length in secs. + val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001) eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) { // Assert that rate estimator values are used to determine maxMessagesPerPartition. // Funky "-" in message makes the complete assertion message read better. 
@@ -451,7 +453,7 @@ class DirectKafkaStreamSuite /** Get the generated offset ranges from the DirectKafkaStream */ private def getOffsetRanges[K, V]( - kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = { + kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = { kafkaStream.generatedRDDs.mapValues { rdd => rdd.asInstanceOf[KafkaRDD[K, V, _, _, (K, V)]].offsetRanges }.toSeq.sortBy { _._1 } @@ -477,172 +479,6 @@ class DirectKafkaStreamSuite } } - -object DirectKafkaWordCountLocal { - - import org.apache.spark.streaming._ - - def main(args: Array[String]) { - - // Create context with 2 second batch interval - val sparkConf = new SparkConf().setMaster("local[*]").setAppName("DirectKafkaWordCount") - val ssc = new StreamingContext(sparkConf, Seconds(2)) - ssc.checkpoint("/tmp/checkpoint") - val listener = new LatencyListener(ssc) - ssc.addStreamingListener(listener) - val lines = ssc.socketTextStream("localhost", 9998) - - val words = lines.flatMap(_.split(" ")) - - val pairs = words.map(word => (word, 1)) - - val wordCounts = pairs.reduceByKeyAndWindow(_+_, _-_, Seconds(60),Seconds(10)) - - wordCounts.print() - - ssc.start() - ssc.awaitTermination() - } -} - -object DirectKafkaWordCountKafka { - - import org.apache.spark.streaming._ - - def main(args: Array[String]) { - val sparkConf = new SparkConf().setMaster("local[*]").setAppName("DirectKafkaWordCount") - - val ssc = new StreamingContext(sparkConf, Seconds(2)) - //ssc.checkpoint(checkPointPath) - - val listener = new LatencyListener(ssc) - ssc.addStreamingListener(listener) - val kafkaBrokers = "localhost" - val kafkaPort ="9092" - val topic="test" - - val topicsSet = Set(topic) - - val brokerListString = kafkaBrokers+":"+kafkaPort - - val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerListString.toString()) - System.err.println( - "Trying to connect to Kafka at " + brokerListString.toString()) - val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( - ssc, kafkaParams, topicsSet) - ssc.checkpoint("/tmp/checkPoint/") - - val words = messages.map(x => x._2).flatMap(_.split(" ")) - - val pairs = words.map(word => (word, 1)) - - val wordCounts = pairs.reduceByKeyAndWindow(_+_, _-_, Seconds(60),Seconds(10)) - - wordCounts.print() - - ssc.start() - ssc.awaitTermination() - } -} -import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.scheduler._ - -class StopContextThread(ssc: StreamingContext) extends Runnable { - def run { - ssc.stop(true, true) - } -} - - -class LatencyListener(ssc: StreamingContext) extends StreamingListener { - - var metricMap: scala.collection.mutable.Map[String, Object] = _ - var startTime = 0L - var startTime1 = 0L - var endTime = 0L - var endTime1 = 0L - var totalDelay = 0L - var hasStarted = false - var batchCount = 0 - var totalRecords = 0L - val thread: Thread = new Thread(new StopContextThread(ssc)) - - - def getMap(): scala.collection.mutable.Map[String, Object] = synchronized { - if (metricMap == null) metricMap = scala.collection.mutable.Map() - metricMap - } - - def setMap(metricMap: scala.collection.mutable.Map[String, Object]) = synchronized { - this.metricMap = metricMap - } - - override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = { - val batchInfo = batchCompleted.batchInfo - println("job generate delay ="+batchCompleted.batchInfo.batchJobSetCreationDelay) - - val prevCount = totalRecords - var recordThisBatch = batchInfo.numRecords - if (!thread.isAlive) { - 
totalRecords += recordThisBatch - val imap = getMap - imap(batchInfo.batchTime.toString()) = "batchTime," + batchInfo.batchTime + - ", batch Count so far," + batchCount + - ", total Records so far," + totalRecords + - ", record This Batch," + recordThisBatch + - ", submission Time," + batchInfo.submissionTime + - ", processing Start Time," + batchInfo.processingStartTime.getOrElse(0L) + - ", processing End Time," + batchInfo.processingEndTime.getOrElse(0L) + - ", scheduling Delay," + batchInfo.schedulingDelay.getOrElse(0L) + - ", processing Delay," + batchInfo.processingDelay.getOrElse(0L) - setMap(imap) - } - - if (totalRecords >= 10000) { - if (hasStarted && !thread.isAlive) { - //not receiving any data more, finish - endTime = System.currentTimeMillis() - endTime1 = batchInfo.processingEndTime.getOrElse(0L) - var warning="" - val totalTime = (endTime - startTime).toDouble / 1000 - //This is weighted avg of every batch process time. The weight is records processed int the batch - val recordThroughput = totalRecords / totalTime - - val imap = getMap - - imap("Final Metric") = " Total Batch count," + batchCount+ - ", startTime based on submissionTime,"+startTime + - ", startTime based on System,"+startTime1 + - ", endTime based on System,"+endTime + - ", endTime based on processingEndTime,"+endTime1 + - ", Total Records,"+totalRecords+ - // ", Total processing delay = " + totalDelay + " ms "+ - ", Total Consumed time in sec," + totalTime + - ", Avg records/sec," + recordThroughput - - imap.foreach {case (key, value) => println(key + "-->" + value)} - - thread.start - } - } else if (!hasStarted) { - if (batchInfo.numRecords>0) { - startTime = batchCompleted.batchInfo.submissionTime - startTime1 = System.currentTimeMillis() - hasStarted = true - } - } - - if (hasStarted) { - // println("This delay:"+batchCompleted.batchInfo.processingDelay+"ms") - batchCompleted.batchInfo.processingDelay match { - case Some(value) => totalDelay += value * recordThisBatch - case None => //Nothing - } - batchCount = batchCount + 1 - } - } - -} object DirectKafkaStreamSuite { val collectedData = new ConcurrentLinkedQueue[String]() @volatile var total = -1L @@ -674,14 +510,14 @@ private[streaming] class ConstantEstimator(@volatile private var rate: Long) } def compute( - time: Long, - elements: Long, - processingDelay: Long, - schedulingDelay: Long): Option[Double] = Some(rate) + time: Long, + elements: Long, + processingDelay: Long, + schedulingDelay: Long): Option[Double] = Some(rate) } private[streaming] class ConstantRateController(id: Int, estimator: RateEstimator, rate: Long) extends RateController(id, estimator) { override def publish(rate: Long): Unit = () override def getLatestRate(): Long = rate -} \ No newline at end of file +} From 8258ef87a3e6aba0d7b49b267be7c9f8e12079d9 Mon Sep 17 00:00:00 2001 From: sachin aggarwal Date: Thu, 21 Apr 2016 01:56:23 +0530 Subject: [PATCH 14/15] made changes to capture job level jobCreateDelay metric --- .../kafka/DirectKafkaStreamSuite.scala | 215 +++++++++++++++++- 1 file changed, 204 insertions(+), 11 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index f14ff6705fd9..289bd4c6e9f1 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -19,28 +19,26 @@ 
package org.apache.spark.streaming.kafka import java.io.File import java.util.Arrays -import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.ConcurrentLinkedQueue - -import scala.collection.JavaConverters._ -import scala.concurrent.duration._ -import scala.language.postfixOps +import java.util.concurrent.atomic.AtomicLong import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.StringDecoder -import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} -import org.scalatest.concurrent.Eventually - -import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time} import org.apache.spark.streaming.dstream.DStream -import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset import org.apache.spark.streaming.scheduler._ import org.apache.spark.streaming.scheduler.rate.RateEstimator +import org.apache.spark.streaming.{Seconds, Milliseconds, StreamingContext, Time} import org.apache.spark.util.Utils +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import org.scalatest.concurrent.Eventually +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} + +import scala.collection.JavaConverters._ +import scala.concurrent.duration._ +import scala.language.postfixOps class DirectKafkaStreamSuite extends SparkFunSuite @@ -479,6 +477,201 @@ class DirectKafkaStreamSuite } } +object DirectKafkaWordCountLocal { + + import org.apache.spark.streaming._ + + case class Tick(symbol: String, price: Int, ts: Long) + + def main(args: Array[String]) { + + // Create context with 2 second batch interval + val sparkConf = new SparkConf().setMaster("local[*]").setAppName("DirectKafkaWordCount") + val ssc = new StreamingContext(sparkConf, Seconds(2)) + ssc.checkpoint("/tmp/checkpoint") + val listener = new LatencyListener(ssc) + ssc.addStreamingListener(listener) + val lines = ssc.socketTextStream("localhost", 8888) + + val words = lines.flatMap(_.split(" ")) + + val pairs = words.map(word => (word, 1)) + + val wordCounts = pairs.reduceByKeyAndWindow(_+_, _-_, Seconds(60),Seconds(10)) + + + val wordCountNew = wordCounts.filter(_._1.startsWith("sac")).reduceByKeyAndWindow(_+_, _-_, Seconds(60),Seconds(10)) + wordCountNew.print() + + ssc.start() + ssc.awaitTermination() + } +} +object DirectKafkaWordCountKafka { + + import org.apache.spark.streaming._ + + def main(args: Array[String]) { + val sparkConf = new SparkConf().setMaster("local[*]").setAppName("DirectKafkaWordCount") + + val ssc = new StreamingContext(sparkConf, Seconds(2)) + //ssc.checkpoint(checkPointPath) + + val listener = new LatencyListener(ssc) + ssc.addStreamingListener(listener) + val kafkaBrokers = "localhost" + val kafkaPort ="9092" + val topic="test" + + val topicsSet = Set(topic) + + val brokerListString = new StringBuilder(); + + brokerListString.append(kafkaBrokers).append(":").append(kafkaPort) + + + val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerListString.toString()) + System.err.println( + "Trying to connect to Kafka at " + brokerListString.toString()) + val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( + ssc, kafkaParams, topicsSet) + ssc.checkpoint("/tmp/checkPoint/") + // Create context with 2 second batch interval + + //val lines = ssc.socketTextStream("localhost", 9998) + + val words = messages.map(x => x._2).flatMap(_.split(" ")) + + val pairs = 
words.map(word => (word, 1)) + pairs.print() + + val wordCounts = pairs.reduceByKeyAndWindow(_+_, _-_, Seconds(60),Seconds(10)) + + wordCounts.print() + + ssc.start() + ssc.awaitTermination() + } +} + + + +class StopContextThread(ssc: StreamingContext) extends Runnable { + def run { + ssc.stop(true, true) + } +} + + +class LatencyListener(ssc: StreamingContext) extends StreamingListener { + + var metricMap: scala.collection.mutable.Map[String, Object] = _ + var startTime = 0L + var startTime1 = 0L + var endTime = 0L + var endTime1 = 0L + var totalDelay = 0L + var hasStarted = false + var batchCount = 0 + var totalRecords = 0L + val thread: Thread = new Thread(new StopContextThread(ssc)) + + + def getMap(): scala.collection.mutable.Map[String, Object] = synchronized { + if (metricMap == null) metricMap = scala.collection.mutable.Map() + metricMap + } + + def setMap(metricMap: scala.collection.mutable.Map[String, Object]) = synchronized { + this.metricMap = metricMap + } + + /** Called when processing of a job of a batch has started. */ + override def onOutputOperationStarted(outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = { + println("job creation delay repoted in onOutputOperationStarted ==>"+outputOperationStarted.outputOperationInfo.batchTime+"==>"+outputOperationStarted.outputOperationInfo.id+"==>"+outputOperationStarted.outputOperationInfo.jobGenTime) + } + + val batchSize = Seconds(2).toString + val recordLimitPerThread = 1000 + val loaderThreads = 10 + + val recordLimit = loaderThreads * recordLimitPerThread + + override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = { + val batchInfo = batchCompleted.batchInfo + val prevCount = totalRecords + var recordThisBatch = batchInfo.numRecords + + println("job creation delay repoted in onBatchCompleted ==>" +batchInfo.batchTime+"==>"+batchInfo.batchJobSetCreationDelay ) + + if (!thread.isAlive) { + totalRecords += recordThisBatch + // val imap = getMap + // imap(batchInfo.batchTime.toString()) = "batchTime," + batchInfo.batchTime + + // ", batch Count so far," + batchCount + + // ", total Records so far," + totalRecords + + // ", record This Batch," + recordThisBatch + + // ", submission Time," + batchInfo.submissionTime + + // ", processing Start Time," + batchInfo.processingStartTime.getOrElse(0L) + + // ", processing End Time," + batchInfo.processingEndTime.getOrElse(0L) + + // ", scheduling Delay," + batchInfo.schedulingDelay.getOrElse(0L) + + // ", processing Delay," + batchInfo.processingDelay.getOrElse(0L) + // + // setMap(imap) + } + + if (totalRecords >= recordLimit) { + if (hasStarted && !thread.isAlive) { + //not receiving any data more, finish + endTime = System.currentTimeMillis() + endTime1 = batchInfo.processingEndTime.getOrElse(0L) + var warning = "" + val totalTime = (endTime - startTime).toDouble / 1000 + //This is weighted avg of every batch process time. The weight is records processed int the batch + val avgLatency = totalDelay.toDouble / totalRecords + if (avgLatency > batchSize.toDouble) + warning = "WARNING:SPARK CLUSTER IN UNSTABLE STATE. 
TRY REDUCE INPUT SPEED" + + val avgLatencyAdjust = avgLatency + batchSize.toDouble + val recordThroughput = totalRecords / totalTime + + val imap = getMap + + imap("Final Metric") = " Total Batch count," + batchCount + + ", startTime based on submissionTime," + startTime + + ", startTime based on System," + startTime1 + + ", endTime based on System," + endTime + + ", endTime based on processingEndTime," + endTime1 + + ", Total Records," + totalRecords + + // ", Total processing delay = " + totalDelay + " ms "+ + ", Total Consumed time in sec," + totalTime + + ", Avg latency/batchInterval in ms," + avgLatencyAdjust + + ", Avg records/sec," + recordThroughput + + ", WARNING," + warning + + imap.foreach { case (key, value) => println(key + "-->" + value) } + + thread.start + } + } else if (!hasStarted) { + if (batchInfo.numRecords > 0) { + startTime = batchCompleted.batchInfo.submissionTime + startTime1 = System.currentTimeMillis() + hasStarted = true + } + } + + if (hasStarted) { + // println("This delay:"+batchCompleted.batchInfo.processingDelay+"ms") + batchCompleted.batchInfo.processingDelay match { + case Some(value) => totalDelay += value * recordThisBatch + case None => //Nothing + } + batchCount = batchCount + 1 + } + } +} + object DirectKafkaStreamSuite { val collectedData = new ConcurrentLinkedQueue[String]() @volatile var total = -1L From bb4be2218c375b093afe7ede4bed1bde5ea040d6 Mon Sep 17 00:00:00 2001 From: sachin aggarwal Date: Tue, 25 Oct 2016 08:46:38 +0530 Subject: [PATCH 15/15] code clean up --- .../kafka/DirectKafkaStreamSuite.scala | 215 +----------------- .../streaming/scheduler/JobGenerator.scala | 7 +- 2 files changed, 15 insertions(+), 207 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 289bd4c6e9f1..f14ff6705fd9 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -19,26 +19,28 @@ package org.apache.spark.streaming.kafka import java.io.File import java.util.Arrays -import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.ConcurrentLinkedQueue + +import scala.collection.JavaConverters._ +import scala.concurrent.duration._ +import scala.language.postfixOps import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.StringDecoder +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} +import org.scalatest.concurrent.Eventually + +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time} import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset import org.apache.spark.streaming.scheduler._ import org.apache.spark.streaming.scheduler.rate.RateEstimator -import org.apache.spark.streaming.{Seconds, Milliseconds, StreamingContext, Time} import org.apache.spark.util.Utils -import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} -import org.scalatest.concurrent.Eventually -import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} - -import scala.collection.JavaConverters._ -import scala.concurrent.duration._ -import 
scala.language.postfixOps class DirectKafkaStreamSuite extends SparkFunSuite @@ -477,201 +479,6 @@ class DirectKafkaStreamSuite } } -object DirectKafkaWordCountLocal { - - import org.apache.spark.streaming._ - - case class Tick(symbol: String, price: Int, ts: Long) - - def main(args: Array[String]) { - - // Create context with 2 second batch interval - val sparkConf = new SparkConf().setMaster("local[*]").setAppName("DirectKafkaWordCount") - val ssc = new StreamingContext(sparkConf, Seconds(2)) - ssc.checkpoint("/tmp/checkpoint") - val listener = new LatencyListener(ssc) - ssc.addStreamingListener(listener) - val lines = ssc.socketTextStream("localhost", 8888) - - val words = lines.flatMap(_.split(" ")) - - val pairs = words.map(word => (word, 1)) - - val wordCounts = pairs.reduceByKeyAndWindow(_+_, _-_, Seconds(60),Seconds(10)) - - - val wordCountNew = wordCounts.filter(_._1.startsWith("sac")).reduceByKeyAndWindow(_+_, _-_, Seconds(60),Seconds(10)) - wordCountNew.print() - - ssc.start() - ssc.awaitTermination() - } -} -object DirectKafkaWordCountKafka { - - import org.apache.spark.streaming._ - - def main(args: Array[String]) { - val sparkConf = new SparkConf().setMaster("local[*]").setAppName("DirectKafkaWordCount") - - val ssc = new StreamingContext(sparkConf, Seconds(2)) - //ssc.checkpoint(checkPointPath) - - val listener = new LatencyListener(ssc) - ssc.addStreamingListener(listener) - val kafkaBrokers = "localhost" - val kafkaPort ="9092" - val topic="test" - - val topicsSet = Set(topic) - - val brokerListString = new StringBuilder(); - - brokerListString.append(kafkaBrokers).append(":").append(kafkaPort) - - - val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerListString.toString()) - System.err.println( - "Trying to connect to Kafka at " + brokerListString.toString()) - val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( - ssc, kafkaParams, topicsSet) - ssc.checkpoint("/tmp/checkPoint/") - // Create context with 2 second batch interval - - //val lines = ssc.socketTextStream("localhost", 9998) - - val words = messages.map(x => x._2).flatMap(_.split(" ")) - - val pairs = words.map(word => (word, 1)) - pairs.print() - - val wordCounts = pairs.reduceByKeyAndWindow(_+_, _-_, Seconds(60),Seconds(10)) - - wordCounts.print() - - ssc.start() - ssc.awaitTermination() - } -} - - - -class StopContextThread(ssc: StreamingContext) extends Runnable { - def run { - ssc.stop(true, true) - } -} - - -class LatencyListener(ssc: StreamingContext) extends StreamingListener { - - var metricMap: scala.collection.mutable.Map[String, Object] = _ - var startTime = 0L - var startTime1 = 0L - var endTime = 0L - var endTime1 = 0L - var totalDelay = 0L - var hasStarted = false - var batchCount = 0 - var totalRecords = 0L - val thread: Thread = new Thread(new StopContextThread(ssc)) - - - def getMap(): scala.collection.mutable.Map[String, Object] = synchronized { - if (metricMap == null) metricMap = scala.collection.mutable.Map() - metricMap - } - - def setMap(metricMap: scala.collection.mutable.Map[String, Object]) = synchronized { - this.metricMap = metricMap - } - - /** Called when processing of a job of a batch has started. 
*/ - override def onOutputOperationStarted(outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = { - println("job creation delay repoted in onOutputOperationStarted ==>"+outputOperationStarted.outputOperationInfo.batchTime+"==>"+outputOperationStarted.outputOperationInfo.id+"==>"+outputOperationStarted.outputOperationInfo.jobGenTime) - } - - val batchSize = Seconds(2).toString - val recordLimitPerThread = 1000 - val loaderThreads = 10 - - val recordLimit = loaderThreads * recordLimitPerThread - - override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = { - val batchInfo = batchCompleted.batchInfo - val prevCount = totalRecords - var recordThisBatch = batchInfo.numRecords - - println("job creation delay repoted in onBatchCompleted ==>" +batchInfo.batchTime+"==>"+batchInfo.batchJobSetCreationDelay ) - - if (!thread.isAlive) { - totalRecords += recordThisBatch - // val imap = getMap - // imap(batchInfo.batchTime.toString()) = "batchTime," + batchInfo.batchTime + - // ", batch Count so far," + batchCount + - // ", total Records so far," + totalRecords + - // ", record This Batch," + recordThisBatch + - // ", submission Time," + batchInfo.submissionTime + - // ", processing Start Time," + batchInfo.processingStartTime.getOrElse(0L) + - // ", processing End Time," + batchInfo.processingEndTime.getOrElse(0L) + - // ", scheduling Delay," + batchInfo.schedulingDelay.getOrElse(0L) + - // ", processing Delay," + batchInfo.processingDelay.getOrElse(0L) - // - // setMap(imap) - } - - if (totalRecords >= recordLimit) { - if (hasStarted && !thread.isAlive) { - //not receiving any data more, finish - endTime = System.currentTimeMillis() - endTime1 = batchInfo.processingEndTime.getOrElse(0L) - var warning = "" - val totalTime = (endTime - startTime).toDouble / 1000 - //This is weighted avg of every batch process time. The weight is records processed int the batch - val avgLatency = totalDelay.toDouble / totalRecords - if (avgLatency > batchSize.toDouble) - warning = "WARNING:SPARK CLUSTER IN UNSTABLE STATE. 
TRY REDUCE INPUT SPEED" - - val avgLatencyAdjust = avgLatency + batchSize.toDouble - val recordThroughput = totalRecords / totalTime - - val imap = getMap - - imap("Final Metric") = " Total Batch count," + batchCount + - ", startTime based on submissionTime," + startTime + - ", startTime based on System," + startTime1 + - ", endTime based on System," + endTime + - ", endTime based on processingEndTime," + endTime1 + - ", Total Records," + totalRecords + - // ", Total processing delay = " + totalDelay + " ms "+ - ", Total Consumed time in sec," + totalTime + - ", Avg latency/batchInterval in ms," + avgLatencyAdjust + - ", Avg records/sec," + recordThroughput + - ", WARNING," + warning - - imap.foreach { case (key, value) => println(key + "-->" + value) } - - thread.start - } - } else if (!hasStarted) { - if (batchInfo.numRecords > 0) { - startTime = batchCompleted.batchInfo.submissionTime - startTime1 = System.currentTimeMillis() - hasStarted = true - } - } - - if (hasStarted) { - // println("This delay:"+batchCompleted.batchInfo.processingDelay+"ms") - batchCompleted.batchInfo.processingDelay match { - case Some(value) => totalDelay += value * recordThisBatch - case None => //Nothing - } - batchCount = batchCount + 1 - } - } -} - object DirectKafkaStreamSuite { val collectedData = new ConcurrentLinkedQueue[String]() @volatile var total = -1L diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 5dfba6795f70..f87b00f50c0e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -17,14 +17,15 @@ package org.apache.spark.streaming.scheduler +import scala.util.{Failure, Success, Try} + +import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.util.RecurringTimer import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time} +import org.apache.spark.streaming.util.RecurringTimer import org.apache.spark.util.{Clock, EventLoop, ManualClock, Utils} -import scala.util.{Failure, Success, Try} - /** Event classes for JobGenerator */ private[scheduler] sealed trait JobGeneratorEvent private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent
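
For reference, the series as a whole exposes two scheduler metrics: jobGenTime on OutputOperationInfo (an Option[Long], per the Java wrapper test above) and batchJobSetCreationDelay on BatchInfo. A minimal listener that surfaces both might look like the sketch below. This is illustrative only, not part of the patch: the class name JobGenDelayListener is hypothetical, the two new fields are assumed to exist exactly as shown in the diffs, and everything else is the stock StreamingListener API.

import org.apache.spark.streaming.scheduler._

// Sketch of a listener that reports the metrics added by this series.
// jobGenTime (Option[Long] on OutputOperationInfo) and
// batchJobSetCreationDelay (on BatchInfo) are assumed from the diffs above;
// the rest is the existing StreamingListener API.
class JobGenDelayListener extends StreamingListener {

  override def onOutputOperationStarted(
      started: StreamingListenerOutputOperationStarted): Unit = {
    val info = started.outputOperationInfo
    // -1 marks "not recorded", mirroring the Java wrapper's getOrElse(-1).
    println(s"batch=${info.batchTime} op=${info.id} " +
      s"jobGenTime=${info.jobGenTime.getOrElse(-1L)}")
  }

  override def onBatchCompleted(
      completed: StreamingListenerBatchCompleted): Unit = {
    val info = completed.batchInfo
    println(s"batch=${info.batchTime} " +
      s"jobSetCreationDelay=${info.batchJobSetCreationDelay} " +
      s"schedulingDelay=${info.schedulingDelay.getOrElse(0L)} " +
      s"processingDelay=${info.processingDelay.getOrElse(0L)}")
  }
}

It would be wired up the same way as the LatencyListener in the test code: ssc.addStreamingListener(new JobGenDelayListener).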