From 6ceffb347568d5fec01e27a1ca9d0a3c69ced63f Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 8 Apr 2015 14:38:28 +0800 Subject: [PATCH 01/10] Add "Active Batches" and "Completed Batches" lists to StreamingPage --- .../spark/streaming/ui/AllBatchesTable.scala | 113 ++++++++++++++++++ .../spark/streaming/ui/StreamingPage.scala | 19 ++- 2 files changed, 131 insertions(+), 1 deletion(-) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala new file mode 100644 index 000000000000..2aa9d5cb8906 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.ui + +import scala.xml.Node + +import org.apache.spark.streaming.scheduler.BatchInfo +import org.apache.spark.ui.UIUtils + +private[ui] class BatchTableBase { + + protected def columns: Seq[Node] = { + Batch Time + Input Size + Scheduling Delay + Processing Time + } + + protected def baseRow(batch: BatchInfo): Seq[Node] = { + val batchTime = batch.batchTime.milliseconds + val formattedBatchTime = UIUtils.formatDate(batch.batchTime.milliseconds) + val eventCount = batch.receivedBlockInfo.values.map { + receivers => receivers.map(_.numRecords).sum + }.sum + val schedulingDelay = batch.schedulingDelay + val formattedSchedulingDelay = schedulingDelay.map(UIUtils.formatDuration).getOrElse("-") + val processingTime = batch.processingDelay + val formattedProcessingTime = processingTime.map(UIUtils.formatDuration).getOrElse("-") + + {formattedBatchTime} + {eventCount.toString} events + + {formattedSchedulingDelay} + + + {formattedProcessingTime} + + } + + private def batchTable: Seq[Node] = { + + + {columns} + + + {renderRows} + +
+ } + + def toNodeSeq: Seq[Node] = { + batchTable + } + + /** + * Return HTML for all rows of this table. + */ + protected def renderRows: Seq[Node] = Nil +} + +private[ui] class ActiveBatchTable(runningBatches: Seq[BatchInfo], waitingBatches: Seq[BatchInfo]) + extends BatchTableBase { + + override protected def columns: Seq[Node] = super.columns ++ Status + + override protected def renderRows: Seq[Node] = { + // The "batchTime"s of "waitingBatches" must be greater than "runningBatches"'s, so display + // waiting batches before running batches + waitingBatches.flatMap(batch => {waitingBatchRow(batch)}) ++ + runningBatches.flatMap(batch => {runningBatchRow(batch)}) + } + + private def runningBatchRow(batch: BatchInfo): Seq[Node] = { + baseRow(batch) ++ processing + } + + private def waitingBatchRow(batch: BatchInfo): Seq[Node] = { + baseRow(batch) ++ queued + } +} + +private[ui] class CompletedBatchTable(batches: Seq[BatchInfo]) extends BatchTableBase { + + override protected def columns: Seq[Node] = super.columns ++ Total Delay + + override protected def renderRows: Seq[Node] = { + batches.flatMap(batch => {completedBatchRow(batch)}) + } + + private def completedBatchRow(batch: BatchInfo): Seq[Node] = { + val totalDelay = batch.totalDelay + val formattedTotalDelay = totalDelay.map(UIUtils.formatDuration).getOrElse("-") + baseRow(batch) ++ + + {formattedTotalDelay} + + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index bfe8086fcf8f..675dc50cb28e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -41,7 +41,8 @@ private[ui] class StreamingPage(parent: StreamingTab) generateBasicStats() ++

++

Statistics over last {listener.retainedCompletedBatches.size} processed batches

++ generateReceiverStats() ++ - generateBatchStatsTable() + generateBatchStatsTable() ++ + generateBatchListTables() UIUtils.headerSparkPage("Streaming", content, parent, Some(5000)) } @@ -189,5 +190,21 @@ private[ui] class StreamingPage(parent: StreamingTab) } UIUtils.listingTable(headers, generateDataRow, data, fixedWidth = true) } + + private def generateBatchListTables(): Seq[Node] = { + val runningBatches = listener.runningBatches.sortBy(_.batchTime.milliseconds).reverse + val waitingBatches = listener.waitingBatches.sortBy(_.batchTime.milliseconds).reverse + var content = +

Active Batches ({runningBatches.size + waitingBatches.size})

++ + new ActiveBatchTable(runningBatches, waitingBatches).toNodeSeq + + val completedBatches = listener.retainedCompletedBatches. + sortBy(_.batchTime.milliseconds).reverse + content ++= +

Completed Batches ({completedBatches.size})

++ + new CompletedBatchTable(completedBatches).toNodeSeq + + content + } } From d18ab7df9403bcee83e4c754af72571956a85e54 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 9 Apr 2015 21:37:08 +0800 Subject: [PATCH 02/10] Fix the code style --- .../apache/spark/streaming/ui/StreamingPage.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index 675dc50cb28e..ed007f42fa06 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -194,17 +194,20 @@ private[ui] class StreamingPage(parent: StreamingTab) private def generateBatchListTables(): Seq[Node] = { val runningBatches = listener.runningBatches.sortBy(_.batchTime.milliseconds).reverse val waitingBatches = listener.waitingBatches.sortBy(_.batchTime.milliseconds).reverse - var content = + val completedBatches = listener.retainedCompletedBatches. + sortBy(_.batchTime.milliseconds).reverse + + val activeBatchesContent = {

Active Batches ({runningBatches.size + waitingBatches.size})

++ new ActiveBatchTable(runningBatches, waitingBatches).toNodeSeq + } - val completedBatches = listener.retainedCompletedBatches. - sortBy(_.batchTime.milliseconds).reverse - content ++= + val completedBatchesContent = {

Completed Batches ({completedBatches.size})

++ new CompletedBatchTable(completedBatches).toNodeSeq + } - content + activeBatchesContent ++ completedBatchesContent } } From b248787c71ca4893ece03ceb93d3c2e519f937c5 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 9 Apr 2015 22:18:33 +0800 Subject: [PATCH 03/10] Add tests to verify the new tables --- .../apache/spark/streaming/ui/AllBatchesTable.scala | 9 +++++---- .../org/apache/spark/streaming/UISeleniumSuite.scala | 11 +++++++++++ 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala index 2aa9d5cb8906..c5df4a442e4a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala @@ -22,7 +22,7 @@ import scala.xml.Node import org.apache.spark.streaming.scheduler.BatchInfo import org.apache.spark.ui.UIUtils -private[ui] class BatchTableBase { +private[ui] class BatchTableBase(tableId: String) { protected def columns: Seq[Node] = { Batch Time @@ -53,7 +53,7 @@ private[ui] class BatchTableBase { } private def batchTable: Seq[Node] = { - +
{columns} @@ -74,7 +74,7 @@ private[ui] class BatchTableBase { } private[ui] class ActiveBatchTable(runningBatches: Seq[BatchInfo], waitingBatches: Seq[BatchInfo]) - extends BatchTableBase { + extends BatchTableBase("active-batches-table") { override protected def columns: Seq[Node] = super.columns ++ @@ -94,7 +94,8 @@ private[ui] class ActiveBatchTable(runningBatches: Seq[BatchInfo], waitingBatche } } -private[ui] class CompletedBatchTable(batches: Seq[BatchInfo]) extends BatchTableBase { +private[ui] class CompletedBatchTable(batches: Seq[BatchInfo]) + extends BatchTableBase("completed-batches-table") { override protected def columns: Seq[Node] = super.columns ++ diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala index 998426ebb82e..d3eee0457768 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala @@ -75,6 +75,17 @@ class UISeleniumSuite val statisticText = findAll(cssSelector("li strong")).map(_.text).toSeq statisticText should contain("Network receivers:") statisticText should contain("Batch interval:") + + val h4Text = findAll(cssSelector("h4")).map(_.text).toSeq + h4Text should contain("Active Batches (0)") + h4Text should contain("Completed Batches (0)") + + findAll(cssSelector("""#active-batches-table th""")).map(_.text).toSeq should be { + List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time", "Status") + } + findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be { + List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time", "Total Delay") + } } ssc.stop(false) From 86b5e7fa483263eadda1cae0c5adab6d68f3f98c Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 13 Apr 2015 01:04:53 +0800 Subject: [PATCH 04/10] Make BatchTableBase abstract --- .../scala/org/apache/spark/streaming/ui/AllBatchesTable.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala index c5df4a442e4a..df1c0a10704c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala @@ -22,7 +22,7 @@ import scala.xml.Node import org.apache.spark.streaming.scheduler.BatchInfo import org.apache.spark.ui.UIUtils -private[ui] class BatchTableBase(tableId: String) { +private[ui] abstract class BatchTableBase(tableId: String) { protected def columns: Seq[Node] = { @@ -70,7 +70,7 @@ private[ui] class BatchTableBase(tableId: String) { /** * Return HTML for all rows of this table. */ - protected def renderRows: Seq[Node] = Nil + protected def renderRows: Seq[Node] } private[ui] class ActiveBatchTable(runningBatches: Seq[BatchInfo], waitingBatches: Seq[BatchInfo]) From a12ad7bc1507e0f3811097a83417fc25fd93d966 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 14 Apr 2015 00:03:29 +0800 Subject: [PATCH 05/10] Change 'records' to 'events' in the UI --- .../apache/spark/streaming/ui/StreamingPage.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index ed007f42fa06..c41801fc82eb 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -69,10 +69,10 @@ private[ui] class StreamingPage(parent: StreamingTab) Waiting batches: {listener.numUnprocessedBatches}
  • - Received records: {listener.numTotalReceivedRecords} + Received events: {listener.numTotalReceivedRecords}
  • - Processed records: {listener.numTotalProcessedRecords} + Processed events: {listener.numTotalProcessedRecords}
  • } @@ -86,10 +86,10 @@ private[ui] class StreamingPage(parent: StreamingTab) "Receiver", "Status", "Location", - "Records in last batch\n[" + formatDate(Calendar.getInstance().getTime()) + "]", - "Minimum rate\n[records/sec]", - "Median rate\n[records/sec]", - "Maximum rate\n[records/sec]", + "Events in last batch\n[" + formatDate(Calendar.getInstance().getTime()) + "]", + "Minimum rate\n[events/sec]", + "Median rate\n[events/sec]", + "Maximum rate\n[events/sec]", "Last Error" ) val dataRows = (0 until listener.numReceivers).map { receiverId => From a69c0913e64d72210be0d1deb89e66136d28f182 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 14 Apr 2015 00:10:15 +0800 Subject: [PATCH 06/10] Show the number of total completed batches too --- .../scala/org/apache/spark/streaming/ui/StreamingPage.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index c41801fc82eb..652f2e168597 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -203,7 +203,9 @@ private[ui] class StreamingPage(parent: StreamingTab) } val completedBatchesContent = { -

    Completed Batches ({completedBatches.size})

    ++ +

    + Completed Batches (last {completedBatches.size} out of {listener.numTotalCompletedBatches}) +

    ++ new CompletedBatchTable(completedBatches).toNodeSeq } From 252533640bd94477e486967c41db077b34a19c4f Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 14 Apr 2015 00:11:07 +0800 Subject: [PATCH 07/10] Rename 'Processed batches' and 'Waiting batches' and also add links --- .../scala/org/apache/spark/streaming/ui/StreamingPage.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index 652f2e168597..36636f6e0394 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -63,10 +63,10 @@ private[ui] class StreamingPage(parent: StreamingTab) Batch interval: {formatDurationVerbose(listener.batchDuration)}
  • - Processed batches: {listener.numTotalCompletedBatches} + Completed batches: {listener.numTotalCompletedBatches}
  • - Waiting batches: {listener.numUnprocessedBatches} + Active batches: {listener.numUnprocessedBatches}
  • Received events: {listener.numTotalReceivedRecords} From 6f3078e8ed8206356aab987fd1603f0a2d80e1bb Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 14 Apr 2015 00:42:45 +0800 Subject: [PATCH 08/10] Make 'startTime' readable --- .../scala/org/apache/spark/streaming/ui/StreamingPage.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index 1be11ee483db..66e184197e46 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -52,7 +52,7 @@ private[ui] class StreamingPage(parent: StreamingTab) val timeSinceStart = System.currentTimeMillis() - startTime
    • - Started at: {startTime.toString} + Started at: {UIUtils.formatDate(startTime)}
    • Time since start: {formatDurationVerbose(timeSinceStart)} From 51b792ef32d67a7e79fefe051013b6dbbf95b098 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 14 Apr 2015 00:48:21 +0800 Subject: [PATCH 09/10] Fix the unit test --- .../test/scala/org/apache/spark/streaming/UISeleniumSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala index d3eee0457768..205ddf6dbe9b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala @@ -78,7 +78,7 @@ class UISeleniumSuite val h4Text = findAll(cssSelector("h4")).map(_.text).toSeq h4Text should contain("Active Batches (0)") - h4Text should contain("Completed Batches (0)") + h4Text should contain("Completed Batches (last 0 out of 0)") findAll(cssSelector("""#active-batches-table th""")).map(_.text).toSeq should be { List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time", "Status") From be50fc66cb3e471a892d4d38565d8644c8b1e690 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 14 Apr 2015 01:09:04 +0800 Subject: [PATCH 10/10] Fix the code style --- .../scala/org/apache/spark/streaming/ui/StreamingPage.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index 66e184197e46..07fa285642ee 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -50,6 +50,7 @@ private[ui] class StreamingPage(parent: StreamingTab) /** Generate basic stats of the streaming program */ private def generateBasicStats(): Seq[Node] = { val timeSinceStart = System.currentTimeMillis() - startTime + // scalastyle:off
      • Started at: {UIUtils.formatDate(startTime)} @@ -76,6 +77,7 @@ private[ui] class StreamingPage(parent: StreamingTab) Processed events: {listener.numTotalProcessedRecords}
      + // scalastyle:on } /** Generate stats of data received by the receivers in the streaming program */
  • StatusTotal DelayBatch Time