@@ -292,7 +294,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
currentTime,
pageSize = taskPageSize,
sortColumn = taskSortColumn,
- desc = taskSortDesc
+ desc = taskSortDesc,
+ executorsListener = executorsListener
)
(_taskTable, _taskTable.table(taskPage))
} catch {
@@ -829,7 +832,8 @@ private[ui] class TaskTableRowData(
val shuffleRead: Option[TaskTableRowShuffleReadData],
val shuffleWrite: Option[TaskTableRowShuffleWriteData],
val bytesSpilled: Option[TaskTableRowBytesSpilledData],
- val error: String)
+ val error: String,
+ val logs: Map[String, String])
private[ui] class TaskDataSource(
tasks: Seq[TaskUIData],
@@ -842,7 +846,8 @@ private[ui] class TaskDataSource(
currentTime: Long,
pageSize: Int,
sortColumn: String,
- desc: Boolean) extends PagedDataSource[TaskTableRowData](pageSize) {
+ desc: Boolean,
+ executorsListener: ExecutorsListener) extends PagedDataSource[TaskTableRowData](pageSize) {
import StagePage._
// Convert TaskUIData to TaskTableRowData which contains the final contents to show in the table
@@ -985,6 +990,8 @@ private[ui] class TaskDataSource(
None
}
+ val logs = executorsListener.executorToLogUrls.getOrElse(info.executorId, Map.empty)
+
new TaskTableRowData(
info.index,
info.taskId,
@@ -1008,7 +1015,8 @@ private[ui] class TaskDataSource(
shuffleRead,
shuffleWrite,
bytesSpilled,
- errorMessage.getOrElse(""))
+ errorMessage.getOrElse(""),
+ logs)
}
/**
@@ -1186,6 +1194,16 @@ private[ui] class TaskDataSource(
override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
Ordering.String.compare(x.error, y.error)
}
+ case "Executor Logs" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+ if (x.logs.isEmpty == y.logs.isEmpty) {
+ return 0
+ } else if (x.logs.isEmpty) {
+ return -1;
+ } else {
+ return 1;
+ }
+ }
case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
}
if (desc) {
@@ -1210,7 +1228,8 @@ private[ui] class TaskPagedTable(
currentTime: Long,
pageSize: Int,
sortColumn: String,
- desc: Boolean) extends PagedTable[TaskTableRowData] {
+ desc: Boolean,
+ executorsListener: ExecutorsListener) extends PagedTable[TaskTableRowData] {
// We only track peak memory used for unsafe operators
private val displayPeakExecutionMemory = conf.getBoolean("spark.sql.unsafe.enabled", true)
@@ -1230,7 +1249,8 @@ private[ui] class TaskPagedTable(
currentTime,
pageSize,
sortColumn,
- desc)
+ desc,
+ executorsListener)
override def pageLink(page: Int): String = {
val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
@@ -1291,7 +1311,8 @@ private[ui] class TaskPagedTable(
} else {
Nil
}} ++
- Seq(("Errors", ""))
+ Seq(("Errors", "")) ++
+ Seq(("Executor Logs", ""))
if (!taskHeadersAndCssClasses.map(_._1).contains(sortColumn)) {
throw new IllegalArgumentException(s"Unknown column: $sortColumn")
@@ -1379,6 +1400,11 @@ private[ui] class TaskPagedTable(
{task.bytesSpilled.get.diskBytesSpilledReadable} |
}}
{errorMessageCell(task.error)}
+
+ {task.logs.map {
+ case (logName, logUrl) =>
+ }}
+ |
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
index 5989f0035b270..7792c5a882eff 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
@@ -29,6 +29,7 @@ private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages"
val killEnabled = parent.killEnabled
val progressListener = parent.jobProgressListener
val operationGraphListener = parent.operationGraphListener
+ val executorsListener = parent.executorsListener
attachPage(new AllStagesPage(this))
attachPage(new StagePage(this))
diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
index 86699e7f56953..f6233450956d5 100644
--- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -26,6 +26,8 @@ import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
+import org.apache.spark.storage.StorageStatusListener
+import org.apache.spark.ui.exec.ExecutorsListener
import org.apache.spark.ui.jobs.{JobProgressListener, StagePage, StagesTab}
import org.apache.spark.ui.scope.RDDOperationGraphListener
@@ -62,11 +64,13 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
private def renderStagePage(conf: SparkConf): Seq[Node] = {
val jobListener = new JobProgressListener(conf)
val graphListener = new RDDOperationGraphListener(conf)
+ val executorsListener = new ExecutorsListener(new StorageStatusListener())
val tab = mock(classOf[StagesTab], RETURNS_SMART_NULLS)
val request = mock(classOf[HttpServletRequest])
when(tab.conf).thenReturn(conf)
when(tab.progressListener).thenReturn(jobListener)
when(tab.operationGraphListener).thenReturn(graphListener)
+ when(tab.executorsListener).thenReturn(executorsListener)
when(tab.appName).thenReturn("testing")
when(tab.headerTabs).thenReturn(Seq.empty)
when(request.getParameter("id")).thenReturn("0")