301 changes: 12 additions & 289 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -17,17 +17,12 @@

 package org.apache.spark.ui.jobs
 
-import java.net.URLEncoder
-import java.nio.charset.StandardCharsets.UTF_8
 import java.util.Date
 import java.util.concurrent.TimeUnit
 import javax.servlet.http.HttpServletRequest
 
-import scala.collection.mutable.{HashMap, HashSet}
+import scala.collection.mutable.HashSet
 import scala.xml.{Node, Unparsed}
 
-import org.apache.commons.text.StringEscapeUtils
-
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.scheduler.TaskLocality
 import org.apache.spark.status._
@@ -209,32 +204,20 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
     val dagViz = UIUtils.showDagVizForStage(stageId, stageGraph)
 
     val currentTime = System.currentTimeMillis()
-    val taskTable = try {
-      val _taskTable = new TaskPagedTable(
-        stageData,
-        UIUtils.prependBaseUri(request, parent.basePath) +
-          s"/stages/stage/?id=${stageId}&attempt=${stageAttemptId}",
-        pageSize = taskPageSize,
-        sortColumn = taskSortColumn,
-        desc = taskSortDesc,
-        store = parent.store
-      )
-      _taskTable
-    } catch {
-      case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) =>
-        null
-    }
 
     val content =
       summary ++
       dagViz ++ <div id="showAdditionalMetrics"></div> ++
       makeTimeline(
         // Only show the tasks in the table
-        Option(taskTable).map({ taskPagedTable =>
+        () => {
           val from = (eventTimelineTaskPage - 1) * eventTimelineTaskPageSize
-          val to = taskPagedTable.dataSource.dataSize.min(
-            eventTimelineTaskPage * eventTimelineTaskPageSize)
-          taskPagedTable.dataSource.sliceData(from, to)}).getOrElse(Nil), currentTime,
+          val dataSize = store.taskCount(stageData.stageId, stageData.attemptId).toInt
+          val to = dataSize.min(eventTimelineTaskPage * eventTimelineTaskPageSize)
+          val sliceData = store.taskList(stageData.stageId, stageData.attemptId, from, to - from,
+            indexName(taskSortColumn), !taskSortDesc)
+          sliceData
+        }, currentTime,
         eventTimelineTaskPage, eventTimelineTaskPageSize, eventTimelineTotalPages, stageId,
         stageAttemptId, totalTasks) ++
       <div id="parent-container">
@@ -246,8 +229,8 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We

   }
 
-  def makeTimeline(
-      tasks: Seq[TaskData],
+  private def makeTimeline(
+      tasksFunc: () => Seq[TaskData],
       currentTime: Long,
       page: Int,
       pageSize: Int,
@@ -258,6 +241,8 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We

     if (!TIMELINE_ENABLED) return Seq.empty[Node]
 
+    val tasks = tasksFunc()
+
Reviewer comment (Member): Thank you. It's reasonable to avoid the calculation if the timeline feature is disabled.
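For context on that note: passing a `() => Seq[TaskData]` thunk instead of a precomputed `Seq[TaskData]` is what lets `makeTimeline` skip the store query entirely when the timeline is disabled. A minimal, self-contained sketch of the pattern, with illustrative names rather than Spark's actual API:

```scala
// Sketch only: Task, loadTasks and makeTimeline are stand-ins, not Spark code.
object LazyTimelineSketch {
  final case class Task(id: Long)

  // Stands in for the expensive store.taskList(...) query.
  def loadTasks(): Seq[Task] = {
    println("querying the store...")
    Seq(Task(1L), Task(2L))
  }

  // Accepting () => Seq[Task] instead of Seq[Task] defers the work to the callee.
  def makeTimeline(tasksFunc: () => Seq[Task], enabled: Boolean): Seq[Task] = {
    if (!enabled) return Seq.empty
    tasksFunc() // evaluated only on the enabled path
  }

  def main(args: Array[String]): Unit = {
    makeTimeline(() => loadTasks(), enabled = false) // no query runs
    makeTimeline(() => loadTasks(), enabled = true)  // query runs here
  }
}
```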

     val executorsSet = new HashSet[(String, String)]
     var minLaunchTime = Long.MaxValue
     var maxFinishTime = Long.MinValue
@@ -453,268 +438,6 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We

 }
 
-private[ui] class TaskDataSource(
-    stage: StageData,
-    pageSize: Int,
-    sortColumn: String,
-    desc: Boolean,
-    store: AppStatusStore) extends PagedDataSource[TaskData](pageSize) {
-  import ApiHelper._
-
-  // Keep an internal cache of executor log maps so that long task lists render faster.
-  private val executorIdToLogs = new HashMap[String, Map[String, String]]()
-
-  private var _tasksToShow: Seq[TaskData] = null
-
-  override def dataSize: Int = store.taskCount(stage.stageId, stage.attemptId).toInt
-
-  override def sliceData(from: Int, to: Int): Seq[TaskData] = {
-    if (_tasksToShow == null) {
-      _tasksToShow = store.taskList(stage.stageId, stage.attemptId, from, to - from,
-        indexName(sortColumn), !desc)
-    }
-    _tasksToShow
-  }
-
-  def executorLogs(id: String): Map[String, String] = {
-    executorIdToLogs.getOrElseUpdate(id,
-      store.asOption(store.executorSummary(id)).map(_.executorLogs).getOrElse(Map.empty))
-  }
-
-}
-
-private[ui] class TaskPagedTable(
-    stage: StageData,
-    basePath: String,
-    pageSize: Int,
-    sortColumn: String,
-    desc: Boolean,
-    store: AppStatusStore) extends PagedTable[TaskData] {
-
-  import ApiHelper._
-
-  private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
-
-  override def tableId: String = "task-table"
-
-  override def tableCssClass: String =
-    "table table-bordered table-sm table-striped table-head-clickable"
-
-  override def pageSizeFormField: String = "task.pageSize"
-
-  override def pageNumberFormField: String = "task.page"
-
-  override val dataSource: TaskDataSource = new TaskDataSource(
-    stage,
-    pageSize,
-    sortColumn,
-    desc,
-    store)
-
-  override def pageLink(page: Int): String = {
-    basePath +
-      s"&$pageNumberFormField=$page" +
-      s"&task.sort=$encodedSortColumn" +
-      s"&task.desc=$desc" +
-      s"&$pageSizeFormField=$pageSize"
-  }
-
-  override def goButtonFormPath: String = s"$basePath&task.sort=$encodedSortColumn&task.desc=$desc"
-
-  def headers: Seq[Node] = {
-    import ApiHelper._
-
-    val taskHeadersAndCssClasses: Seq[(String, String)] =
-      Seq(
-        (HEADER_TASK_INDEX, ""), (HEADER_ID, ""), (HEADER_ATTEMPT, ""), (HEADER_STATUS, ""),
-        (HEADER_LOCALITY, ""), (HEADER_EXECUTOR, ""), (HEADER_HOST, ""), (HEADER_LAUNCH_TIME, ""),
-        (HEADER_DURATION, ""), (HEADER_SCHEDULER_DELAY, TaskDetailsClassNames.SCHEDULER_DELAY),
-        (HEADER_DESER_TIME, TaskDetailsClassNames.TASK_DESERIALIZATION_TIME),
-        (HEADER_GC_TIME, ""),
-        (HEADER_SER_TIME, TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
-        (HEADER_GETTING_RESULT_TIME, TaskDetailsClassNames.GETTING_RESULT_TIME),
-        (HEADER_PEAK_MEM, TaskDetailsClassNames.PEAK_EXECUTION_MEMORY)) ++
-        {if (hasAccumulators(stage)) Seq((HEADER_ACCUMULATORS, "")) else Nil} ++
-        {if (hasInput(stage)) Seq((HEADER_INPUT_SIZE, "")) else Nil} ++
-        {if (hasOutput(stage)) Seq((HEADER_OUTPUT_SIZE, "")) else Nil} ++
-        {if (hasShuffleRead(stage)) {
-          Seq((HEADER_SHUFFLE_READ_FETCH_WAIT_TIME,
-            TaskDetailsClassNames.SHUFFLE_READ_FETCH_WAIT_TIME),
-            (HEADER_SHUFFLE_TOTAL_READS, ""),
-            (HEADER_SHUFFLE_REMOTE_READS, TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE))
-        } else {
-          Nil
-        }} ++
-        {if (hasShuffleWrite(stage)) {
-          Seq((HEADER_SHUFFLE_WRITE_TIME, ""), (HEADER_SHUFFLE_WRITE_SIZE, ""))
-        } else {
-          Nil
-        }} ++
-        {if (hasBytesSpilled(stage)) {
-          Seq((HEADER_MEM_SPILL, ""), (HEADER_DISK_SPILL, ""))
-        } else {
-          Nil
-        }} ++
-        Seq((HEADER_ERROR, ""))
-
-    if (!taskHeadersAndCssClasses.map(_._1).contains(sortColumn)) {
-      throw new IllegalArgumentException(s"Unknown column: $sortColumn")
-    }
-
-    val headerRow: Seq[Node] = {
-      taskHeadersAndCssClasses.map { case (header, cssClass) =>
-        if (header == sortColumn) {
-          val headerLink = Unparsed(
-            basePath +
-              s"&task.sort=${URLEncoder.encode(header, UTF_8.name())}" +
-              s"&task.desc=${!desc}" +
-              s"&task.pageSize=$pageSize")
-          val arrow = if (desc) "&#x25BE;" else "&#x25B4;" // UP or DOWN
-          <th class={cssClass}>
-            <a href={headerLink}>
-              {header}
-              <span>&nbsp;{Unparsed(arrow)}</span>
-            </a>
-          </th>
-        } else {
-          val headerLink = Unparsed(
-            basePath +
-              s"&task.sort=${URLEncoder.encode(header, UTF_8.name())}" +
-              s"&task.pageSize=$pageSize")
-          <th class={cssClass}>
-            <a href={headerLink}>
-              {header}
-            </a>
-          </th>
-        }
-      }
-    }
-    <thead>{headerRow}</thead>
-  }
-
-  def row(task: TaskData): Seq[Node] = {
-    def formatDuration(value: Option[Long], hideZero: Boolean = false): String = {
-      value.map { v =>
-        if (v > 0 || !hideZero) UIUtils.formatDuration(v) else ""
-      }.getOrElse("")
-    }
-
-    def formatBytes(value: Option[Long]): String = {
-      Utils.bytesToString(value.getOrElse(0L))
-    }
-
-    <tr>
-      <td>{task.index}</td>
-      <td>{task.taskId}</td>
-      <td>{if (task.speculative) s"${task.attempt} (speculative)" else task.attempt.toString}</td>
-      <td>{task.status}</td>
-      <td>{task.taskLocality}</td>
-      <td>{task.executorId}</td>
-      <td>
-        <div style="float: left">{task.host}</div>
-        <div style="float: right">
-        {
-          dataSource.executorLogs(task.executorId).map {
-            case (logName, logUrl) => <div><a href={logUrl}>{logName}</a></div>
-          }
-        }
-        </div>
-      </td>
-      <td>{UIUtils.formatDate(task.launchTime)}</td>
-      <td>{formatDuration(task.taskMetrics.map(_.executorRunTime))}</td>
-      <td class={TaskDetailsClassNames.SCHEDULER_DELAY}>
-        {UIUtils.formatDuration(AppStatusUtils.schedulerDelay(task))}
-      </td>
-      <td class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
-        {formatDuration(task.taskMetrics.map(_.executorDeserializeTime))}
-      </td>
-      <td>
-        {formatDuration(task.taskMetrics.map(_.jvmGcTime), hideZero = true)}
-      </td>
-      <td class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}>
-        {formatDuration(task.taskMetrics.map(_.resultSerializationTime))}
-      </td>
-      <td class={TaskDetailsClassNames.GETTING_RESULT_TIME}>
-        {UIUtils.formatDuration(AppStatusUtils.gettingResultTime(task))}
-      </td>
-      <td class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
-        {formatBytes(task.taskMetrics.map(_.peakExecutionMemory))}
-      </td>
-      {if (hasAccumulators(stage)) {
-        <td>{accumulatorsInfo(task)}</td>
-      }}
-      {if (hasInput(stage)) {
-        <td>{
-          metricInfo(task) { m =>
-            val bytesRead = Utils.bytesToString(m.inputMetrics.bytesRead)
-            val records = m.inputMetrics.recordsRead
-            Unparsed(s"$bytesRead / $records")
-          }
-        }</td>
-      }}
-      {if (hasOutput(stage)) {
-        <td>{
-          metricInfo(task) { m =>
-            val bytesWritten = Utils.bytesToString(m.outputMetrics.bytesWritten)
-            val records = m.outputMetrics.recordsWritten
-            Unparsed(s"$bytesWritten / $records")
-          }
-        }</td>
-      }}
-      {if (hasShuffleRead(stage)) {
-        <td class={TaskDetailsClassNames.SHUFFLE_READ_FETCH_WAIT_TIME}>
-          {formatDuration(task.taskMetrics.map(_.shuffleReadMetrics.fetchWaitTime))}
-        </td>
-        <td>{
-          metricInfo(task) { m =>
-            val bytesRead = Utils.bytesToString(totalBytesRead(m.shuffleReadMetrics))
-            val records = m.shuffleReadMetrics.recordsRead
-            Unparsed(s"$bytesRead / $records")
-          }
-        }</td>
-        <td class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}>
-          {formatBytes(task.taskMetrics.map(_.shuffleReadMetrics.remoteBytesRead))}
-        </td>
-      }}
-      {if (hasShuffleWrite(stage)) {
-        <td>{
-          formatDuration(
-            task.taskMetrics.map { m =>
-              TimeUnit.NANOSECONDS.toMillis(m.shuffleWriteMetrics.writeTime)
-            },
-            hideZero = true)
-        }</td>
-        <td>{
-          metricInfo(task) { m =>
-            val bytesWritten = Utils.bytesToString(m.shuffleWriteMetrics.bytesWritten)
-            val records = m.shuffleWriteMetrics.recordsWritten
-            Unparsed(s"$bytesWritten / $records")
-          }
-        }</td>
-      }}
-      {if (hasBytesSpilled(stage)) {
-        <td>{formatBytes(task.taskMetrics.map(_.memoryBytesSpilled))}</td>
-        <td>{formatBytes(task.taskMetrics.map(_.diskBytesSpilled))}</td>
-      }}
-      {UIUtils.errorMessageCell(task.errorMessage.getOrElse(""))}
-    </tr>
-  }
-
-  private def accumulatorsInfo(task: TaskData): Seq[Node] = {
-    task.accumulatorUpdates.flatMap { acc =>
-      if (acc.name != null && acc.update.isDefined) {
-        Unparsed(StringEscapeUtils.escapeHtml4(s"${acc.name}: ${acc.update.get}")) ++ <br />
-      } else {
-        Nil
-      }
-    }
-  }
-
-  private def metricInfo(task: TaskData)(fn: TaskMetrics => Seq[Node]): Seq[Node] = {
-    task.taskMetrics.map(fn).getOrElse(Nil)
-  }
-}
-
 private[spark] object ApiHelper {
 
   val HEADER_ID = "ID"
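With `TaskDataSource` and `TaskPagedTable` deleted, the stage page computes the timeline slice directly against the status store using offset/length arithmetic. A minimal sketch of that arithmetic, assuming a simplified stand-in store interface (not Spark's actual `AppStatusStore` API):

```scala
// Sketch only: TaskStore is a simplified stand-in for AppStatusStore.
object PagingSketch {
  trait TaskStore {
    def taskCount: Int
    def taskList(offset: Int, length: Int): Seq[Int]
  }

  def pageSlice(store: TaskStore, page: Int, pageSize: Int): Seq[Int] = {
    val from = (page - 1) * pageSize              // first row of the requested page
    val to = store.taskCount.min(page * pageSize) // clamp to the rows that exist
    store.taskList(from, to - from)               // the store takes a length, not an end index
  }

  def main(args: Array[String]): Unit = {
    val store = new TaskStore {
      private val tasks = (0 until 23).toList
      def taskCount: Int = tasks.size
      def taskList(offset: Int, length: Int): Seq[Int] = tasks.slice(offset, offset + length)
    }
    // Page 3 of size 10 over 23 rows yields rows 20..22.
    assert(pageSlice(store, page = 3, pageSize = 10) == List(20, 21, 22))
  }
}
```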
12 changes: 1 addition & 11 deletions core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -30,7 +30,7 @@
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler._
 import org.apache.spark.status.AppStatusStore
 import org.apache.spark.status.api.v1.{AccumulableInfo => UIAccumulableInfo, StageData, StageStatus}
-import org.apache.spark.ui.jobs.{ApiHelper, StagePage, StagesTab, TaskPagedTable}
+import org.apache.spark.ui.jobs.{StagePage, StagesTab}
 
 class StagePageSuite extends SparkFunSuite with LocalSparkContext {
 
@@ -110,16 +110,6 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
         isShufflePushEnabled = false,
         shuffleMergersCount = 0
       )
-      val taskTable = new TaskPagedTable(
-        stageData,
-        basePath = "/a/b/c",
-        pageSize = 10,
-        sortColumn = "Index",
-        desc = false,
-        store = statusStore
-      )
-      val columnNames = (taskTable.headers \ "th" \ "a").map(_.child(1).text).toSet
-      assert(columnNames === ApiHelper.COLUMN_TO_INDEX.keySet)
     } finally {
       statusStore.close()
     }
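The deleted assertion walked the rendered `<thead>` with scala.xml selectors. A toy, self-contained illustration of those selectors (markup and names are invented for the example; the real test read `.child(1).text` to skip a whitespace text node, while `.text.trim` keeps this toy robust):

```scala
// Sketch only: illustrates (xml \ "th" \ "a") selection, not the removed test itself.
object XmlQuerySketch {
  def main(args: Array[String]): Unit = {
    val headers =
      <thead>
        <th><a href="?task.sort=Index">Index</a></th>
        <th><a href="?task.sort=ID">ID</a></th>
      </thead>
    // \ "th" selects the <th> children; \ "a" their nested <a> elements.
    val columnNames = (headers \ "th" \ "a").map(_.text.trim).toSet
    assert(columnNames == Set("Index", "ID"))
  }
}
```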