OutputCommitCoordinator service is the authority that coordinates result commits by means of commit locks (using the internal authorizedCommittersByStage registry).
Result commits are the outputs of running tasks (and a running task is described by a task attempt for a partition in a stage).
Tip
|
A partition (of a stage) is unlocked when it is marked as -1 in the authorizedCommittersByStage internal registry.
|
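As a rough illustration of the tip above, the registry can be pictured as a map from a stage to a per-partition array of attempt numbers. This is only a sketch: the type aliases, the `NoAuthorizedCommitter` constant, and the `registerStage` and `isUnlocked` helpers are hypothetical names chosen for the example, not confirmed Spark API.

```scala
import scala.collection.mutable

object CommitLockRegistry {
  // Assumed aliases; the names follow the taskCompleted signature on this page.
  type StageId = Int
  type PartitionId = Int
  type TaskAttemptNumber = Int

  // Sentinel from the tip: -1 marks a partition with no authorized committer.
  val NoAuthorizedCommitter: TaskAttemptNumber = -1

  // stage -> for every partition, the attempt that currently holds the commit lock
  val authorizedCommittersByStage =
    mutable.Map.empty[StageId, Array[TaskAttemptNumber]]

  // Hypothetical helper: register a stage with all of its partitions unlocked.
  def registerStage(stage: StageId, numPartitions: Int): Unit =
    authorizedCommittersByStage(stage) =
      Array.fill(numPartitions)(NoAuthorizedCommitter)

  // Hypothetical helper: true only for a registered stage whose partition is -1.
  def isUnlocked(stage: StageId, partition: PartitionId): Boolean =
    authorizedCommittersByStage.get(stage)
      .exists(committers => committers(partition) == NoAuthorizedCommitter)
}
```

Because a task attempt number is never negative, -1 can serve as the "unlocked" marker without a separate flag per partition.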
From the scaladoc (it’s a private[spark] class, so there is no way to find it outside the code):
Authority that decides whether tasks can commit output to HDFS. Uses a "first committer wins" policy. OutputCommitCoordinator is instantiated in both the drivers and executors. On executors, it is configured with a reference to the driver’s OutputCommitCoordinatorEndpoint, so requests to commit output will be forwarded to the driver’s OutputCommitCoordinator.
The most interesting piece is in…
This class was introduced in SPARK-4879; see that JIRA issue (and the associated pull requests) for an extensive design discussion.
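The "first committer wins" policy quoted from the scaladoc can be sketched as follows. This is a simplified, single-stage model under stated assumptions: `StageLocks` and `requestCommit` are hypothetical names, and the driver/executor forwarding described in the scaladoc is left out entirely.

```scala
object FirstCommitterWinsSketch {
  // -1 means that no attempt holds the commit lock for the partition yet.
  val Unlocked: Int = -1

  // One slot per partition of a single stage (hypothetical class).
  final class StageLocks(numPartitions: Int) {
    private val committers = Array.fill(numPartitions)(Unlocked)

    // Grants the lock to the first attempt that asks for a partition;
    // in this sketch every later request for that partition is denied.
    def requestCommit(partition: Int, attemptNumber: Int): Boolean = synchronized {
      if (committers(partition) == Unlocked) {
        committers(partition) = attemptNumber
        true
      } else {
        false
      }
    }
  }
}
```

For example, if attempts 0 and 1 of the same partition both ask to commit, only the one whose request arrives first gets `true`; the other is denied and must not commit its output.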
OutputCommitCoordinator Internal Registries and Counters
Name | Description |
---|---|
authorizedCommittersByStage | Tracks commit locks for task attempts for a partition in a stage. Used in taskCompleted to authorize task completions to…FIXME |
Tip
|
Enable DEBUG logging for the OutputCommitCoordinator logger to see what happens inside.
Refer to Logging. |
taskCompleted(
stage: StageId,
partition: PartitionId,
attemptNumber: TaskAttemptNumber,
reason: TaskEndReason): Unit
taskCompleted marks the partition (in the stage) completed (and hence a result committed), but only when the attemptNumber is amongst the authorized committers per stage (for the partition).
Internally, taskCompleted first finds the authorized committers for the stage.
For task completions with no stage registered in the authorizedCommittersByStage internal registry, you should see the following DEBUG message in the logs, and taskCompleted simply exits.
DEBUG OutputCommitCoordinator: Ignoring task completion for completed stage
For the reason being Success, taskCompleted does nothing and exits.
For the reason being TaskCommitDenied, you should see the following INFO message in the logs, and taskCompleted exits.
INFO OutputCommitCoordinator: Task was denied committing, stage: [stage], partition: [partition], attempt: [attemptNumber]
Note
|
When no stage is registered, or when the reason is Success or TaskCommitDenied, taskCompleted leaves the commit locks untouched (important).
|
For task completion reasons other than Success or TaskCommitDenied, and with attemptNumber amongst the authorized committers, taskCompleted marks the partition unlocked.
Note
|
A task attempt can never be -1.
|
When the lock for the partition is cleared, you should see the following DEBUG message in the logs:
DEBUG OutputCommitCoordinator: Authorized committer (attemptNumber=[attemptNumber], stage=[stage], partition=[partition]) failed; clearing lock
Note
|
taskCompleted is executed only when DAGScheduler informs that a task has completed.
|
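The branches of taskCompleted described above can be summarized in a small model. It is a sketch under assumptions, not the actual Spark implementation: the `TaskEndReason` hierarchy is reduced to three stand-in cases (`OtherFailure` is a hypothetical catch-all), the ids are plain `Int`s, and an in-memory `log` buffer replaces the real logger.

```scala
import scala.collection.mutable

object TaskCompletedSketch {
  // Simplified stand-ins for the task end reasons named on this page.
  sealed trait TaskEndReason
  case object Success extends TaskEndReason
  case object TaskCommitDenied extends TaskEndReason
  case object OtherFailure extends TaskEndReason // hypothetical catch-all

  val Unlocked: Int = -1

  // stage -> per-partition attempt number that holds the commit lock
  val authorizedCommittersByStage = mutable.Map.empty[Int, Array[Int]]

  // Collects messages so the example stays self-contained (assumption).
  val log = mutable.ArrayBuffer.empty[String]

  def taskCompleted(stage: Int, partition: Int, attemptNumber: Int,
                    reason: TaskEndReason): Unit =
    authorizedCommittersByStage.get(stage) match {
      case None =>
        // No stage registered: log and exit.
        log += "DEBUG Ignoring task completion for completed stage"
      case Some(committers) =>
        reason match {
          case Success => () // nothing to do
          case TaskCommitDenied =>
            log += s"INFO Task was denied committing, stage: $stage, " +
              s"partition: $partition, attempt: $attemptNumber"
          case _ =>
            // Only a failure of the authorized committer clears the lock.
            if (committers(partition) == attemptNumber) {
              log += s"DEBUG Authorized committer (attemptNumber=$attemptNumber, " +
                s"stage=$stage, partition=$partition) failed; clearing lock"
              committers(partition) = Unlocked
            }
        }
    }
}
```

In this model, a failure reported by an attempt that does not hold the lock changes nothing, so another attempt cannot release a lock it never owned.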