From 493f9783fd34329f3ca7f596687d0daf7b6dbf27 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 8 Apr 2015 14:49:38 +0800 Subject: [PATCH 1/6] Send StreamingListenerBatchSubmitted when JobSet is submitted; fix StreamingListenerBatchStarted.batchInfo.processingStartTime; fix a typo --- .../spark/streaming/scheduler/JobScheduler.scala | 7 ++++++- .../ui/StreamingJobProgressListener.scala | 16 ++++++++-------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index d6a93acbe711..66e206bab6fa 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -105,6 +105,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { if (jobSet.jobs.isEmpty) { logInfo("No jobs added for time " + jobSet.time) } else { + listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo)) jobSets.put(jobSet.time, jobSet) jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job))) logInfo("Added jobs for time " + jobSet.time) @@ -135,9 +136,13 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { private def handleJobStart(job: Job) { val jobSet = jobSets.get(job.time) if (!jobSet.hasStarted) { + jobSet.handleJobStart(job) + // "StreamingListenerBatchStarted" should be posted after calling "handleJobStart" to get the + // correct "jobSet.processingStartTime". listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo)) + } else { + jobSet.handleJobStart(job) } - jobSet.handleJobStart(job) logInfo("Starting job " + job.id + " from job set of time " + jobSet.time) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala index e4bd067cacb7..84f80e638f63 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala @@ -33,7 +33,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) private val waitingBatchInfos = new HashMap[Time, BatchInfo] private val runningBatchInfos = new HashMap[Time, BatchInfo] - private val completedaBatchInfos = new Queue[BatchInfo] + private val completedBatchInfos = new Queue[BatchInfo] private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100) private var totalCompletedBatches = 0L private var totalReceivedRecords = 0L @@ -62,7 +62,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = { synchronized { - runningBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo + waitingBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo } } @@ -79,8 +79,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) synchronized { waitingBatchInfos.remove(batchCompleted.batchInfo.batchTime) runningBatchInfos.remove(batchCompleted.batchInfo.batchTime) - completedaBatchInfos.enqueue(batchCompleted.batchInfo) - if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue() + completedBatchInfos.enqueue(batchCompleted.batchInfo) + if (completedBatchInfos.size > batchInfoLimit) completedBatchInfos.dequeue() totalCompletedBatches += 1L batchCompleted.batchInfo.receivedBlockInfo.foreach { case (_, infos) => @@ -118,7 +118,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) } def retainedCompletedBatches: Seq[BatchInfo] = synchronized { - completedaBatchInfos.toSeq + completedBatchInfos.toSeq } def processingDelayDistribution: Option[Distribution] = synchronized { @@ -165,7 +165,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) } def lastCompletedBatch: Option[BatchInfo] = { - completedaBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption + completedBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption } def lastReceivedBatch: Option[BatchInfo] = { @@ -174,10 +174,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) private def retainedBatches: Seq[BatchInfo] = synchronized { (waitingBatchInfos.values.toSeq ++ - runningBatchInfos.values.toSeq ++ completedaBatchInfos).sortBy(_.batchTime)(Time.ordering) + runningBatchInfos.values.toSeq ++ completedBatchInfos).sortBy(_.batchTime)(Time.ordering) } private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = { - Distribution(completedaBatchInfos.flatMap(getMetric(_)).map(_.toDouble)) + Distribution(completedBatchInfos.flatMap(getMetric(_)).map(_.toDouble)) } } From 74aed99cfc3ce8bfd0c6441bf9a94a792d7a9b3b Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 9 Apr 2015 16:17:09 +0800 Subject: [PATCH 2/6] Refactor as per TD's suggestion --- .../apache/spark/streaming/scheduler/JobScheduler.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 66e206bab6fa..95f1857b4c37 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -135,13 +135,12 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { private def handleJobStart(job: Job) { val jobSet = jobSets.get(job.time) - if (!jobSet.hasStarted) { - jobSet.handleJobStart(job) + val isFirstJobOfJobSet = !jobSet.hasStarted + jobSet.handleJobStart(job) + if (isFirstJobOfJobSet) { // "StreamingListenerBatchStarted" should be posted after calling "handleJobStart" to get the // correct "jobSet.processingStartTime". listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo)) - } else { - jobSet.handleJobStart(job) } logInfo("Starting job " + job.id + " from job set of time " + jobSet.time) } From fc3a2a11e0a38947a8ecdf3c3751ae4e47d1114c Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 9 Apr 2015 16:36:33 +0800 Subject: [PATCH 3/6] Add unit tests for SPARK-6766 --- .../streaming/StreamingListenerSuite.scala | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index f52562b0a0f7..ff8f30faa6e3 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -88,6 +88,58 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { } } + test("SPARK-6766: batch info should be submitted") { + val ssc = setupStreams(input, operation) + val collector = new StreamingListener { + val batchInfos = new ArrayBuffer[BatchInfo] + + override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { + batchInfos += batchSubmitted.batchInfo + } + } + ssc.addStreamingListener(collector) + runStreams(ssc, input.size, input.size) + ssc.awaitTerminationOrTimeout(5000) should be (true) + + val batchInfos = collector.batchInfos + batchInfos should have size 4 + + batchInfos.foreach(info => { + info.schedulingDelay should be (None) + info.processingDelay should be (None) + info.totalDelay should be (None) + }) + + isInIncreasingOrder(batchInfos.map(_.submissionTime)) should be (true) + } + + test("SPARK-6766: processingStartTime of batch info should not be None when starting") { + val ssc = setupStreams(input, operation) + val collector = new StreamingListener { + val batchInfos = new ArrayBuffer[BatchInfo] + + override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { + batchInfos += batchStarted.batchInfo + } + } + ssc.addStreamingListener(collector) + runStreams(ssc, input.size, input.size) + ssc.awaitTerminationOrTimeout(5000) should be (true) + + val batchInfos = collector.batchInfos + batchInfos should have size 4 + + batchInfos.foreach(info => { + info.schedulingDelay should not be None + info.schedulingDelay.get should be >= 0L + info.processingDelay should be (None) + info.totalDelay should be (None) + }) + + isInIncreasingOrder(batchInfos.map(_.submissionTime)) should be (true) + isInIncreasingOrder(batchInfos.map(_.processingStartTime.get)) should be (true) + } + /** Check if a sequence of numbers is in increasing order */ def isInIncreasingOrder(seq: Seq[Long]): Boolean = { for(i <- 1 until seq.size) { From 79b4fedad774c5617425349540c9ae83c8996df9 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 9 Apr 2015 17:22:27 +0800 Subject: [PATCH 4/6] Add StreamingJobProgressListenerSuite to test StreamingJobProgressListener --- .../StreamingJobProgressListenerSuite.scala | 140 ++++++++++++++++++ 1 file changed, 140 insertions(+) create mode 100644 streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala new file mode 100644 index 000000000000..175f937444d4 --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.ui + +import org.scalatest.Matchers + +import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.scheduler._ +import org.apache.spark.streaming.{Time, Milliseconds, TestSuiteBase} + +class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { + + val input = (1 to 4).map(Seq(_)).toSeq + val operation = (d: DStream[Int]) => d.map(x => x) + + override def batchDuration = Milliseconds(100) + + test("onBatchSubmitted") { + val ssc = setupStreams(input, operation) + val listener = new StreamingJobProgressListener(ssc) + + val batchInfoSubmitted = BatchInfo(Time(1000), Map(), 1000, None, None) + listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted)) + + listener.waitingBatches should be(List(batchInfoSubmitted)) + } + + test("onBatchStarted") { + val ssc = setupStreams(input, operation) + val listener = new StreamingJobProgressListener(ssc) + + val batchInfoSubmitted = BatchInfo(Time(1000), Map(), 1000, None, None) + listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted)) + + val receivedBlockInfo = Map( + 0 -> Array(ReceivedBlockInfo(0, 100, null), ReceivedBlockInfo(0, 200, null)), + 1 -> Array(ReceivedBlockInfo(1, 300, null)) + ) + val batchInfoStarted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None) + listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted)) + + listener.runningBatches should be(List(batchInfoStarted)) + listener.waitingBatches should be(Nil) + listener.numTotalReceivedRecords should be(600) + } + + test("onBatchCompleted") { + val ssc = setupStreams(input, operation) + val listener = new StreamingJobProgressListener(ssc) + + val batchInfoSubmitted = BatchInfo(Time(1000), Map(), 1000, None, None) + listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted)) + + val receivedBlockInfo = Map( + 0 -> Array(ReceivedBlockInfo(0, 100, null), ReceivedBlockInfo(0, 200, null)), + 1 -> Array(ReceivedBlockInfo(1, 300, null)) + ) + val batchInfoStarted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None) + listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted)) + + val batchInfoCompleted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None) + listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted)) + + listener.runningBatches should be (Nil) + listener.waitingBatches should be (Nil) + listener.lastCompletedBatch should be (Some(batchInfoCompleted)) + listener.retainedCompletedBatches should be (List(batchInfoCompleted)) + listener.numTotalCompletedBatches should be (1) + listener.numTotalProcessedRecords should be (600) + listener.numTotalReceivedRecords should be (600) + } + + test("retain completed batch") { + val ssc = setupStreams(input, operation) + val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100) + val listener = new StreamingJobProgressListener(ssc) + + val receivedBlockInfo = Map( + 0 -> Array(ReceivedBlockInfo(0, 100, null), ReceivedBlockInfo(0, 200, null)), + 1 -> Array(ReceivedBlockInfo(1, 300, null)) + ) + val batchInfoCompleted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None) + + for(_ <- 0 until (limit + 10)) { + listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted)) + } + + listener.retainedCompletedBatches.size should be (limit) + listener.numTotalCompletedBatches should be(limit + 10) + } + + test("onReceiverStarted") { + val ssc = setupStreams(input, operation) + val listener = new StreamingJobProgressListener(ssc) + + val receiverInfo = ReceiverInfo(0, "test", null, true, "localhost") + listener.onReceiverStarted(StreamingListenerReceiverStarted(receiverInfo)) + + listener.receiverInfo(0) should be (Some(receiverInfo)) + listener.receiverInfo(1) should be (None) + } + + test("onReceiverError") { + val ssc = setupStreams(input, operation) + val listener = new StreamingJobProgressListener(ssc) + + val receiverInfo = ReceiverInfo(0, "test", null, true, "localhost") + listener.onReceiverError(StreamingListenerReceiverError(receiverInfo)) + + listener.receiverInfo(0) should be (Some(receiverInfo)) + listener.receiverInfo(1) should be (None) + } + + test("onReceiverStopped") { + val ssc = setupStreams(input, operation) + val listener = new StreamingJobProgressListener(ssc) + + val receiverInfo = ReceiverInfo(0, "test", null, true, "localhost") + listener.onReceiverStopped(StreamingListenerReceiverStopped(receiverInfo)) + + listener.receiverInfo(0) should be (Some(receiverInfo)) + listener.receiverInfo(1) should be (None) + } + +} From ca0955b492279414332d38a2b39107ad5e05598a Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 10 Apr 2015 07:45:01 +0800 Subject: [PATCH 5/6] Combine unit tests --- .../streaming/StreamingListenerSuite.scala | 91 ++++++++--------- .../StreamingJobProgressListenerSuite.scala | 98 ++++++------------- 2 files changed, 71 insertions(+), 118 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index ff8f30faa6e3..cb594b518ae0 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -46,6 +46,8 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { val collector = new BatchInfoCollector ssc.addStreamingListener(collector) runStreams(ssc, input.size, input.size) + ssc.awaitTerminationOrTimeout(5000) should be (true) + val batchInfos = collector.batchInfos batchInfos should have size 4 @@ -61,6 +63,32 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { isInIncreasingOrder(batchInfos.map(_.submissionTime)) should be (true) isInIncreasingOrder(batchInfos.map(_.processingStartTime.get)) should be (true) isInIncreasingOrder(batchInfos.map(_.processingEndTime.get)) should be (true) + + // SPARK-6766: batch info should be submitted + val batchInfosSubmitted = collector.batchInfosSubmitted + batchInfosSubmitted should have size 4 + + batchInfosSubmitted.foreach(info => { + info.schedulingDelay should be (None) + info.processingDelay should be (None) + info.totalDelay should be (None) + }) + + isInIncreasingOrder(batchInfosSubmitted.map(_.submissionTime)) should be (true) + + // SPARK-6766: processingStartTime of batch info should not be None when starting + val batchInfosStarted = collector.batchInfosStarted + batchInfosStarted should have size 4 + + batchInfosStarted.foreach(info => { + info.schedulingDelay should not be None + info.schedulingDelay.get should be >= 0L + info.processingDelay should be (None) + info.totalDelay should be (None) + }) + + isInIncreasingOrder(batchInfosStarted.map(_.submissionTime)) should be (true) + isInIncreasingOrder(batchInfosStarted.map(_.processingStartTime.get)) should be (true) } test("receiver info reporting") { @@ -88,58 +116,6 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { } } - test("SPARK-6766: batch info should be submitted") { - val ssc = setupStreams(input, operation) - val collector = new StreamingListener { - val batchInfos = new ArrayBuffer[BatchInfo] - - override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { - batchInfos += batchSubmitted.batchInfo - } - } - ssc.addStreamingListener(collector) - runStreams(ssc, input.size, input.size) - ssc.awaitTerminationOrTimeout(5000) should be (true) - - val batchInfos = collector.batchInfos - batchInfos should have size 4 - - batchInfos.foreach(info => { - info.schedulingDelay should be (None) - info.processingDelay should be (None) - info.totalDelay should be (None) - }) - - isInIncreasingOrder(batchInfos.map(_.submissionTime)) should be (true) - } - - test("SPARK-6766: processingStartTime of batch info should not be None when starting") { - val ssc = setupStreams(input, operation) - val collector = new StreamingListener { - val batchInfos = new ArrayBuffer[BatchInfo] - - override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { - batchInfos += batchStarted.batchInfo - } - } - ssc.addStreamingListener(collector) - runStreams(ssc, input.size, input.size) - ssc.awaitTerminationOrTimeout(5000) should be (true) - - val batchInfos = collector.batchInfos - batchInfos should have size 4 - - batchInfos.foreach(info => { - info.schedulingDelay should not be None - info.schedulingDelay.get should be >= 0L - info.processingDelay should be (None) - info.totalDelay should be (None) - }) - - isInIncreasingOrder(batchInfos.map(_.submissionTime)) should be (true) - isInIncreasingOrder(batchInfos.map(_.processingStartTime.get)) should be (true) - } - /** Check if a sequence of numbers is in increasing order */ def isInIncreasingOrder(seq: Seq[Long]): Boolean = { for(i <- 1 until seq.size) { @@ -152,6 +128,17 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { /** Listener that collects information on processed batches */ class BatchInfoCollector extends StreamingListener { val batchInfos = new ArrayBuffer[BatchInfo] + val batchInfosStarted = new ArrayBuffer[BatchInfo] + val batchInfosSubmitted = new ArrayBuffer[BatchInfo] + + override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { + batchInfosSubmitted += batchSubmitted.batchInfo + } + + override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { + batchInfosStarted += batchStarted.batchInfo + } + override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { batchInfos += batchCompleted.batchInfo } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala index 175f937444d4..ee5a40897e3c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala @@ -30,52 +30,31 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { override def batchDuration = Milliseconds(100) - test("onBatchSubmitted") { + test("onBatchSubmitted, onBatchStarted, onBatchCompleted, " + + "onReceiverStarted, onReceiverError, onReceiverStopped") { val ssc = setupStreams(input, operation) val listener = new StreamingJobProgressListener(ssc) - val batchInfoSubmitted = BatchInfo(Time(1000), Map(), 1000, None, None) - listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted)) - - listener.waitingBatches should be(List(batchInfoSubmitted)) - } - - test("onBatchStarted") { - val ssc = setupStreams(input, operation) - val listener = new StreamingJobProgressListener(ssc) - - val batchInfoSubmitted = BatchInfo(Time(1000), Map(), 1000, None, None) - listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted)) - val receivedBlockInfo = Map( 0 -> Array(ReceivedBlockInfo(0, 100, null), ReceivedBlockInfo(0, 200, null)), 1 -> Array(ReceivedBlockInfo(1, 300, null)) ) - val batchInfoStarted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None) - listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted)) - listener.runningBatches should be(List(batchInfoStarted)) - listener.waitingBatches should be(Nil) - listener.numTotalReceivedRecords should be(600) - } - - test("onBatchCompleted") { - val ssc = setupStreams(input, operation) - val listener = new StreamingJobProgressListener(ssc) - - val batchInfoSubmitted = BatchInfo(Time(1000), Map(), 1000, None, None) + // onBatchSubmitted + val batchInfoSubmitted = BatchInfo(Time(1000), receivedBlockInfo, 1000, None, None) listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted)) + listener.waitingBatches should be(List(batchInfoSubmitted)) - val receivedBlockInfo = Map( - 0 -> Array(ReceivedBlockInfo(0, 100, null), ReceivedBlockInfo(0, 200, null)), - 1 -> Array(ReceivedBlockInfo(1, 300, null)) - ) + // onBatchStarted val batchInfoStarted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None) listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted)) + listener.runningBatches should be(List(batchInfoStarted)) + listener.waitingBatches should be(Nil) + listener.numTotalReceivedRecords should be(600) + // onBatchCompleted val batchInfoCompleted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None) listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted)) - listener.runningBatches should be (Nil) listener.waitingBatches should be (Nil) listener.lastCompletedBatch should be (Some(batchInfoCompleted)) @@ -83,9 +62,30 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { listener.numTotalCompletedBatches should be (1) listener.numTotalProcessedRecords should be (600) listener.numTotalReceivedRecords should be (600) + + // onReceiverStarted + val receiverInfoStarted = ReceiverInfo(0, "test", null, true, "localhost") + listener.onReceiverStarted(StreamingListenerReceiverStarted(receiverInfoStarted)) + listener.receiverInfo(0) should be (Some(receiverInfoStarted)) + listener.receiverInfo(1) should be (None) + + // onReceiverError + val receiverInfoError = ReceiverInfo(1, "test", null, true, "localhost") + listener.onReceiverError(StreamingListenerReceiverError(receiverInfoError)) + listener.receiverInfo(0) should be (Some(receiverInfoStarted)) + listener.receiverInfo(1) should be (Some(receiverInfoError)) + listener.receiverInfo(2) should be (None) + + // onReceiverStopped + val receiverInfoStopped = ReceiverInfo(2, "test", null, true, "localhost") + listener.onReceiverStopped(StreamingListenerReceiverStopped(receiverInfoStopped)) + listener.receiverInfo(0) should be (Some(receiverInfoStarted)) + listener.receiverInfo(1) should be (Some(receiverInfoError)) + listener.receiverInfo(2) should be (Some(receiverInfoStopped)) + listener.receiverInfo(3) should be (None) } - test("retain completed batch") { + test("Remove the old completed batches when exceeding the limit") { val ssc = setupStreams(input, operation) val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100) val listener = new StreamingJobProgressListener(ssc) @@ -103,38 +103,4 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { listener.retainedCompletedBatches.size should be (limit) listener.numTotalCompletedBatches should be(limit + 10) } - - test("onReceiverStarted") { - val ssc = setupStreams(input, operation) - val listener = new StreamingJobProgressListener(ssc) - - val receiverInfo = ReceiverInfo(0, "test", null, true, "localhost") - listener.onReceiverStarted(StreamingListenerReceiverStarted(receiverInfo)) - - listener.receiverInfo(0) should be (Some(receiverInfo)) - listener.receiverInfo(1) should be (None) - } - - test("onReceiverError") { - val ssc = setupStreams(input, operation) - val listener = new StreamingJobProgressListener(ssc) - - val receiverInfo = ReceiverInfo(0, "test", null, true, "localhost") - listener.onReceiverError(StreamingListenerReceiverError(receiverInfo)) - - listener.receiverInfo(0) should be (Some(receiverInfo)) - listener.receiverInfo(1) should be (None) - } - - test("onReceiverStopped") { - val ssc = setupStreams(input, operation) - val listener = new StreamingJobProgressListener(ssc) - - val receiverInfo = ReceiverInfo(0, "test", null, true, "localhost") - listener.onReceiverStopped(StreamingListenerReceiverStopped(receiverInfo)) - - listener.receiverInfo(0) should be (Some(receiverInfo)) - listener.receiverInfo(1) should be (None) - } - } From 2f8506030b572a4702991fd246bdcdf21d7ef509 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 10 Apr 2015 15:43:44 +0800 Subject: [PATCH 6/6] Update tests --- .../streaming/StreamingListenerSuite.scala | 38 +++++++++---------- .../StreamingJobProgressListenerSuite.scala | 25 +++++++++--- 2 files changed, 38 insertions(+), 25 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index cb594b518ae0..b055938df78e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -46,23 +46,6 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { val collector = new BatchInfoCollector ssc.addStreamingListener(collector) runStreams(ssc, input.size, input.size) - ssc.awaitTerminationOrTimeout(5000) should be (true) - - val batchInfos = collector.batchInfos - batchInfos should have size 4 - - batchInfos.foreach(info => { - info.schedulingDelay should not be None - info.processingDelay should not be None - info.totalDelay should not be None - info.schedulingDelay.get should be >= 0L - info.processingDelay.get should be >= 0L - info.totalDelay.get should be >= 0L - }) - - isInIncreasingOrder(batchInfos.map(_.submissionTime)) should be (true) - isInIncreasingOrder(batchInfos.map(_.processingStartTime.get)) should be (true) - isInIncreasingOrder(batchInfos.map(_.processingEndTime.get)) should be (true) // SPARK-6766: batch info should be submitted val batchInfosSubmitted = collector.batchInfosSubmitted @@ -89,6 +72,23 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { isInIncreasingOrder(batchInfosStarted.map(_.submissionTime)) should be (true) isInIncreasingOrder(batchInfosStarted.map(_.processingStartTime.get)) should be (true) + + // test onBatchCompleted + val batchInfosCompleted = collector.batchInfosCompleted + batchInfosCompleted should have size 4 + + batchInfosCompleted.foreach(info => { + info.schedulingDelay should not be None + info.processingDelay should not be None + info.totalDelay should not be None + info.schedulingDelay.get should be >= 0L + info.processingDelay.get should be >= 0L + info.totalDelay.get should be >= 0L + }) + + isInIncreasingOrder(batchInfosCompleted.map(_.submissionTime)) should be (true) + isInIncreasingOrder(batchInfosCompleted.map(_.processingStartTime.get)) should be (true) + isInIncreasingOrder(batchInfosCompleted.map(_.processingEndTime.get)) should be (true) } test("receiver info reporting") { @@ -127,7 +127,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { /** Listener that collects information on processed batches */ class BatchInfoCollector extends StreamingListener { - val batchInfos = new ArrayBuffer[BatchInfo] + val batchInfosCompleted = new ArrayBuffer[BatchInfo] val batchInfosStarted = new ArrayBuffer[BatchInfo] val batchInfosSubmitted = new ArrayBuffer[BatchInfo] @@ -140,7 +140,7 @@ class BatchInfoCollector extends StreamingListener { } override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { - batchInfos += batchCompleted.batchInfo + batchInfosCompleted += batchCompleted.batchInfo } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala index ee5a40897e3c..2b9d164500b7 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala @@ -43,22 +43,35 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers { // onBatchSubmitted val batchInfoSubmitted = BatchInfo(Time(1000), receivedBlockInfo, 1000, None, None) listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted)) - listener.waitingBatches should be(List(batchInfoSubmitted)) + listener.waitingBatches should be (List(batchInfoSubmitted)) + listener.runningBatches should be (Nil) + listener.retainedCompletedBatches should be (Nil) + listener.lastCompletedBatch should be (None) + listener.numUnprocessedBatches should be (1) + listener.numTotalCompletedBatches should be (0) + listener.numTotalProcessedRecords should be (0) + listener.numTotalReceivedRecords should be (0) // onBatchStarted val batchInfoStarted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None) listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted)) - listener.runningBatches should be(List(batchInfoStarted)) - listener.waitingBatches should be(Nil) - listener.numTotalReceivedRecords should be(600) + listener.waitingBatches should be (Nil) + listener.runningBatches should be (List(batchInfoStarted)) + listener.retainedCompletedBatches should be (Nil) + listener.lastCompletedBatch should be (None) + listener.numUnprocessedBatches should be (1) + listener.numTotalCompletedBatches should be (0) + listener.numTotalProcessedRecords should be (0) + listener.numTotalReceivedRecords should be (600) // onBatchCompleted val batchInfoCompleted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None) listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted)) - listener.runningBatches should be (Nil) listener.waitingBatches should be (Nil) - listener.lastCompletedBatch should be (Some(batchInfoCompleted)) + listener.runningBatches should be (Nil) listener.retainedCompletedBatches should be (List(batchInfoCompleted)) + listener.lastCompletedBatch should be (Some(batchInfoCompleted)) + listener.numUnprocessedBatches should be (0) listener.numTotalCompletedBatches should be (1) listener.numTotalProcessedRecords should be (600) listener.numTotalReceivedRecords should be (600)