2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -113,6 +113,7 @@ private[spark] object TaskIndexNames {
final val EXEC_RUN_TIME = "ert"
final val GC_TIME = "gc"
final val GETTING_RESULT_TIME = "grt"
+ final val HOST = "host"
final val INPUT_RECORDS = "ir"
final val INPUT_SIZE = "is"
final val LAUNCH_TIME = "lt"
@@ -165,6 +166,7 @@ private[spark] class TaskDataWrapper(
val duration: Long,
@KVIndexParam(value = TaskIndexNames.EXECUTOR, parent = TaskIndexNames.STAGE)
val executorId: String,
+ @KVIndexParam(value = TaskIndexNames.HOST, parent = TaskIndexNames.STAGE)
val host: String,
@KVIndexParam(value = TaskIndexNames.STATUS, parent = TaskIndexNames.STAGE)
val status: String,
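The HOST constant names a new secondary index in the status KVStore, and the @KVIndexParam annotation on host registers each task's host under that index, scoped to its parent stage. This lets the stage page sort and page tasks by host straight from the store instead of scanning every task row. A minimal sketch of how such an index can be read back through the KVStore view API; the tasksByHost helper and the Array(stageId, stageAttemptId) parent-key layout are illustrative assumptions, not code from this change:

import scala.collection.JavaConverters._
import org.apache.spark.status.{TaskDataWrapper, TaskIndexNames}
import org.apache.spark.util.kvstore.KVStore

// Illustrative sketch: iterate one stage's tasks in host order via the new HOST index.
// Assumes the caller lives under org.apache.spark, since these types are private[spark].
def tasksByHost(store: KVStore, stageId: Int, stageAttemptId: Int): Seq[TaskDataWrapper] = {
  val stageKey = Array(stageId, stageAttemptId)  // assumed layout of the STAGE parent key
  store.view(classOf[TaskDataWrapper])
    .parent(stageKey)               // restrict the scan to this stage's tasks
    .index(TaskIndexNames.HOST)     // order the scan by the new host index
    .asScala
    .toSeq
}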
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -961,16 +961,17 @@ private[ui] class TaskPagedTable(
}
}

- private object ApiHelper {
+ private[ui] object ApiHelper {


- private val COLUMN_TO_INDEX = Map(
+ private[ui] val COLUMN_TO_INDEX = Map(
"ID" -> null.asInstanceOf[String],
"Index" -> TaskIndexNames.TASK_INDEX,
"Attempt" -> TaskIndexNames.ATTEMPT,
"Status" -> TaskIndexNames.STATUS,
"Locality Level" -> TaskIndexNames.LOCALITY,
"Executor ID / Host" -> TaskIndexNames.EXECUTOR,
"Executor ID" -> TaskIndexNames.EXECUTOR,
"Host" -> TaskIndexNames.HOST,
"Launch Time" -> TaskIndexNames.LAUNCH_TIME,
"Duration" -> TaskIndexNames.DURATION,
"Scheduler Delay" -> TaskIndexNames.SCHEDULER_DELAY,
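With the combined "Executor ID / Host" header split into separate "Executor ID" and "Host" columns, each column now resolves to its own sort index. A hedged sketch of how that mapping is consumed when a sort request arrives from the UI; the sortIndexFor helper below is illustrative, not the exact lookup in StagePage.scala, and it assumes a caller under org.apache.spark.ui so the private[ui] members are visible:

import org.apache.spark.ui.jobs.ApiHelper

// Illustrative sketch: translate a UI column header into an optional KVStore index name.
def sortIndexFor(column: String): Option[String] =
  ApiHelper.COLUMN_TO_INDEX.get(column) match {
    case Some(null)  => None           // "ID" sorts by the natural task-id ordering, no index needed
    case Some(index) => Some(index)    // e.g. "Host" now maps to TaskIndexNames.HOST
    case None        =>
      throw new IllegalArgumentException(s"Unknown sort column: $column")
  }

In this sketch an unknown column fails loudly instead of silently falling back to an unsorted scan, which is also why the test added below checks that every rendered header has an entry in COLUMN_TO_INDEX.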
63 changes: 62 additions & 1 deletion core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -28,8 +28,9 @@ import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.status.AppStatusStore
+ import org.apache.spark.status.api.v1.{AccumulableInfo => UIAccumulableInfo, StageData, StageStatus}
import org.apache.spark.status.config._
- import org.apache.spark.ui.jobs.{StagePage, StagesTab}
+ import org.apache.spark.ui.jobs.{ApiHelper, StagePage, StagesTab, TaskPagedTable}

class StagePageSuite extends SparkFunSuite with LocalSparkContext {

@@ -47,6 +48,66 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
assert(html.contains(s"<td>$peakExecutionMemory.0 b</td>" * 5))
}

test("ApiHelper.COLUMN_TO_INDEX should match headers of the task table") {
val conf = new SparkConf(false).set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
val statusStore = AppStatusStore.createLiveStore(conf)
try {
val stageData = new StageData(
status = StageStatus.ACTIVE,
stageId = 1,
attemptId = 1,
numTasks = 1,
numActiveTasks = 1,
numCompleteTasks = 1,
numFailedTasks = 1,
numKilledTasks = 1,
numCompletedIndices = 1,

executorRunTime = 1L,
executorCpuTime = 1L,
submissionTime = None,
firstTaskLaunchedTime = None,
completionTime = None,
failureReason = None,

inputBytes = 1L,
inputRecords = 1L,
outputBytes = 1L,
outputRecords = 1L,
shuffleReadBytes = 1L,
shuffleReadRecords = 1L,
shuffleWriteBytes = 1L,
shuffleWriteRecords = 1L,
memoryBytesSpilled = 1L,
diskBytesSpilled = 1L,

name = "stage1",
description = Some("description"),
details = "detail",
schedulingPool = "pool1",

rddIds = Seq(1),
accumulatorUpdates = Seq(new UIAccumulableInfo(0L, "acc", None, "value")),
tasks = None,
executorSummary = None,
killedTasksSummary = Map.empty
)
val taskTable = new TaskPagedTable(
stageData,
basePath = "/a/b/c",
currentTime = 0,
pageSize = 10,
sortColumn = "Index",
desc = false,
store = statusStore
)
val columnNames = (taskTable.headers \ "th" \ "a").map(_.child(1).text).toSet
assert(columnNames === ApiHelper.COLUMN_TO_INDEX.keySet)
} finally {
statusStore.close()
}
}

/**
* Render a stage page started with the given conf and return the HTML.
* This also runs a dummy stage to populate the page with useful content.