TaskRunner is a thread of execution that manages a single task.
TaskRunner is created exclusively when Executor is requested to launch a task.
TaskRunner can be run or killed, which simply means running or killing, respectively, the task this TaskRunner object manages.
Tip: Enable … Add the following line to … Refer to Logging.
TaskRunner takes the following when created:
TaskRunner initializes the internal registries and counters.
A TaskRunner object is created when an executor is requested to launch a task.
It is created with an ExecutorBackend (to send the task’s status updates to), the task and attempt ids, the task name, and a serialized version of the task (as a ByteBuffer).
Note: run is part of the java.lang.Runnable contract that TaskRunner follows.
run then sets the name of the current thread to threadName (using Java’s Thread).
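The Runnable side of this step can be illustrated with a small Scala sketch. NamingRunner is a hypothetical stand-in for TaskRunner (not Spark's class), and its threadName format is an assumption made only for illustration. The sketch submits the runner to a one-thread pool and records the name of the thread it ran on.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical stand-in for TaskRunner: a Runnable that renames the thread it runs on.
final class NamingRunner(taskId: Long) extends Runnable {
  // Assumed name format, for illustration only.
  val threadName: String = s"Executor task launch worker for task $taskId"

  // Records the thread name observed inside run so callers can inspect it.
  @volatile var observedName: String = ""

  override def run(): Unit = {
    Thread.currentThread.setName(threadName)
    observedName = Thread.currentThread.getName
  }
}

val pool = Executors.newFixedThreadPool(1)
val runner = new NamingRunner(taskId = 4L)
pool.execute(runner)
pool.shutdown()
pool.awaitTermination(10, TimeUnit.SECONDS)
println(runner.observedName) // Executor task launch worker for task 4
```

Because a pool thread is reused, the rename persists for whatever that thread runs next. That is one reason to set the name at the start of every run rather than once per thread.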
run creates a TaskMemoryManager (using the current MemoryManager and taskId).
Note: run uses SparkEnv to access the current MemoryManager.
run starts tracking the time to deserialize a task.
run sets the current thread’s context classloader (with replClassLoader).
Note: run uses SparkEnv to access the current closure Serializer.
You should see the following INFO message in the logs:
INFO Executor: Running [taskName] (TID [taskId])
run notifies ExecutorBackend that taskId is in the TaskState.RUNNING state.
Note: run uses the ExecutorBackend that was specified when TaskRunner was created.
run computes startGCTime.
run updates dependencies.
Note: run uses the TaskDescription that is specified when TaskRunner is created.
run deserializes the task (using the context class loader) and sets its localProperties and TaskMemoryManager. run sets the task internal reference to hold the deserialized task.
Note: run uses TaskDescription to access the serialized task.
If the killed flag is enabled, run throws a TaskKilledException.
You should see the following DEBUG message in the logs:
DEBUG Executor: Task [taskId]'s epoch is [task.epoch]
Note: run uses SparkEnv to access the current MapOutputTracker.
run records the current time as the task’s start time (as taskStart).
run runs the task (with taskAttemptId as taskId, attemptNumber from TaskDescription, and metricsSystem as the current MetricsSystem).
Note: run uses SparkEnv to access the current MetricsSystem.
Note: The task runs inside a "monitored" block (i.e. a try-finally block) to detect any memory and lock leaks after the task’s run finishes, regardless of the final outcome: the computed value or an exception thrown.
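A minimal Scala sketch of such a "monitored" block follows. It assumes a hypothetical monitored helper (not part of Spark) that runs the task body and always invokes a cleanup callback, and it tells the callback whether the body threw.

```scala
// Hypothetical helper modelling the "monitored" try-finally block.
// cleanup receives true when the body threw an exception.
def monitored[T](body: => T)(cleanup: Boolean => Unit): T = {
  var threwException = true
  try {
    val result = body
    threwException = false // only reached when body completed normally
    result
  } finally {
    cleanup(threwException) // runs on success and on failure alike
  }
}

// Success: the value is returned and cleanup sees threwException = false.
var sawException: Option[Boolean] = None
val value = monitored(21 * 2)(threw => sawException = Some(threw))
println(s"$value $sawException") // 42 Some(false)

// Failure: the exception propagates, but cleanup still runs with threwException = true.
var sawOnFailure: Option[Boolean] = None
val failed = scala.util.Try(
  monitored[Int](throw new RuntimeException("boom"))(threw => sawOnFailure = Some(threw))
)
```

Passing the "did it throw" flag to the cleanup step is what lets the leak checks below treat a failed task differently from a successful one.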
After the task’s run has finished (inside the "finally" block of the "monitored" block), run requests BlockManager to release all locks of the task (for the task’s taskId). The released locks are later used for lock leak detection.
run then requests TaskMemoryManager to clean up allocated memory (which helps find memory leaks).
If run detects a leak of managed memory (i.e. the memory freed is greater than 0), the spark.unsafe.exceptionOnMemoryLeak Spark property is enabled (it is not by default), and no exception was reported while the task ran, run reports a SparkException:
Managed memory leak detected; size = [freedMemory] bytes, TID = [taskId]
Otherwise, if spark.unsafe.exceptionOnMemoryLeak is disabled, you should see the following ERROR message in the logs instead:
ERROR Executor: Managed memory leak detected; size = [freedMemory] bytes, TID = [taskId]
Note: If run detects a memory leak, it leads to a SparkException or an ERROR message in the logs.
If run detects leaked locks (i.e. the number of released locks is greater than 0), the spark.storage.exceptionOnPinLeak Spark property is enabled (it is not by default), and no exception was reported while the task ran, run reports a SparkException:
[releasedLocks] block locks were not released by TID = [taskId]:
[releasedLocks separated by comma]
Otherwise, if spark.storage.exceptionOnPinLeak is disabled or the task reported an exception, you should see the following INFO message in the logs instead:
INFO Executor: [releasedLocks] block locks were not released by TID = [taskId]:
[releasedLocks separated by comma]
Note: If run detects any lock leak, it leads to a SparkException or an INFO message in the logs.
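The two leak checks can be modelled in Scala as follows. This sketch follows the behaviour described above and is not Spark's code. checkLeaks and LeakDetectedException are hypothetical. The two Boolean flags stand for spark.unsafe.exceptionOnMemoryLeak and spark.storage.exceptionOnPinLeak. Log lines are returned as strings instead of being written to a logger.

```scala
// Hypothetical stand-in for the SparkException that run reports.
final class LeakDetectedException(msg: String) extends RuntimeException(msg)

// Returns the log lines the checks would produce, or throws when an
// exception-on-leak flag is enabled and the task itself did not throw.
// Assumption: when a flag is enabled but the task threw, this sketch logs instead.
def checkLeaks(
    taskId: Long,
    freedMemory: Long,              // bytes freed by TaskMemoryManager clean-up
    releasedLocks: Seq[String],     // block locks released for the task
    threwException: Boolean,        // did the task itself throw?
    exceptionOnMemoryLeak: Boolean, // models spark.unsafe.exceptionOnMemoryLeak (off by default)
    exceptionOnPinLeak: Boolean     // models spark.storage.exceptionOnPinLeak (off by default)
): Seq[String] = {
  val out = Seq.newBuilder[String]

  if (freedMemory > 0) {
    val msg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
    if (exceptionOnMemoryLeak && !threwException) throw new LeakDetectedException(msg)
    out += s"ERROR Executor: $msg"
  }

  if (releasedLocks.nonEmpty) {
    val msg = s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
      releasedLocks.mkString(", ")
    if (exceptionOnPinLeak && !threwException) throw new LeakDetectedException(msg)
    out += s"INFO Executor: $msg"
  }

  out.result()
}

// Both flags disabled (the defaults): the leaks are only logged.
val logs = checkLeaks(7L, freedMemory = 1024L, releasedLocks = Seq("rdd_0_1"),
  threwException = false, exceptionOnMemoryLeak = false, exceptionOnPinLeak = false)
logs.foreach(println)
```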
Right after the "monitored" block, run records the current time as the task’s finish time (as taskFinish).
If the task was killed (while it was running), run reports a TaskKilledException (and the TaskRunner exits).
run creates a Serializer and serializes the task’s result. run measures the time to serialize the result.
Note: run uses SparkEnv to access the current Serializer. SparkEnv was specified when the owning Executor was created.
Important: This is when TaskRunner serializes the computed value of a task to be sent back to the driver.
run records the task metrics:
run creates a DirectTaskResult (with the serialized result and the latest values of accumulators).
run serializes the DirectTaskResult and gets the byte buffer’s limit.
Note: A serialized DirectTaskResult is a Java java.nio.ByteBuffer.
run selects the proper serialized version of the result before sending it to ExecutorBackend.
run branches off based on the serialized DirectTaskResult byte buffer’s limit.
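To make "the byte buffer's limit" concrete, note that the limit of a freshly wrapped ByteBuffer equals the number of serialized bytes. That is why it can serve as the result size. The sketch below uses plain Java serialization as a stand-in for Spark's Serializer and a Scala tuple as a stand-in for DirectTaskResult. Both substitutions are assumptions for illustration.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.ByteBuffer

// Serializes an object with Java serialization and wraps the bytes in a ByteBuffer.
def serializeToBuffer(obj: AnyRef): ByteBuffer = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  try out.writeObject(obj) finally out.close()
  ByteBuffer.wrap(bytes.toByteArray)
}

// Stand-in for a DirectTaskResult: a serialized value plus accumulator updates.
val serializedValue: Array[Byte] = Array.fill(4096)(1.toByte)
val accumUpdates: List[Long] = List(10L, 20L)

val buffer = serializeToBuffer((serializedValue, accumUpdates))
// For a wrapped buffer, limit() is the number of serialized bytes.
val resultSize: Int = buffer.limit()
println(s"resultSize = $resultSize bytes")
```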
When maxResultSize is greater than 0 and the serialized DirectTaskResult buffer limit exceeds it, the following WARN message is displayed in the logs:
WARN Executor: Finished [taskName] (TID [taskId]). Result is larger than maxResultSize ([resultSize] > [maxResultSize]), dropping it.
Tip: Read about spark.driver.maxResultSize.
$ ./bin/spark-shell -c spark.driver.maxResultSize=1m
scala> sc.version
res0: String = 2.0.0-SNAPSHOT
scala> sc.getConf.get("spark.driver.maxResultSize")
res1: String = 1m
scala> sc.range(0, 1024 * 1024 + 10, 1).collect
WARN Executor: Finished task 4.0 in stage 0.0 (TID 4). Result is larger than maxResultSize (1031.4 KB > 1024.0 KB), dropping it.
...
ERROR TaskSetManager: Total size of serialized results of 1 tasks (1031.4 KB) is bigger than spark.driver.maxResultSize (1024.0 KB)
...
org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 1 tasks (1031.4 KB) is bigger than spark.driver.maxResultSize (1024.0 KB)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1448)
...
In this case, run creates an IndirectTaskResult (with a TaskResultBlockId for the task’s taskId and resultSize) and serializes it.
When the result is not dropped (i.e. maxResultSize is not positive or resultSize does not exceed it) but resultSize is greater than maxDirectResultSize, run creates a TaskResultBlockId for the task’s taskId and stores the serialized DirectTaskResult in BlockManager (as the TaskResultBlockId with the MEMORY_AND_DISK_SER storage level).
You should see the following INFO message in the logs:
INFO Executor: Finished [taskName] (TID [taskId]). [resultSize] bytes result sent via BlockManager)
In this case, run creates an IndirectTaskResult (with a TaskResultBlockId for the task’s taskId and resultSize) and serializes it.
Note: The difference between the two cases above is whether the result is dropped or stored in BlockManager with the MEMORY_AND_DISK_SER storage level.
When the two cases above do not hold, you should see the following INFO message in the logs:
INFO Executor: Finished [taskName] (TID [taskId]). [resultSize] bytes result sent to driver
run uses the serialized DirectTaskResult byte buffer as the final serializedResult.
Note: The final serializedResult is either an IndirectTaskResult (possibly with the block stored in BlockManager) or a DirectTaskResult.
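The three-way decision can be summarised in a few lines of Scala. ResultDelivery and chooseDelivery are hypothetical names used only for this sketch. Sizes are in bytes, and the maxDirectResultSize value in the usage lines is made up.

```scala
// Hypothetical summary of how run picks the final serializedResult.
sealed trait ResultDelivery
case object Dropped extends ResultDelivery         // IndirectTaskResult, block not stored
case object ViaBlockManager extends ResultDelivery // IndirectTaskResult, block in BlockManager
case object Direct extends ResultDelivery          // DirectTaskResult sent as-is

def chooseDelivery(resultSize: Long, maxResultSize: Long, maxDirectResultSize: Long): ResultDelivery =
  if (maxResultSize > 0 && resultSize > maxResultSize) Dropped
  else if (resultSize > maxDirectResultSize) ViaBlockManager
  else Direct

// maxResultSize = 1 MiB (as with spark.driver.maxResultSize=1m).
val oneMiB = 1024L * 1024L
println(chooseDelivery(oneMiB + 10, oneMiB, 64 * 1024L)) // Dropped
println(chooseDelivery(512 * 1024L, oneMiB, 64 * 1024L)) // ViaBlockManager
println(chooseDelivery(1024L, oneMiB, 64 * 1024L))       // Direct
```

The order of the checks matters. The drop check runs first, so an oversized result is never written to BlockManager.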
run notifies ExecutorBackend that taskId is in the TaskState.FINISHED state with the serialized result and removes taskId from the owning executor’s runningTasks registry.
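The final two actions (a status update, then removal from the registry) can be sketched in Scala. RecordingBackend, finish and the String state are hypothetical stand-ins for ExecutorBackend, the end of run and TaskState. A ConcurrentHashMap stands in for runningTasks on the assumption that the registry is accessed from several threads.

```scala
import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for ExecutorBackend: records every status update it receives.
final class RecordingBackend {
  val updates = ArrayBuffer.empty[(Long, String, Int)] // (taskId, state, payload size)
  def statusUpdate(taskId: Long, state: String, data: ByteBuffer): Unit =
    synchronized { updates += ((taskId, state, data.limit())) }
}

// Hypothetical registry of running tasks, keyed by task id.
val runningTasks = new ConcurrentHashMap[Long, String]()
val backend = new RecordingBackend

// Reports the result first, then forgets the task.
def finish(taskId: Long, serializedResult: ByteBuffer): Unit = {
  backend.statusUpdate(taskId, "FINISHED", serializedResult)
  runningTasks.remove(taskId)
}

runningTasks.put(3L, "task 3.0 in stage 0.0")
finish(3L, ByteBuffer.wrap(Array[Byte](1, 2, 3)))
```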
Note: run uses the ExecutorBackend that is specified when TaskRunner is created.
Note: TaskRunner is a Java Runnable. Once a TaskRunner has completed execution, it must not be restarted.
When run catches an exception while executing the task, run acts according to its type (as presented in the following "run’s Exception Cases" table and the sections linked from it).
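Exception Type | TaskState | Serialized ByteBuffer |
---|---|---|
FetchFailedException | FAILED | TaskFailedReason |
TaskKilledException | KILLED | TaskKilled |
InterruptedException (when the task was killed) | KILLED | TaskKilled |
CommitDeniedException | FAILED | TaskFailedReason |
Throwable | FAILED | ExceptionFailure |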
When a FetchFailedException is reported while running a task, run calls setTaskFinishedAndClearInterruptStatus.
run requests the FetchFailedException for the TaskFailedReason, serializes it and notifies ExecutorBackend that the task has failed (with taskId, TaskState.FAILED, and the serialized reason).
Note: ExecutorBackend was specified when TaskRunner was created.
Note: run uses a closure Serializer to serialize the failure reason. The Serializer was created before run ran the task.
When a TaskKilledException is reported while running a task, you should see the following INFO message in the logs:
INFO Executor killed [taskName] (TID [taskId])
run then calls setTaskFinishedAndClearInterruptStatus and notifies ExecutorBackend that the task has been killed (with taskId, TaskState.KILLED, and a serialized TaskKilled object).
When an InterruptedException is reported while running a task and the task has been killed, you should see the following INFO message in the logs:
INFO Executor interrupted and killed [taskName] (TID [taskId])
run then calls setTaskFinishedAndClearInterruptStatus and notifies ExecutorBackend that the task has been killed (with taskId, TaskState.KILLED, and a serialized TaskKilled object).
Note: The only difference between the InterruptedException and TaskKilledException cases is the INFO message in the logs.
When a CommitDeniedException is reported while running a task, run calls setTaskFinishedAndClearInterruptStatus and notifies ExecutorBackend that the task has failed (with taskId, TaskState.FAILED, and a serialized TaskFailedReason that run requests from the CommitDeniedException).
Note: The difference between the CommitDeniedException and FetchFailedException cases is just the reason sent to ExecutorBackend.
When run catches any other Throwable, you should see the following ERROR message in the logs (followed by the exception):
ERROR Exception in [taskName] (TID [taskId])
run then records the following task metrics (only when Task is available):
run then collects the latest values of internal and external accumulators (with the taskFailed flag enabled to indicate that the collection is for a failed task).
Otherwise, when Task is not available, the accumulator collection is empty.
run converts the task accumulators to a collection of AccumulableInfo, creates an ExceptionFailure (with the accumulators), and serializes it.
Note: run uses a closure Serializer to serialize the ExceptionFailure.
Caution: FIXME Why does run create new ExceptionFailure(t, accUpdates).withAccums(accums), i.e. why do accumulators occur twice in the object?
run calls setTaskFinishedAndClearInterruptStatus and notifies ExecutorBackend that the task has failed (with taskId, TaskState.FAILED, and the serialized ExceptionFailure).
run may also trigger SparkUncaughtExceptionHandler.uncaughtException(t) if this is a fatal error.
Note: The only difference between this most general Throwable case and the other FAILED cases (i.e. FetchFailedException and CommitDeniedException) is what is sent to ExecutorBackend: a serialized ExceptionFailure rather than a serialized reason.
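The exception cases can be summarised as a single pattern match, in the order the sections above list them. The exception classes and the State type here are hypothetical stand-ins for Spark's FetchFailedException, TaskKilledException, CommitDeniedException and TaskState. The serialized payload is reduced to the name of its type.

```scala
// Hypothetical stand-ins for Spark's exceptions and TaskState.
final class FetchFailedEx extends Exception("fetch failed")
final class TaskKilledEx extends Exception("task killed")
final class CommitDeniedEx extends Exception("commit denied")

sealed trait State
case object Failed extends State
case object Killed extends State

// Maps an exception caught by run to the state and payload type reported to ExecutorBackend.
def classify(t: Throwable, taskKilled: Boolean): (State, String) = t match {
  case _: FetchFailedEx                      => (Failed, "TaskFailedReason")
  case _: TaskKilledEx                       => (Killed, "TaskKilled")
  case _: InterruptedException if taskKilled => (Killed, "TaskKilled")
  case _: CommitDeniedEx                     => (Failed, "TaskFailedReason")
  case _                                     => (Failed, "ExceptionFailure") // any other Throwable
}

println(classify(new InterruptedException, taskKilled = true))  // (Killed,TaskKilled)
println(classify(new InterruptedException, taskKilled = false)) // (Failed,ExceptionFailure)
```

An InterruptedException for a task that was not killed is not special-cased. It falls through to the catch-all Throwable case.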
kill(interruptThread: Boolean): Unit
kill marks the TaskRunner as killed and kills the task (if available and not finished already).
Note: kill passes the input interruptThread on to the task itself while killing it.
When executed, you should see the following INFO message in the logs:
INFO TaskRunner: Executor is trying to kill [taskName] (TID [taskId])
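Below is a minimal sketch of kill. It assumes hypothetical SketchTask and SketchRunner classes (not Spark's). The runner always sets its killed flag. It forwards interruptThread to the task only when a task has been set and has not finished yet.

```scala
// Hypothetical task that records how it was killed.
final class SketchTask {
  @volatile var killedWith: Option[Boolean] = None
  def kill(interruptThread: Boolean): Unit = killedWith = Some(interruptThread)
}

// Hypothetical runner mirroring the kill behaviour described above.
// A real implementation would need the finished check and the kill to be atomic; this sketch does not.
final class SketchRunner(taskId: Long, taskName: String) {
  @volatile var killed: Boolean = false
  @volatile var task: Option[SketchTask] = None // set once the task is deserialized
  @volatile var finished: Boolean = false

  def kill(interruptThread: Boolean): Unit = {
    println(s"INFO TaskRunner: Executor is trying to kill $taskName (TID $taskId)")
    killed = true
    // Only kill a task that exists and has not finished already.
    task.foreach { t =>
      if (!finished) t.kill(interruptThread)
    }
  }
}

val running = new SketchRunner(1L, "task 1.0 in stage 0.0")
val t1 = new SketchTask
running.task = Some(t1)
running.kill(interruptThread = true)

val done = new SketchRunner(2L, "task 2.0 in stage 0.0")
val t2 = new SketchTask
done.task = Some(t2)
done.finished = true
done.kill(interruptThread = false)
```

Setting killed even when there is no task yet is what lets run itself notice the kill later, through the killed-flag check before the task starts.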