36 changes: 0 additions & 36 deletions core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala

This file was deleted.

29 changes: 16 additions & 13 deletions core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable
import scala.xml.Node

import org.apache.spark.ui.{ToolTips, UIUtils}
import org.apache.spark.ui.jobs.UIData.StageUIData
import org.apache.spark.util.Utils

/** Page showing executor summary */
@@ -64,28 +65,30 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
executorIdToAddress.put(executorId, address)
}

val executorIdToSummary = listener.stageIdToExecutorSummaries.get(stageId)
executorIdToSummary match {
case Some(x) =>
x.toSeq.sortBy(_._1).map { case (k, v) => {
// scalastyle:off
listener.stageIdToData.get(stageId) match {
case Some(stageData: StageUIData) =>
stageData.executorSummary.toSeq.sortBy(_._1).map { case (k, v) =>
<tr>
<td>{k}</td>
<td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td>
<td sorttable_customekey={v.taskTime.toString}>{UIUtils.formatDuration(v.taskTime)}</td>
<td>{v.failedTasks + v.succeededTasks}</td>
<td>{v.failedTasks}</td>
<td>{v.succeededTasks}</td>
<td sorttable_customekey={v.inputBytes.toString}>{Utils.bytesToString(v.inputBytes)}</td>
<td sorttable_customekey={v.shuffleRead.toString}>{Utils.bytesToString(v.shuffleRead)}</td>
<td sorttable_customekey={v.shuffleWrite.toString}>{Utils.bytesToString(v.shuffleWrite)}</td>
<td sorttable_customekey={v.memoryBytesSpilled.toString} >{Utils.bytesToString(v.memoryBytesSpilled)}</td>
<td sorttable_customekey={v.diskBytesSpilled.toString} >{Utils.bytesToString(v.diskBytesSpilled)}</td>
<td sorttable_customekey={v.inputBytes.toString}>
{Utils.bytesToString(v.inputBytes)}</td>
<td sorttable_customekey={v.shuffleRead.toString}>
{Utils.bytesToString(v.shuffleRead)}</td>
<td sorttable_customekey={v.shuffleWrite.toString}>
{Utils.bytesToString(v.shuffleWrite)}</td>
<td sorttable_customekey={v.memoryBytesSpilled.toString}>
{Utils.bytesToString(v.memoryBytesSpilled)}</td>
<td sorttable_customekey={v.diskBytesSpilled.toString}>
{Utils.bytesToString(v.diskBytesSpilled)}</td>
</tr>
// scalastyle:on
}
}
case _ => Seq[Node]()
case None =>
Seq.empty[Node]
}
}
}
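
The change above swaps the old stageIdToExecutorSummaries lookup for a single listener.stageIdToData.get(stageId) lookup, and by wrapping each cell onto its own line it no longer needs the scalastyle suppression. A minimal sketch of the same Option-match rendering pattern, using simplified stand-ins for StageUIData and ExecutorSummary (the real classes in this PR carry more fields, so treat the shapes here as assumptions):

import scala.xml.Node

// Hypothetical stand-ins for the listener state used by ExecutorTable.
case class ExecutorSummary(taskTime: Long = 0L, failedTasks: Int = 0, succeededTasks: Int = 0)
case class StageUIData(executorSummary: Map[String, ExecutorSummary] = Map.empty)

def executorRows(stageIdToData: Map[Int, StageUIData], stageId: Int): Seq[Node] = {
  stageIdToData.get(stageId) match {
    case Some(stageData) =>
      // Sort by executor ID so row order is stable across page refreshes.
      stageData.executorSummary.toSeq.sortBy(_._1).map { case (executorId, summary) =>
        <tr>
          <td>{executorId}</td>
          <td>{summary.succeededTasks + summary.failedTasks}</td>
        </tr>
      }
    case None =>
      // No data recorded for this stage: render an empty table body.
      Seq.empty[Node]
  }
}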
156 changes: 59 additions & 97 deletions core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -25,6 +25,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.ui.jobs.UIData._

/**
* :: DeveloperApi ::
@@ -35,7 +36,7 @@ import org.apache.spark.storage.BlockManagerId
* updating the internal data structures concurrently.
*/
@DeveloperApi
class JobProgressListener(conf: SparkConf) extends SparkListener {
class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {

import JobProgressListener._

@@ -46,20 +47,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
val completedStages = ListBuffer[StageInfo]()
val failedStages = ListBuffer[StageInfo]()

// TODO: Should probably consolidate all following into a single hash map.
val stageIdToTime = HashMap[Int, Long]()
val stageIdToInputBytes = HashMap[Int, Long]()
val stageIdToShuffleRead = HashMap[Int, Long]()
val stageIdToShuffleWrite = HashMap[Int, Long]()
val stageIdToMemoryBytesSpilled = HashMap[Int, Long]()
val stageIdToDiskBytesSpilled = HashMap[Int, Long]()
val stageIdToTasksActive = HashMap[Int, HashMap[Long, TaskInfo]]()
val stageIdToTasksComplete = HashMap[Int, Int]()
val stageIdToTasksFailed = HashMap[Int, Int]()
val stageIdToTaskData = HashMap[Int, HashMap[Long, TaskUIData]]()
val stageIdToExecutorSummaries = HashMap[Int, HashMap[String, ExecutorSummary]]()
val stageIdToPool = HashMap[Int, String]()
val stageIdToDescription = HashMap[Int, String]()
val stageIdToData = new HashMap[Int, StageUIData]

val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()

val executorIdToBlockManagerId = HashMap[String, BlockManagerId]()
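
The hunk above collapses the thirteen separate per-stage hash maps (and the TODO asking for exactly this consolidation) into a single stageIdToData map keyed by stage ID. UIData.scala itself is not shown in this excerpt, so the following sketch of the consolidated container is inferred from the fields the rest of this diff reads and writes, not copied from the real class:

import scala.collection.mutable.HashMap

// Minimal stand-ins so the sketch is self-contained; the real TaskUIData and
// ExecutorSummary defined in this PR carry the per-task and per-executor fields.
class TaskUIData
class ExecutorSummary

// Assumed shape of the per-stage container, based on the accesses below
// (numActiveTasks, executorRunTime, taskData, executorSummary, ...).
class StageUIData {
  var numActiveTasks = 0
  var numCompleteTasks = 0
  var numFailedTasks = 0

  var executorRunTime = 0L
  var inputBytes = 0L
  var shuffleReadBytes = 0L
  var shuffleWriteBytes = 0L
  var memoryBytesSpilled = 0L
  var diskBytesSpilled = 0L

  var schedulingPool: String = ""
  var description: Option[String] = None

  val taskData = new HashMap[Long, TaskUIData]
  val executorSummary = new HashMap[String, ExecutorSummary]
}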
@@ -71,8 +60,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
val stage = stageCompleted.stageInfo
val stageId = stage.stageId
// Remove by stageId, rather than by StageInfo, in case the StageInfo is from storage
poolToActiveStages(stageIdToPool(stageId)).remove(stageId)
val stageData = stageIdToData.getOrElseUpdate(stageId, {
logWarning("Stage completed for unknown stage " + stageId)
new StageUIData
})

poolToActiveStages.get(stageData.schedulingPool).foreach(_.remove(stageId))
activeStages.remove(stageId)
if (stage.failureReason.isEmpty) {
completedStages += stage
@@ -87,21 +80,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
if (stages.size > retainedStages) {
val toRemove = math.max(retainedStages / 10, 1)
stages.take(toRemove).foreach { s =>
stageIdToTime.remove(s.stageId)
stageIdToInputBytes.remove(s.stageId)
stageIdToShuffleRead.remove(s.stageId)
stageIdToShuffleWrite.remove(s.stageId)
stageIdToMemoryBytesSpilled.remove(s.stageId)
stageIdToDiskBytesSpilled.remove(s.stageId)
stageIdToTasksActive.remove(s.stageId)
stageIdToTasksComplete.remove(s.stageId)
stageIdToTasksFailed.remove(s.stageId)
stageIdToTaskData.remove(s.stageId)
stageIdToExecutorSummaries.remove(s.stageId)
stageIdToPool.remove(s.stageId)
stageIdToDescription.remove(s.stageId)
}
stages.take(toRemove).foreach { s => stageIdToData.remove(s.stageId) }
stages.trimStart(toRemove)
}
}
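
With the consolidation, trimming old stages only has to clear one map instead of thirteen. A toy illustration of the trimming policy shown above, under the same semantics: once the buffer grows past the limit, the oldest ~10% of stages (at least one) are dropped together with their UI data.

import scala.collection.mutable.{HashMap, ListBuffer}

// `data` stands in for stageIdToData; `stages` for completedStages/failedStages.
def trim[A](stages: ListBuffer[Int], data: HashMap[Int, A], retainedStages: Int): Unit = {
  if (stages.size > retainedStages) {
    val toRemove = math.max(retainedStages / 10, 1)
    stages.take(toRemove).foreach(stageId => data.remove(stageId))
    stages.trimStart(toRemove)
  }
}

// With retainedStages = 1000, the first trim after 1001 completed stages drops
// the 100 oldest stage IDs and their StageUIData entries.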
@@ -114,26 +93,27 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
val poolName = Option(stageSubmitted.properties).map {
p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
}.getOrElse(DEFAULT_POOL_NAME)
stageIdToPool(stage.stageId) = poolName

val description = Option(stageSubmitted.properties).flatMap {
val stageData = stageIdToData.getOrElseUpdate(stage.stageId, new StageUIData)
stageData.schedulingPool = poolName

stageData.description = Option(stageSubmitted.properties).flatMap {
p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
}
description.map(d => stageIdToDescription(stage.stageId) = d)

val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo]())
stages(stage.stageId) = stage
}

override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
val sid = taskStart.stageId
val taskInfo = taskStart.taskInfo
if (taskInfo != null) {
val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashMap[Long, TaskInfo]())
tasksActive(taskInfo.taskId) = taskInfo
val taskMap = stageIdToTaskData.getOrElse(sid, HashMap[Long, TaskUIData]())
taskMap(taskInfo.taskId) = new TaskUIData(taskInfo)
stageIdToTaskData(sid) = taskMap
val stageData = stageIdToData.getOrElseUpdate(taskStart.stageId, {
logWarning("Task start for unknown stage " + taskStart.stageId)
new StageUIData
})
stageData.numActiveTasks += 1
stageData.taskData.put(taskInfo.taskId, new TaskUIData(taskInfo))
}
}

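onStageSubmitted and onTaskStart above both rely on getOrElseUpdate, and the task-lifecycle handlers pair it with a warning so an event for an unknown stage (for example, one that arrived out of order or was already trimmed) creates a fresh entry instead of throwing. A condensed sketch of that defensive pattern, with a plain function standing in for the Logging trait's logWarning:

import scala.collection.mutable.HashMap

class StageUIData { var numActiveTasks = 0 }

val stageIdToData = new HashMap[Int, StageUIData]

// Stand-in for the Logging trait's logWarning, just for this sketch.
def logWarning(msg: String): Unit = Console.err.println(s"WARN: $msg")

def recordTaskStart(stageId: Int): Unit = {
  // The default expression (and the warning) is evaluated only when the key is absent.
  val stageData = stageIdToData.getOrElseUpdate(stageId, {
    logWarning("Task start for unknown stage " + stageId)
    new StageUIData
  })
  stageData.numActiveTasks += 1
}
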
@@ -143,88 +123,76 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
}

override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
val sid = taskEnd.stageId
val info = taskEnd.taskInfo

if (info != null) {
val stageData = stageIdToData.getOrElseUpdate(taskEnd.stageId, {
logWarning("Task end for unknown stage " + taskEnd.stageId)
new StageUIData
})

// create executor summary map if necessary
val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid,
op = new HashMap[String, ExecutorSummary]())
val executorSummaryMap = stageData.executorSummary
executorSummaryMap.getOrElseUpdate(key = info.executorId, op = new ExecutorSummary)

val executorSummary = executorSummaryMap.get(info.executorId)
executorSummary match {
case Some(y) => {
// first update failed-task, succeed-task
taskEnd.reason match {
case Success =>
y.succeededTasks += 1
case _ =>
y.failedTasks += 1
}

// update duration
y.taskTime += info.duration

val metrics = taskEnd.taskMetrics
if (metrics != null) {
metrics.inputMetrics.foreach { y.inputBytes += _.bytesRead }
metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
y.memoryBytesSpilled += metrics.memoryBytesSpilled
y.diskBytesSpilled += metrics.diskBytesSpilled
}
executorSummaryMap.get(info.executorId).foreach { y =>
// first update failed-task, succeed-task
taskEnd.reason match {
case Success =>
y.succeededTasks += 1
case _ =>
y.failedTasks += 1
}

// update duration
y.taskTime += info.duration

val metrics = taskEnd.taskMetrics
if (metrics != null) {
metrics.inputMetrics.foreach { y.inputBytes += _.bytesRead }
metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
y.memoryBytesSpilled += metrics.memoryBytesSpilled
y.diskBytesSpilled += metrics.diskBytesSpilled
}
case _ => {}
}

val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashMap[Long, TaskInfo]())
// Remove by taskId, rather than by TaskInfo, in case the TaskInfo is from storage
tasksActive.remove(info.taskId)
stageData.numActiveTasks -= 1

val (errorMessage, metrics): (Option[String], Option[TaskMetrics]) =
taskEnd.reason match {
case org.apache.spark.Success =>
stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1
stageData.numCompleteTasks += 1
(None, Option(taskEnd.taskMetrics))
case e: ExceptionFailure => // Handle ExceptionFailure because we might have metrics
stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
stageData.numFailedTasks += 1
(Some(e.toErrorString), e.metrics)
case e: TaskFailedReason => // All other failure cases
stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
stageData.numFailedTasks += 1
(Some(e.toErrorString), None)
}

stageIdToTime.getOrElseUpdate(sid, 0L)
val time = metrics.map(_.executorRunTime).getOrElse(0L)
stageIdToTime(sid) += time

stageIdToInputBytes.getOrElseUpdate(sid, 0L)
val taskRunTime = metrics.map(_.executorRunTime).getOrElse(0L)
stageData.executorRunTime += taskRunTime
val inputBytes = metrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L)
stageIdToInputBytes(sid) += inputBytes
stageData.inputBytes += inputBytes

stageIdToShuffleRead.getOrElseUpdate(sid, 0L)
val shuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L)
stageIdToShuffleRead(sid) += shuffleRead
stageData.shuffleReadBytes += shuffleRead

stageIdToShuffleWrite.getOrElseUpdate(sid, 0L)
val shuffleWrite =
metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten).getOrElse(0L)
stageIdToShuffleWrite(sid) += shuffleWrite
stageData.shuffleWriteBytes += shuffleWrite

stageIdToMemoryBytesSpilled.getOrElseUpdate(sid, 0L)
val memoryBytesSpilled = metrics.map(_.memoryBytesSpilled).getOrElse(0L)
stageIdToMemoryBytesSpilled(sid) += memoryBytesSpilled
stageData.memoryBytesSpilled += memoryBytesSpilled

stageIdToDiskBytesSpilled.getOrElseUpdate(sid, 0L)
val diskBytesSpilled = metrics.map(_.diskBytesSpilled).getOrElse(0L)
stageIdToDiskBytesSpilled(sid) += diskBytesSpilled
stageData.diskBytesSpilled += diskBytesSpilled

val taskMap = stageIdToTaskData.getOrElse(sid, HashMap[Long, TaskUIData]())
taskMap(info.taskId) = new TaskUIData(info, metrics, errorMessage)
stageIdToTaskData(sid) = taskMap
stageData.taskData(info.taskId) = new TaskUIData(info, metrics, errorMessage)
}
}
} // end of onTaskEnd

override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
synchronized {
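
The reworked onTaskEnd folds each finished task's metrics into the stage's running totals instead of updating six separate maps. A compact sketch of that accumulation, using a simplified, hypothetical metrics record in place of the real TaskMetrics:

// Simplified, hypothetical metrics record; the real TaskMetrics exposes these
// values through nested optional metrics objects.
case class Metrics(
    executorRunTime: Long,
    inputBytesRead: Option[Long],
    shuffleRemoteBytesRead: Option[Long],
    shuffleBytesWritten: Option[Long],
    memoryBytesSpilled: Long,
    diskBytesSpilled: Long)

class StageTotals {
  var executorRunTime = 0L
  var inputBytes = 0L
  var shuffleReadBytes = 0L
  var shuffleWriteBytes = 0L
  var memoryBytesSpilled = 0L
  var diskBytesSpilled = 0L
}

// Missing metrics contribute zero, mirroring the getOrElse(0L) calls above.
def accumulate(totals: StageTotals, metrics: Option[Metrics]): Unit = {
  totals.executorRunTime += metrics.map(_.executorRunTime).getOrElse(0L)
  totals.inputBytes += metrics.flatMap(_.inputBytesRead).getOrElse(0L)
  totals.shuffleReadBytes += metrics.flatMap(_.shuffleRemoteBytesRead).getOrElse(0L)
  totals.shuffleWriteBytes += metrics.flatMap(_.shuffleBytesWritten).getOrElse(0L)
  totals.memoryBytesSpilled += metrics.map(_.memoryBytesSpilled).getOrElse(0L)
  totals.diskBytesSpilled += metrics.map(_.diskBytesSpilled).getOrElse(0L)
}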
@@ -252,12 +220,6 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {

}

@DeveloperApi
case class TaskUIData(
taskInfo: TaskInfo,
taskMetrics: Option[TaskMetrics] = None,
errorMessage: Option[String] = None)

private object JobProgressListener {
val DEFAULT_POOL_NAME = "default"
val DEFAULT_RETAINED_STAGES = 1000
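
The retained-stages limit defaults to 1000 here; the collapsed portion of the file presumably resolves it from the spark.ui.retainedStages setting, which is the usual knob for bounding this listener's memory footprint. A hedged usage sketch, assuming that lookup:

import org.apache.spark.SparkConf

// Assumption: the listener picks up its limit roughly like this, falling back
// to DEFAULT_RETAINED_STAGES (1000) when spark.ui.retainedStages is unset.
val conf = new SparkConf().set("spark.ui.retainedStages", "200")
val retainedStages = conf.getInt("spark.ui.retainedStages", 1000)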