8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -221,8 +221,12 @@ private[spark] class Executor(
         val afterSerialization = System.currentTimeMillis()

         for (m <- task.metrics) {
-          m.setExecutorDeserializeTime(taskStart - deserializeStartTime)
-          m.setExecutorRunTime(taskFinish - taskStart)
+          // Deserialization happens in two parts: first, we deserialize a Task object, which
+          // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
+          m.setExecutorDeserializeTime(
+            (taskStart - deserializeStartTime) + task.executorDeserializeTime)
+          // We need to subtract Task.run()'s deserialization time to avoid double-counting
+          m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
           m.setJvmGCTime(computeTotalGcTime() - startGCTime)
           m.setResultSerializationTime(afterSerialization - beforeSerialization)
         }

Review comment (Contributor), on the setExecutorDeserializeTime line: Nit: It would be super helpful to have a comment here to say something to describe the two components of this, like "Deserialization happens in two parts: first, we deserialize a Task object, which includes the Partition. Second, Task.run() deserializes the RDD and function to be run."
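A note on the accounting in this hunk: total deserialization time now has two components measured in different places, and the run time subtracts the second component so the two metrics stay disjoint. A minimal sketch of the arithmetic, with made-up timestamps:

```scala
// Hypothetical timestamps (ms) for a single task, for illustration only.
val deserializeStartTime = 1000L // executor starts deserializing the Task object
val taskStart = 1040L            // Task object ready; task.run() begins
val runDeserializeTime = 25L     // inside run(): RDD/closure deserialization (task.executorDeserializeTime)
val taskFinish = 1500L           // task.run() returns

// Total deserialization = Task-object part + RDD/closure part: 40 + 25 = 65 ms.
val executorDeserializeTime = (taskStart - deserializeStartTime) + runDeserializeTime

// Run time excludes the 25 ms already counted as deserialization: 460 - 25 = 435 ms.
val executorRunTime = (taskFinish - taskStart) - runDeserializeTime
```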
core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -53,9 +53,11 @@ private[spark] class ResultTask[T, U](

   override def runTask(context: TaskContext): U = {
     // Deserialize the RDD and the func using the broadcast variables.
+    val deserializeStartTime = System.currentTimeMillis()
     val ser = SparkEnv.get.closureSerializer.newInstance()
     val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
       ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
+    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

     metrics = Some(context.taskMetrics)
     func(context, rdd.iterator(partition, context))
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -56,9 +56,11 @@ private[spark] class ShuffleMapTask(

   override def runTask(context: TaskContext): MapStatus = {
     // Deserialize the RDD using the broadcast variable.
+    val deserializeStartTime = System.currentTimeMillis()
     val ser = SparkEnv.get.closureSerializer.newInstance()
     val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
       ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
+    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

     metrics = Some(context.taskMetrics)
     var writer: ShuffleWriter[Any, Any] = null
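Both runTask implementations above use the same wall-clock measurement pattern around the closure-serializer call. A self-contained sketch of that pattern (timedDecode and its arguments are hypothetical, not part of this change):

```scala
// Generic form of the measurement added to ResultTask and ShuffleMapTask:
// record a start time, do the expensive decode, return the elapsed millis.
def timedDecode[T](payload: Array[Byte])(decode: Array[Byte] => T): (T, Long) = {
  val start = System.currentTimeMillis()
  val result = decode(payload) // stands in for ser.deserialize(ByteBuffer.wrap(...))
  (result, System.currentTimeMillis() - start)
}
```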
7 changes: 7 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -87,11 +87,18 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
   // initialized when kill() is invoked.
   @volatile @transient private var _killed = false

+  protected var _executorDeserializeTime: Long = 0
+
   /**
    * Whether the task has been killed.
    */
   def killed: Boolean = _killed

+  /**
+   * Returns the amount of time spent deserializing the RDD and function to be run.
+   */
+  def executorDeserializeTime: Long = _executorDeserializeTime
+
   /**
    * Kills a task by setting the interrupted flag to true. This relies on the upper level Spark
    * code and user code to properly handle the flag. This function should be idempotent so it can
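To see how the new field and getter flow end to end, here is a hedged sketch of a subclass honoring the contract (SimpleTask is invented for illustration; it is not part of this PR):

```scala
// A made-up Task subclass: runTask measures its own deserialization and stores it
// in the protected field; the executor later reads it via executorDeserializeTime.
private[spark] class SimpleTask(stageId: Int, partitionId: Int)
  extends Task[Unit](stageId, partitionId) {

  override def runTask(context: TaskContext): Unit = {
    val deserializeStartTime = System.currentTimeMillis()
    // ... deserialize the RDD and closure here, as ResultTask/ShuffleMapTask do ...
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  }
}
```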
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -24,7 +24,9 @@ private[spark] object ToolTips {
     scheduler delay is large, consider decreasing the size of tasks or decreasing the size
     of task results."""

-  val TASK_DESERIALIZATION_TIME = "Time spent deserializing the task closure on the executor."
+  val TASK_DESERIALIZATION_TIME =
+    """Time spent deserializing the task closure on the executor, including the time to read the
+      broadcasted task."""

   val SHUFFLE_READ_BLOCKED_TIME =
     "Time that the task spent blocked waiting for shuffle data to be read from remote machines."