Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
<th>Shuffle Spill (Memory)</th>
<th>Shuffle Spill (Disk)</th>
}}
<th>Logs</th>
</thead>
<tbody>
{createExecutorTable()}
Expand Down Expand Up @@ -147,6 +148,12 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
{Utils.bytesToString(v.diskBytesSpilled)}
</td>
}}
<td>
{val logs = parent.executorsListener.executorToLogUrls.getOrElse(k, Map.empty)
logs.map {
case (logName, logUrl) => <div><a href={logUrl}>{logName}</a></div>
}}
</td>
</tr>
}
case None =>
Expand Down
40 changes: 33 additions & 7 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.{InternalAccumulator, SparkConf}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo, TaskLocality}
import org.apache.spark.ui._
import org.apache.spark.ui.exec.ExecutorsListener
import org.apache.spark.ui.jobs.UIData._
import org.apache.spark.util.{Utils, Distribution}

Expand All @@ -39,6 +40,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {

private val progressListener = parent.progressListener
private val operationGraphListener = parent.operationGraphListener
private val executorsListener = parent.executorsListener

private val TIMELINE_LEGEND = {
<div class="legend-area">
Expand Down Expand Up @@ -292,7 +294,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
currentTime,
pageSize = taskPageSize,
sortColumn = taskSortColumn,
desc = taskSortDesc
desc = taskSortDesc,
executorsListener = executorsListener
)
(_taskTable, _taskTable.table(taskPage))
} catch {
Expand Down Expand Up @@ -829,7 +832,8 @@ private[ui] class TaskTableRowData(
val shuffleRead: Option[TaskTableRowShuffleReadData],
val shuffleWrite: Option[TaskTableRowShuffleWriteData],
val bytesSpilled: Option[TaskTableRowBytesSpilledData],
val error: String)
val error: String,
val logs: Map[String, String])

private[ui] class TaskDataSource(
tasks: Seq[TaskUIData],
Expand All @@ -842,7 +846,8 @@ private[ui] class TaskDataSource(
currentTime: Long,
pageSize: Int,
sortColumn: String,
desc: Boolean) extends PagedDataSource[TaskTableRowData](pageSize) {
desc: Boolean,
executorsListener: ExecutorsListener) extends PagedDataSource[TaskTableRowData](pageSize) {
import StagePage._

// Convert TaskUIData to TaskTableRowData which contains the final contents to show in the table
Expand Down Expand Up @@ -985,6 +990,8 @@ private[ui] class TaskDataSource(
None
}

val logs = executorsListener.executorToLogUrls.getOrElse(info.executorId, Map.empty)

new TaskTableRowData(
info.index,
info.taskId,
Expand All @@ -1008,7 +1015,8 @@ private[ui] class TaskDataSource(
shuffleRead,
shuffleWrite,
bytesSpilled,
errorMessage.getOrElse(""))
errorMessage.getOrElse(""),
logs)
}

/**
Expand Down Expand Up @@ -1186,6 +1194,16 @@ private[ui] class TaskDataSource(
override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
Ordering.String.compare(x.error, y.error)
}
case "Executor Logs" => new Ordering[TaskTableRowData] {
override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
if (x.logs.isEmpty == y.logs.isEmpty) {
return 0
} else if (x.logs.isEmpty) {
return -1;
} else {
return 1;
}
}
case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
}
if (desc) {
Expand All @@ -1210,7 +1228,8 @@ private[ui] class TaskPagedTable(
currentTime: Long,
pageSize: Int,
sortColumn: String,
desc: Boolean) extends PagedTable[TaskTableRowData] {
desc: Boolean,
executorsListener: ExecutorsListener) extends PagedTable[TaskTableRowData] {

// We only track peak memory used for unsafe operators
private val displayPeakExecutionMemory = conf.getBoolean("spark.sql.unsafe.enabled", true)
Expand All @@ -1230,7 +1249,8 @@ private[ui] class TaskPagedTable(
currentTime,
pageSize,
sortColumn,
desc)
desc,
executorsListener)

override def pageLink(page: Int): String = {
val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
Expand Down Expand Up @@ -1291,7 +1311,8 @@ private[ui] class TaskPagedTable(
} else {
Nil
}} ++
Seq(("Errors", ""))
Seq(("Errors", "")) ++
Seq(("Executor Logs", ""))

if (!taskHeadersAndCssClasses.map(_._1).contains(sortColumn)) {
throw new IllegalArgumentException(s"Unknown column: $sortColumn")
Expand Down Expand Up @@ -1379,6 +1400,11 @@ private[ui] class TaskPagedTable(
<td>{task.bytesSpilled.get.diskBytesSpilledReadable}</td>
}}
{errorMessageCell(task.error)}
<td>
{task.logs.map {
case (logName, logUrl) => <div><a href={logUrl}>{logName}</a></div>
}}
</td>
</tr>
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages"
val killEnabled = parent.killEnabled
val progressListener = parent.jobProgressListener
val operationGraphListener = parent.operationGraphListener
val executorsListener = parent.executorsListener

attachPage(new AllStagesPage(this))
attachPage(new StagePage(this))
Expand Down
4 changes: 4 additions & 0 deletions core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.storage.StorageStatusListener
import org.apache.spark.ui.exec.ExecutorsListener
import org.apache.spark.ui.jobs.{JobProgressListener, StagePage, StagesTab}
import org.apache.spark.ui.scope.RDDOperationGraphListener

Expand Down Expand Up @@ -62,11 +64,13 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
private def renderStagePage(conf: SparkConf): Seq[Node] = {
val jobListener = new JobProgressListener(conf)
val graphListener = new RDDOperationGraphListener(conf)
val executorsListener = new ExecutorsListener(new StorageStatusListener())
val tab = mock(classOf[StagesTab], RETURNS_SMART_NULLS)
val request = mock(classOf[HttpServletRequest])
when(tab.conf).thenReturn(conf)
when(tab.progressListener).thenReturn(jobListener)
when(tab.operationGraphListener).thenReturn(graphListener)
when(tab.executorsListener).thenReturn(executorsListener)
when(tab.appName).thenReturn("testing")
when(tab.headerTabs).thenReturn(Seq.empty)
when(request.getParameter("id")).thenReturn("0")
Expand Down