diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 3f1cab69068dc..831f60e870f74 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.ui
import javax.servlet.http.HttpServletRequest
-import scala.xml.{NodeSeq, Node}
+import scala.xml.{NodeSeq, Node, Text}
import org.apache.commons.lang3.StringEscapeUtils
@@ -28,6 +28,7 @@ import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
import org.apache.spark.ui.jobs.UIData.JobUIData
+private case class SparkJobIdWithUIData(sparkJobId: SparkJobId, jobUIData: Option[JobUIData])
private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
private val streamingListener = parent.listener
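The new wrapper is the heart of this change: JobProgressListener retains only a bounded number of jobs (spark.ui.retainedJobs), so a Spark job id recorded for an old batch may no longer have any JobUIData behind it. Carrying the id together with an Option[JobUIData] lets the page render a row either way. A minimal sketch of the idea, with a stubbed map standing in for the listener:

```scala
// Sketch only: `retained` stands in for JobProgressListener's bounded
// job-data map. Job ids 0 and 1 have been evicted; id 2 survives.
case class JobUIData(jobId: Int)
case class SparkJobIdWithUIData(sparkJobId: Int, jobUIData: Option[JobUIData])

val retained = Map(2 -> JobUIData(2))
val rows = Seq(0, 1, 2).map(id => SparkJobIdWithUIData(id, retained.get(id)))
// rows(0).jobUIData == None      -> rendered as a "-" row
// rows(2).jobUIData == Some(...) -> rendered as a normal job row
```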
@@ -44,25 +45,33 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<th>Error</th>
}
+ private def generateJobRow(
+ outputOpId: OutputOpId,
+ outputOpDescription: Seq[Node],
+ formattedOutputOpDuration: String,
+ numSparkJobRowsInOutputOp: Int,
+ isFirstRow: Boolean,
+ sparkJob: SparkJobIdWithUIData): Seq[Node] = {
+ if (sparkJob.jobUIData.isDefined) {
+ generateNormalJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
+ numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get)
+ } else {
+ generateDroppedJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
+ numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
+ }
+ }
+
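The dispatcher above branches on whether the UI data survived eviction. As a self-contained sketch (stubbed row builders, not the real signatures), the same dispatch can be written as a pattern match:

```scala
import scala.xml.{Node, NodeSeq}

case class JobUIData(jobId: Int)
case class SparkJobIdWithUIData(sparkJobId: Int, jobUIData: Option[JobUIData])

// Stand-ins for generateNormalJobRow / generateDroppedJobRow.
def normalRow(job: JobUIData): Seq[Node] = NodeSeq.Empty
def droppedRow(jobId: Int): Seq[Node] = NodeSeq.Empty

def jobRow(sparkJob: SparkJobIdWithUIData): Seq[Node] =
  sparkJob.jobUIData match {
    case Some(uiData) => normalRow(uiData)
    case None         => droppedRow(sparkJob.sparkJobId)
  }
```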
/**
* Generate a row for a Spark job. Because duplicated output op infos need to be collapsed into
* one cell, we use "rowspan" for the first row of an output op.
*/
- def generateJobRow(
+ private def generateNormalJobRow(
outputOpId: OutputOpId,
+ outputOpDescription: Seq[Node],
formattedOutputOpDuration: String,
numSparkJobRowsInOutputOp: Int,
isFirstRow: Boolean,
sparkJob: JobUIData): Seq[Node] = {
- val lastStageInfo = Option(sparkJob.stageIds)
- .filter(_.nonEmpty)
- .flatMap { ids => sparkListener.stageIdToInfo.get(ids.max) }
- val lastStageData = lastStageInfo.flatMap { s =>
- sparkListener.stageIdToData.get((s.stageId, s.attemptId))
- }
-
- val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
- val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
val duration: Option[Long] = {
sparkJob.submissionTime.map { start =>
val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
@@ -83,9 +92,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
if (isFirstRow) {
<td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>
- <span class="description-input" title={lastStageDescription}>
- {lastStageDescription}
- </span>{lastStageName}
+ {outputOpDescription}
</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
} else {
@@ -122,27 +129,96 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
}
- private def generateOutputOpIdRow(
- outputOpId: OutputOpId, sparkJobs: Seq[JobUIData]): Seq[Node] = {
- val sparkjobDurations = sparkJobs.map(sparkJob => {
- sparkJob.submissionTime.map { start =>
- val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
- end - start
+ /**
+ * If a job was dropped by sparkListener because it exceeded the retained-job limit
+ * (spark.ui.retainedJobs), we only show the job id with "-" cells.
+ */
+ private def generateDroppedJobRow(
+ outputOpId: OutputOpId,
+ outputOpDescription: Seq[Node],
+ formattedOutputOpDuration: String,
+ numSparkJobRowsInOutputOp: Int,
+ isFirstRow: Boolean,
+ jobId: Int): Seq[Node] = {
+ // In the first row, the output op id and its information need to be shown. In other rows,
+ // these cells are covered by the first row's "rowspan".
+ // scalastyle:off
+ val prefixCells =
+ if (isFirstRow) {
+ <td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
+ <td rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpDescription}</td>
+ <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
+ } else {
+ Nil
+ }
- })
+ // scalastyle:on
+
+ <tr>
+ {prefixCells}
+ <td sorttable_customkey={jobId.toString}>
+ {jobId.toString}
+ </td>
+ <!-- Duration -->
+ <td>-</td>
+ <!-- Stages: Succeeded/Total -->
+ <td>-</td>
+ <!-- Tasks (for all stages): Succeeded/Total -->
+ <td>-</td>
+ <!-- Error -->
+ <td>-</td>
+ </tr>
+ }
+
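Since Scala XML literals are ordinary scala.xml.Node values, a dropped-job row like the one above can be sanity-checked in isolation. A sketch with a hypothetical jobId and the prefix cells omitted:

```scala
import scala.xml.Node

val jobId = 3  // hypothetical dropped job id
val row: Seq[Node] =
  <tr>
    <td sorttable_customkey={jobId.toString}>{jobId.toString}</td>
    <td>-</td><td>-</td><td>-</td><td>-</td>
  </tr>
println(row.mkString)
// prints a <tr> containing the job id cell plus four "-" cells
```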
+ private def generateOutputOpIdRow(
+ outputOpId: OutputOpId, sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
+ // We don't count the durations of dropped jobs
+ val sparkJobDurations = sparkJobs.filter(_.jobUIData.nonEmpty).map(_.jobUIData.get).
+ map(sparkJob => {
+ sparkJob.submissionTime.map { start =>
+ val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
+ end - start
+ }
+ })
val formattedOutputOpDuration =
- if (sparkjobDurations.exists(_ == None)) {
- // If any job does not finish, set "formattedOutputOpDuration" to "-"
+ if (sparkJobDurations.isEmpty || sparkJobDurations.exists(_ == None)) {
+ // If there are no jobs, or some job has not finished, set "formattedOutputOpDuration" to "-"
"-"
} else {
- SparkUIUtils.formatDuration(sparkjobDurations.flatMap(x => x).sum)
+ SparkUIUtils.formatDuration(sparkJobDurations.flatMap(x => x).sum)
}
- generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
+
+ val description = generateOutputOpDescription(sparkJobs)
+
+ generateJobRow(
+ outputOpId, description, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
sparkJobs.tail.map { sparkJob =>
- generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
+ generateJobRow(
+ outputOpId, description, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
}.flatMap(x => x)
}
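The duration roll-up above treats "all jobs dropped" (an empty list) and "some job unfinished" (a None duration) the same way: both render as "-". A standalone sketch of that semantics; formatOpDuration is a hypothetical stand-in that returns a plain string where the real code calls SparkUIUtils.formatDuration:

```scala
// None marks a job that has not finished; an empty list means every job
// was dropped, and summing it would misleadingly show a duration of 0 ms.
def formatOpDuration(durations: Seq[Option[Long]]): String =
  if (durations.isEmpty || durations.exists(_.isEmpty)) "-"
  else s"${durations.flatten.sum} ms"

formatOpDuration(Seq.empty)                    // "-"   (all jobs dropped)
formatOpDuration(Seq(Some(100L), None))        // "-"   (a job still running)
formatOpDuration(Seq(Some(100L), Some(250L)))  // "350 ms"
```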
+ private def generateOutputOpDescription(sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
+ val lastStageInfo =
+ sparkJobs.flatMap(_.jobUIData).headOption. // Get the first JobUIData
+ flatMap { sparkJob => // For the first job, get the latest Stage info
+ if (sparkJob.stageIds.isEmpty) {
+ None
+ } else {
+ sparkListener.stageIdToInfo.get(sparkJob.stageIds.max)
+ }
+ }
+ val lastStageData = lastStageInfo.flatMap { s =>
+ sparkListener.stageIdToData.get((s.stageId, s.attemptId))
+ }
+
+ val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
+ val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
+
+ <span class="description-input" title={lastStageDescription}>
+ {lastStageDescription}
+ </span> ++ Text(lastStageName)
+ }
+
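This helper is also why Text was added to the imports at the top of the diff: ++ on a Node yields a Seq[Node], so the bare stage name has to be wrapped in a Text node to join the concatenation. A sketch with hypothetical stage strings:

```scala
import scala.xml.{Node, Text}

val lastStageDescription = "count at SocketExample.scala:21"  // hypothetical
val lastStageName = "foreachRDD at SocketExample.scala:28"    // hypothetical

val description: Seq[Node] =
  <span class="description-input" title={lastStageDescription}>
    {lastStageDescription}
  </span> ++ Text(lastStageName)
```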
private def failureReasonCell(failureReason: String): Seq[Node] = {
val isMultiline = failureReason.indexOf('\n') >= 0
// Display the first line by default
@@ -187,10 +263,10 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
(outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
}
sparkListener.synchronized {
- val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] =
+ val outputOpIdWithJobs: Seq[(OutputOpId, Seq[SparkJobIdWithUIData])] =
outputOpIdToSparkJobIds.map { case (outputOpId, sparkJobIds) =>
- // Filter out spark Job ids that don't exist in sparkListener
- (outputOpId, sparkJobIds.flatMap(getJobData))
+ (outputOpId,
+ sparkJobIds.map(sparkJobId => SparkJobIdWithUIData(sparkJobId, getJobData(sparkJobId))))
}
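This hunk carries the behavioral change: the old code flatMapped evicted job ids away, so their rows vanished, and an output op whose jobs had all been evicted left generateOutputOpIdRow calling sparkJobs.head on an empty sequence. A before/after sketch with a stubbed getJobData:

```scala
case class JobUIData(jobId: Int)
case class SparkJobIdWithUIData(sparkJobId: Int, jobUIData: Option[JobUIData])

// Stub: only job 10 is still retained by the listener.
def getJobData(id: Int): Option[JobUIData] =
  if (id == 10) Some(JobUIData(10)) else None

val sparkJobIds = Seq(8, 9, 10)
val before = sparkJobIds.flatMap(getJobData)
// Seq(JobUIData(10)) -- jobs 8 and 9 silently disappear from the table
val after = sparkJobIds.map(id => SparkJobIdWithUIData(id, getJobData(id)))
// all three ids survive; 8 and 9 render as "-" rows via generateDroppedJobRow
```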
@@ -200,7 +276,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<tbody>
{
outputOpIdWithJobs.map {
- case (outputOpId, jobs) => generateOutputOpIdRow(outputOpId, jobs)
+ case (outputOpId, sparkJobIds) => generateOutputOpIdRow(outputOpId, sparkJobIds)
}
}
</tbody>