From 1752f0e740fd05d1b00bf0f0fe238708e8ff4c67 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 22 Apr 2015 10:59:48 -0700
Subject: [PATCH 1/7] [SPARK-7058] Incorporate RDD deserialization time in task
 deserialization time metric

---
 core/src/main/scala/org/apache/spark/executor/Executor.scala   | 2 +-
 .../main/scala/org/apache/spark/executor/TaskMetrics.scala     | 5 ++---
 .../main/scala/org/apache/spark/scheduler/ResultTask.scala     | 3 +++
 .../scala/org/apache/spark/scheduler/ShuffleMapTask.scala      | 3 +++
 core/src/main/scala/org/apache/spark/util/JsonProtocol.scala   | 2 +-
 .../test/scala/org/apache/spark/util/JsonProtocolSuite.scala   | 2 +-
 6 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 327d155b38c22..208cc203fa75a 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -221,7 +221,7 @@ private[spark] class Executor(
         val afterSerialization = System.currentTimeMillis()
 
         for (m <- task.metrics) {
-          m.setExecutorDeserializeTime(taskStart - deserializeStartTime)
+          m.incExecutorDeserializeTime(taskStart - deserializeStartTime)
           m.setExecutorRunTime(taskFinish - taskStart)
           m.setJvmGCTime(computeTotalGcTime() - startGCTime)
           m.setResultSerializationTime(afterSerialization - beforeSerialization)
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 06152f16ae618..7fc106947b5cf 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -47,10 +47,9 @@ class TaskMetrics extends Serializable {
   /**
    * Time taken on the executor to deserialize this task
    */
-  private var _executorDeserializeTime: Long = _
+  private var _executorDeserializeTime: Long = 0
   def executorDeserializeTime: Long = _executorDeserializeTime
-  private[spark] def setExecutorDeserializeTime(value: Long) = _executorDeserializeTime = value
-
+  private[spark] def incExecutorDeserializeTime(value: Long) = _executorDeserializeTime += value
 
   /**
    * Time the executor spends actually running the task (including fetching shuffle data)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index e074ce6ebff0b..d1e85bcedc929 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -53,9 +53,12 @@ private[spark] class ResultTask[T, U](
 
   override def runTask(context: TaskContext): U = {
     // Deserialize the RDD and the func using the broadcast variables.
+    val deserializeStartTime = System.currentTimeMillis()
     val ser = SparkEnv.get.closureSerializer.newInstance()
     val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
       ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
+    context.taskMetrics()
+      .incExecutorDeserializeTime(System.currentTimeMillis() - deserializeStartTime)
 
     metrics = Some(context.taskMetrics)
     func(context, rdd.iterator(partition, context))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 6c7d00069acb2..b821c31f35baa 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -56,9 +56,12 @@ private[spark] class ShuffleMapTask(
 
   override def runTask(context: TaskContext): MapStatus = {
     // Deserialize the RDD using the broadcast variable.
+    val deserializeStartTime = System.currentTimeMillis()
     val ser = SparkEnv.get.closureSerializer.newInstance()
     val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
       ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
+    context.taskMetrics()
+      .incExecutorDeserializeTime(System.currentTimeMillis() - deserializeStartTime)
 
     metrics = Some(context.taskMetrics)
     var writer: ShuffleWriter[Any, Any] = null
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 474f79fb756f6..cc4d7e4009c95 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -657,7 +657,7 @@ private[spark] object JsonProtocol {
     }
     val metrics = new TaskMetrics
     metrics.setHostname((json \ "Host Name").extract[String])
-    metrics.setExecutorDeserializeTime((json \ "Executor Deserialize Time").extract[Long])
+    metrics.incExecutorDeserializeTime((json \ "Executor Deserialize Time").extract[Long])
     metrics.setExecutorRunTime((json \ "Executor Run Time").extract[Long])
     metrics.setResultSize((json \ "Result Size").extract[Long])
     metrics.setJvmGCTime((json \ "JVM GC Time").extract[Long])
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index a2be724254d7c..4e3ef7260437f 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -688,7 +688,7 @@ class JsonProtocolSuite extends FunSuite {
       hasRecords: Boolean = true) = {
     val t = new TaskMetrics
     t.setHostname("localhost")
-    t.setExecutorDeserializeTime(a)
+    t.incExecutorDeserializeTime(a)
     t.setExecutorRunTime(b)
     t.setResultSize(c)
     t.setJvmGCTime(d)

From 21f5b47adb27b82ab609edc7e9e3432947fec28a Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 22 Apr 2015 11:29:05 -0700
Subject: [PATCH 2/7] Don't double-count the broadcast deserialization time in
 task runtime

---
 .../scala/org/apache/spark/executor/Executor.scala   | 10 +++-------
 .../org/apache/spark/executor/TaskMetrics.scala      |  2 +-
 .../org/apache/spark/scheduler/ResultTask.scala      |  7 ++++++-
 .../apache/spark/scheduler/ShuffleMapTask.scala      | 14 ++++++++++----
 4 files changed, 20 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 208cc203fa75a..50b13cdac6d8e 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -184,13 +184,13 @@ private[spark] class Executor(
       val ser = env.closureSerializer.newInstance()
       logInfo(s"Running $taskName (TID $taskId)")
       execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
-      var taskStart: Long = 0
       startGCTime = computeTotalGcTime()
 
       try {
         val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
         updateDependencies(taskFiles, taskJars)
         task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
+        val deserializeEndTime = System.currentTimeMillis()
 
         // If this task has been killed before we deserialized it, let's quit now. Otherwise,
         // continue executing the task.
@@ -205,10 +205,8 @@ private[spark] class Executor(
         logDebug("Task " + taskId + "'s epoch is " + task.epoch)
         env.mapOutputTracker.updateEpoch(task.epoch)
 
-        // Run the actual task and measure its runtime.
-        taskStart = System.currentTimeMillis()
+        // Run the actual task (its runtime is measured internally and stored in task metrics)
         val value = task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
-        val taskFinish = System.currentTimeMillis()
 
         // If the task has been killed, let's fail it.
         if (task.killed) {
@@ -221,8 +219,7 @@ private[spark] class Executor(
         val afterSerialization = System.currentTimeMillis()
 
         for (m <- task.metrics) {
-          m.incExecutorDeserializeTime(taskStart - deserializeStartTime)
-          m.setExecutorRunTime(taskFinish - taskStart)
+          m.incExecutorDeserializeTime(deserializeEndTime - deserializeStartTime)
           m.setJvmGCTime(computeTotalGcTime() - startGCTime)
           m.setResultSerializationTime(afterSerialization - beforeSerialization)
         }
@@ -275,7 +272,6 @@ private[spark] class Executor(
 
           val metrics: Option[TaskMetrics] = Option(task).flatMap { task =>
             task.metrics.map { m =>
-              m.setExecutorRunTime(System.currentTimeMillis() - taskStart)
               m.setJvmGCTime(computeTotalGcTime() - startGCTime)
               m
             }
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 7fc106947b5cf..68c546a8e457a 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -47,7 +47,7 @@ class TaskMetrics extends Serializable {
   /**
    * Time taken on the executor to deserialize this task
    */
-  private var _executorDeserializeTime: Long = 0
+  private var _executorDeserializeTime: Long = _
   def executorDeserializeTime: Long = _executorDeserializeTime
   private[spark] def incExecutorDeserializeTime(value: Long) = _executorDeserializeTime += value
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index d1e85bcedc929..3b86502d19ab8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -61,7 +61,12 @@ private[spark] class ResultTask[T, U](
       .incExecutorDeserializeTime(System.currentTimeMillis() - deserializeStartTime)
 
     metrics = Some(context.taskMetrics)
-    func(context, rdd.iterator(partition, context))
+    val startTime = System.currentTimeMillis()
+    try {
+      func(context, rdd.iterator(partition, context))
+    } finally {
+      context.taskMetrics().setExecutorRunTime(System.currentTimeMillis() - startTime)
+    }
   }
 
   // This is only callable on the driver side.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index b821c31f35baa..854c79f7d8d40 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -66,10 +66,16 @@ private[spark] class ShuffleMapTask(
     metrics = Some(context.taskMetrics)
     var writer: ShuffleWriter[Any, Any] = null
     try {
-      val manager = SparkEnv.get.shuffleManager
-      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
-      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
-      return writer.stop(success = true).get
+      val startTime = System.currentTimeMillis()
+      try {
+        val manager = SparkEnv.get.shuffleManager
+        writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
+        writer.write(
+          rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
+        writer.stop(success = true).get
+      } finally {
+        context.taskMetrics().setExecutorRunTime(System.currentTimeMillis() - startTime)
+      }
     } catch {
       case e: Exception =>
         try {

From 9f32e55eb74c532b555870b40f440c11c5a12a32 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 22 Apr 2015 11:58:17 -0700
Subject: [PATCH 3/7] Expose executorDeserializeTime on Task instead of
 pushing runtime calculation into Task.

---
 .../org/apache/spark/executor/Executor.scala     | 10 ++++++++--
 .../org/apache/spark/executor/TaskMetrics.scala  |  2 +-
 .../org/apache/spark/scheduler/ResultTask.scala  | 10 ++--------
 .../apache/spark/scheduler/ShuffleMapTask.scala  | 17 +++++------------
 .../scala/org/apache/spark/scheduler/Task.scala  |  7 +++++++
 .../org/apache/spark/util/JsonProtocol.scala     |  2 +-
 .../apache/spark/util/JsonProtocolSuite.scala    |  2 +-
 7 files changed, 25 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 50b13cdac6d8e..2c104aa89496d 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -184,6 +184,7 @@ private[spark] class Executor(
       val ser = env.closureSerializer.newInstance()
       logInfo(s"Running $taskName (TID $taskId)")
       execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
+      var taskStart: Long = 0
       startGCTime = computeTotalGcTime()
 
       try {
@@ -205,8 +206,10 @@ private[spark] class Executor(
         logDebug("Task " + taskId + "'s epoch is " + task.epoch)
         env.mapOutputTracker.updateEpoch(task.epoch)
 
-        // Run the actual task (its runtime is measured internally and stored in task metrics)
+        // Run the actual task and measure its runtime.
+        taskStart = System.currentTimeMillis()
         val value = task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
+        val taskFinish = System.currentTimeMillis()
 
         // If the task has been killed, let's fail it.
         if (task.killed) {
@@ -219,7 +222,9 @@ private[spark] class Executor(
         val afterSerialization = System.currentTimeMillis()
 
         for (m <- task.metrics) {
-          m.incExecutorDeserializeTime(deserializeEndTime - deserializeStartTime)
+          m.setExecutorDeserializeTime(
+            (taskStart - deserializeStartTime) + task.executorDeserializeTime)
+          m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
           m.setJvmGCTime(computeTotalGcTime() - startGCTime)
           m.setResultSerializationTime(afterSerialization - beforeSerialization)
         }
@@ -272,6 +277,7 @@ private[spark] class Executor(
 
           val metrics: Option[TaskMetrics] = Option(task).flatMap { task =>
             task.metrics.map { m =>
+              m.setExecutorRunTime(System.currentTimeMillis() - taskStart)
              m.setJvmGCTime(computeTotalGcTime() - startGCTime)
              m
            }
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 68c546a8e457a..764b9452851f6 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -49,7 +49,7 @@ class TaskMetrics extends Serializable {
    */
   private var _executorDeserializeTime: Long = _
   def executorDeserializeTime: Long = _executorDeserializeTime
-  private[spark] def incExecutorDeserializeTime(value: Long) = _executorDeserializeTime += value
+  private[spark] def setExecutorDeserializeTime(value: Long) = _executorDeserializeTime = value
 
   /**
    * Time the executor spends actually running the task (including fetching shuffle data)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 3b86502d19ab8..c9a124113961f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -57,16 +57,10 @@ private[spark] class ResultTask[T, U](
     val ser = SparkEnv.get.closureSerializer.newInstance()
     val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
       ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
-    context.taskMetrics()
-      .incExecutorDeserializeTime(System.currentTimeMillis() - deserializeStartTime)
+    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
 
     metrics = Some(context.taskMetrics)
-    val startTime = System.currentTimeMillis()
-    try {
-      func(context, rdd.iterator(partition, context))
-    } finally {
-      context.taskMetrics().setExecutorRunTime(System.currentTimeMillis() - startTime)
-    }
+    func(context, rdd.iterator(partition, context))
   }
 
   // This is only callable on the driver side.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 854c79f7d8d40..bd3dd23dfe1ac 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -60,22 +60,15 @@ private[spark] class ShuffleMapTask(
     val ser = SparkEnv.get.closureSerializer.newInstance()
     val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
       ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
-    context.taskMetrics()
-      .incExecutorDeserializeTime(System.currentTimeMillis() - deserializeStartTime)
+    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
 
     metrics = Some(context.taskMetrics)
     var writer: ShuffleWriter[Any, Any] = null
     try {
-      val startTime = System.currentTimeMillis()
-      try {
-        val manager = SparkEnv.get.shuffleManager
-        writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
-        writer.write(
-          rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
-        writer.stop(success = true).get
-      } finally {
-        context.taskMetrics().setExecutorRunTime(System.currentTimeMillis() - startTime)
-      }
+      val manager = SparkEnv.get.shuffleManager
+      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
+      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
+      return writer.stop(success = true).get
     } catch {
       case e: Exception =>
         try {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 8b592867ee31d..77c1908cc1ff0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -87,11 +87,18 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
   // initialized when kill() is invoked.
   @volatile @transient private var _killed = false
 
+  protected var _executorDeserializeTime: Long = 0
+
   /**
    * Whether the task has been killed.
    */
   def killed: Boolean = _killed
 
+  /**
+   * Returns the amount of time spent on task / RDD deserialization.
+   */
+  def executorDeserializeTime: Long = _executorDeserializeTime
+
   /**
    * Kills a task by setting the interrupted flag to true. This relies on the upper level Spark
    * code and user code to properly handle the flag. This function should be idempotent so it can
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index cc4d7e4009c95..474f79fb756f6 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -657,7 +657,7 @@ private[spark] object JsonProtocol {
     }
     val metrics = new TaskMetrics
     metrics.setHostname((json \ "Host Name").extract[String])
-    metrics.incExecutorDeserializeTime((json \ "Executor Deserialize Time").extract[Long])
+    metrics.setExecutorDeserializeTime((json \ "Executor Deserialize Time").extract[Long])
     metrics.setExecutorRunTime((json \ "Executor Run Time").extract[Long])
     metrics.setResultSize((json \ "Result Size").extract[Long])
     metrics.setJvmGCTime((json \ "JVM GC Time").extract[Long])
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 4e3ef7260437f..a2be724254d7c 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -688,7 +688,7 @@ class JsonProtocolSuite extends FunSuite {
       hasRecords: Boolean = true) = {
     val t = new TaskMetrics
     t.setHostname("localhost")
-    t.incExecutorDeserializeTime(a)
+    t.setExecutorDeserializeTime(a)
     t.setExecutorRunTime(b)
     t.setResultSize(c)
     t.setJvmGCTime(d)

From e9cf9f478c7ff2177aa7d0192bf00414a4ff1b29 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 22 Apr 2015 12:08:29 -0700
Subject: [PATCH 4/7] Remove unused variable

---
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 2c104aa89496d..d1313b3b29730 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -191,7 +191,6 @@ private[spark] class Executor(
         val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
         updateDependencies(taskFiles, taskJars)
         task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
-        val deserializeEndTime = System.currentTimeMillis()
 
         // If this task has been killed before we deserialized it, let's quit now. Otherwise,
         // continue executing the task.

From 4f52910d96c16bc0e4df1313369ad688bad2d266 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 22 Apr 2015 12:08:53 -0700
Subject: [PATCH 5/7] Roll back whitespace change

---
 core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 764b9452851f6..06152f16ae618 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -51,6 +51,7 @@ class TaskMetrics extends Serializable {
   def executorDeserializeTime: Long = _executorDeserializeTime
   private[spark] def setExecutorDeserializeTime(value: Long) = _executorDeserializeTime = value
 
+
   /**
    * Time the executor spends actually running the task (including fetching shuffle data)
    */

From a3743b47f4b407f88724e02886fa80c9fcc42f1e Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Thu, 23 Apr 2015 11:20:53 -0700
Subject: [PATCH 6/7] Update comments.

---
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 3 +++
 core/src/main/scala/org/apache/spark/scheduler/Task.scala    | 2 +-
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d1313b3b29730..cc12af047ea24 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -221,8 +221,11 @@ private[spark] class Executor(
         val afterSerialization = System.currentTimeMillis()
 
         for (m <- task.metrics) {
+          // Deserialization happens in two parts: first, we deserialize a Task object, which
+          // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
           m.setExecutorDeserializeTime(
             (taskStart - deserializeStartTime) + task.executorDeserializeTime)
+          // We need to subtract Task.run()'s deserialization time to avoid double-counting
           m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
           m.setJvmGCTime(computeTotalGcTime() - startGCTime)
           m.setResultSerializationTime(afterSerialization - beforeSerialization)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 77c1908cc1ff0..b09b19e2ac9e7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -95,7 +95,7 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
   def killed: Boolean = _killed
 
   /**
-   * Returns the amount of time spent on task / RDD deserialization.
+   * Returns the amount of time spent deserializing the RDD and function to be run.
    */
   def executorDeserializeTime: Long = _executorDeserializeTime
 

From ed90f75b7514e43c514ad39e84bb14e06be89d96 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Thu, 23 Apr 2015 11:22:22 -0700
Subject: [PATCH 7/7] Update UI tooltip

---
 core/src/main/scala/org/apache/spark/ui/ToolTips.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
index cae6870c2ab20..24f3236456248 100644
--- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -24,7 +24,9 @@ private[spark] object ToolTips {
        scheduler delay is large, consider decreasing the size of tasks or decreasing the size
        of task results."""
 
-  val TASK_DESERIALIZATION_TIME = "Time spent deserializing the task closure on the executor."
+  val TASK_DESERIALIZATION_TIME =
+    """Time spent deserializing the task closure on the executor, including the time to read the
+       broadcasted task."""
 
   val SHUFFLE_READ_BLOCKED_TIME =
     "Time that the task spent blocked waiting for shuffle data to be read from remote machines."
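Illustrative note (not part of the patch series): a minimal, self-contained Scala sketch of the timing split that the final state of these patches computes in Executor. The object name and the millisecond values below are made up for illustration; only the two formulas mirror the setExecutorDeserializeTime / setExecutorRunTime calls in the series.

// Hypothetical example: how the final patch splits wall-clock time between
// executorDeserializeTime and executorRunTime for a single task attempt.
object DeserializeTimeAccountingExample {
  def main(args: Array[String]): Unit = {
    // Made-up timestamps and durations, in milliseconds:
    val deserializeStartTime = 0L   // executor begins deserializing the Task object
    val taskStart = 40L             // Task.run() is invoked
    val rddDeserializeTime = 25L    // what Task.executorDeserializeTime would report (RDD + closure)
    val taskFinish = 300L           // Task.run() returns

    // Deserialization is counted in two parts (Task object + RDD/closure) ...
    val executorDeserializeTime = (taskStart - deserializeStartTime) + rddDeserializeTime
    // ... and that second part is subtracted from run time so it is not double-counted.
    val executorRunTime = (taskFinish - taskStart) - rddDeserializeTime

    println(s"executorDeserializeTime = $executorDeserializeTime ms") // 65 ms
    println(s"executorRunTime = $executorRunTime ms")                 // 235 ms
  }
}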