diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 2889f59e33e8..3ddfb0a3fd2e 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -152,7 +152,7 @@ private[spark] class Executor(
}
override def run() {
- val startTime = System.currentTimeMillis()
+ val deserializeStartTime = System.currentTimeMillis()
Thread.currentThread.setContextClassLoader(replClassLoader)
val ser = SparkEnv.get.closureSerializer.newInstance()
logInfo(s"Running $taskName (TID $taskId)")
@@ -197,7 +197,7 @@ private[spark] class Executor(
val afterSerialization = System.currentTimeMillis()
for (m <- task.metrics) {
- m.executorDeserializeTime = taskStart - startTime
+ m.executorDeserializeTime = taskStart - deserializeStartTime
m.executorRunTime = taskFinish - taskStart
m.jvmGCTime = gcTime - startGCTime
m.resultSerializationTime = afterSerialization - beforeSerialization
diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
index f02904df31fc..51dc08f668a4 100644
--- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -24,6 +24,9 @@ private[spark] object ToolTips {
scheduler delay is large, consider decreasing the size of tasks or decreasing the size
of task results."""
+ val TASK_DESERIALIZATION_TIME =
+ """Time spent deserializating the task closure on the executor."""
+
val INPUT = "Bytes read from Hadoop or from Spark storage."
val SHUFFLE_WRITE = "Bytes written to disk in order to be read by a shuffle in a future stage."
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index bf45272aefde..1e57cc42ecf5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -112,6 +112,13 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
Scheduler Delay
+
+
+
+ Task Deserialization Time
+
+
@@ -147,6 +154,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
("Index", ""), ("ID", ""), ("Attempt", ""), ("Status", ""), ("Locality Level", ""),
("Executor ID / Host", ""), ("Launch Time", ""), ("Duration", ""),
("Scheduler Delay", TaskDetailsClassNames.SCHEDULER_DELAY),
+ ("Task Deserialization Time", TaskDetailsClassNames.TASK_DESERIALIZATION_TIME),
("GC Time", TaskDetailsClassNames.GC_TIME),
("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) ++
@@ -179,6 +187,17 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
}
}
+ val deserializationTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
+ metrics.get.executorDeserializeTime.toDouble
+ }
+ val deserializationQuantiles =
+
+
+ Task Deserialization Time
+
+ | +: getFormattedTimeQuantiles(deserializationTimes)
+
val serviceTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.executorRunTime.toDouble
}
@@ -266,6 +285,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
val listings: Seq[Seq[Node]] = Seq(
{serviceQuantiles}
,
{schedulerDelayQuantiles}
,
+
+ {deserializationQuantiles}
+
{gcQuantiles}
,
{serializationQuantiles}
@@ -314,6 +336,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
val schedulerDelay = getSchedulerDelay(info, metrics.get)
val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
+ val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L)
val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
val gettingResultTime = info.gettingResultTime
@@ -367,6 +390,10 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
class={TaskDetailsClassNames.SCHEDULER_DELAY}>
{UIUtils.formatDuration(schedulerDelay.toLong)}
+ |
+ {UIUtils.formatDuration(taskDeserializationTime.toLong)}
+ |
{if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
|
@@ -424,6 +451,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
(info.finishTime - info.launchTime)
}
}
- totalExecutionTime - metrics.executorRunTime
+ val executorOverhead = (metrics.executorDeserializeTime +
+ metrics.resultSerializationTime)
+ totalExecutionTime - metrics.executorRunTime - executorOverhead
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala b/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala
index 23d672cabda0..eb371bd0ea7e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala
@@ -24,6 +24,7 @@ package org.apache.spark.ui.jobs
private object TaskDetailsClassNames {
val SCHEDULER_DELAY = "scheduler_delay"
val GC_TIME = "gc_time"
+ val TASK_DESERIALIZATION_TIME = "deserialization_time"
val RESULT_SERIALIZATION_TIME = "serialization_time"
val GETTING_RESULT_TIME = "getting_result_time"
}