From ce17e02fbebbac6a3e4c92e5a9ec8b2a59879f20 Mon Sep 17 00:00:00 2001 From: Patrick Woody Date: Thu, 16 Feb 2017 15:03:35 +0000 Subject: [PATCH 1/7] Record failed attempts in the OutputCommitCoordinator --- .../scheduler/OutputCommitCoordinator.scala | 56 +++++++++++++------ .../OutputCommitCoordinatorSuite.scala | 11 ++++ 2 files changed, 51 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index 08d220b40b6f..951cddf7f755 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -48,25 +48,28 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) private type StageId = Int private type PartitionId = Int private type TaskAttemptNumber = Int + private case class StageState(authorizedCommitters: Array[TaskAttemptNumber], + failures: mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]]) private val NO_AUTHORIZED_COMMITTER: TaskAttemptNumber = -1 /** - * Map from active stages's id => partition id => task attempt with exclusive lock on committing - * output for that partition. + * Map from active stages's id => authorized task attempts for each partition id, which hold an + * exclusive lock on committing task output for that partition as well as any known failed + * attempts in the stage. * * Entries are added to the top-level map when stages start and are removed they finish * (either successfully or unsuccessfully). * * Access to this map should be guarded by synchronizing on the OutputCommitCoordinator instance. */ - private val authorizedCommittersByStage = mutable.Map[StageId, Array[TaskAttemptNumber]]() + private val stageStates = mutable.Map[StageId, StageState]() /** * Returns whether the OutputCommitCoordinator's internal data structures are all empty. */ def isEmpty: Boolean = { - authorizedCommittersByStage.isEmpty + stageStates.isEmpty } /** @@ -110,14 +113,15 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) maxPartitionId: Int): Unit = { val arr = new Array[TaskAttemptNumber](maxPartitionId + 1) java.util.Arrays.fill(arr, NO_AUTHORIZED_COMMITTER) + val failures = mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]]() synchronized { - authorizedCommittersByStage(stage) = arr + stageStates(stage) = new StageState(arr, failures) } } // Called by DAGScheduler private[scheduler] def stageEnd(stage: StageId): Unit = synchronized { - authorizedCommittersByStage.remove(stage) + stageStates.remove(stage) } // Called by DAGScheduler @@ -126,7 +130,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) partition: PartitionId, attemptNumber: TaskAttemptNumber, reason: TaskEndReason): Unit = synchronized { - val authorizedCommitters = authorizedCommittersByStage.getOrElse(stage, { + val stageState = stageStates.getOrElse(stage, { logDebug(s"Ignoring task completion for completed stage") return }) @@ -137,10 +141,15 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) logInfo(s"Task was denied committing, stage: $stage, partition: $partition, " + s"attempt: $attemptNumber") case otherReason => - if (authorizedCommitters(partition) == attemptNumber) { + // Mark the attempt as failed to blacklist from future commit protocol + stageState.failures.get(partition) match { + case Some(failures) => failures += attemptNumber + case None => stageState.failures(partition) = mutable.Set(attemptNumber) + } + if (stageState.authorizedCommitters(partition) == attemptNumber) { logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " + s"partition=$partition) failed; clearing lock") - authorizedCommitters(partition) = NO_AUTHORIZED_COMMITTER + stageState.authorizedCommitters(partition) = NO_AUTHORIZED_COMMITTER } } } @@ -149,7 +158,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) if (isDriver) { coordinatorRef.foreach(_ send StopCoordinator) coordinatorRef = None - authorizedCommittersByStage.clear() + stageStates.clear() } } @@ -158,13 +167,17 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) stage: StageId, partition: PartitionId, attemptNumber: TaskAttemptNumber): Boolean = synchronized { - authorizedCommittersByStage.get(stage) match { - case Some(authorizedCommitters) => - authorizedCommitters(partition) match { + stageStates.get(stage) match { + case Some(state) if attemptFailed(stage, partition, attemptNumber) => + logWarning(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage," + + s" partition=$partition as task attempt $attemptNumber has already failed.") + false + case Some(state) => + state.authorizedCommitters(partition) match { case NO_AUTHORIZED_COMMITTER => logDebug(s"Authorizing attemptNumber=$attemptNumber to commit for stage=$stage, " + s"partition=$partition") - authorizedCommitters(partition) = attemptNumber + state.authorizedCommitters(partition) = attemptNumber true case existingCommitter => // Coordinator should be idempotent when receiving AskPermissionToCommit. @@ -181,11 +194,22 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) } } case None => - logDebug(s"Stage $stage has completed, so not allowing attempt number $attemptNumber of" + - s"partition $partition to commit") + logDebug(s"Stage $stage has completed, so not allowing" + + s" attempt number $attemptNumber of partition $partition to commit") false } } + + private def attemptFailed(stage: StageId, + partition: PartitionId, + attempt: TaskAttemptNumber): Boolean = synchronized { + stageStates.get(stage) match { + case Some(state) => + state.failures.get(partition) + .exists(_.contains(attempt)) + case None => false + } + } } private[spark] object OutputCommitCoordinator { diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index 0c362b881d91..83ed12752074 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -195,6 +195,17 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).callCanCommitMultipleTimes _, 0 until rdd.partitions.size) } + + test("SPARK-19631: Do not allow failed attempts to be authorized for committing") { + val stage: Int = 1 + val partition: Int = 1 + val failedAttempt: Int = 0 + outputCommitCoordinator.stageStart(stage, maxPartitionId = 1) + outputCommitCoordinator.taskCompleted(stage, partition, attemptNumber = failedAttempt, + reason = ExecutorLostFailure("0", exitCausedByApp = true, None)) + assert(!outputCommitCoordinator.canCommit(stage, partition, failedAttempt)) + assert(outputCommitCoordinator.canCommit(stage, partition, failedAttempt + 1)) + } } /** From b0ac2a75894216c831192628714b6cf628927460 Mon Sep 17 00:00:00 2001 From: Patrick Woody Date: Thu, 16 Feb 2017 17:21:53 +0000 Subject: [PATCH 2/7] minor style changes --- .../apache/spark/scheduler/OutputCommitCoordinator.scala | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index 951cddf7f755..327709af02a7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -55,7 +55,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) /** * Map from active stages's id => authorized task attempts for each partition id, which hold an - * exclusive lock on committing task output for that partition as well as any known failed + * exclusive lock on committing task output for that partition, as well as any known failed * attempts in the stage. * * Entries are added to the top-level map when stages start and are removed they finish @@ -203,11 +203,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) private def attemptFailed(stage: StageId, partition: PartitionId, attempt: TaskAttemptNumber): Boolean = synchronized { - stageStates.get(stage) match { - case Some(state) => - state.failures.get(partition) - .exists(_.contains(attempt)) - case None => false + stageStates.get(stage).exists { state => + state.failures.get(partition).exists(_.contains(attempt)) } } } From ed3ab09f942115c6521a76e2b100eef2f872c008 Mon Sep 17 00:00:00 2001 From: Patrick Woody Date: Mon, 27 Feb 2017 15:44:32 -0500 Subject: [PATCH 3/7] PR feedback --- .../scheduler/OutputCommitCoordinator.scala | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index 327709af02a7..8856cfa6e588 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -48,8 +48,9 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) private type StageId = Int private type PartitionId = Int private type TaskAttemptNumber = Int - private case class StageState(authorizedCommitters: Array[TaskAttemptNumber], - failures: mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]]) + private case class StageState( + authorizedCommitters: Array[TaskAttemptNumber], + failures: mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]]) private val NO_AUTHORIZED_COMMITTER: TaskAttemptNumber = -1 @@ -142,10 +143,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) s"attempt: $attemptNumber") case otherReason => // Mark the attempt as failed to blacklist from future commit protocol - stageState.failures.get(partition) match { - case Some(failures) => failures += attemptNumber - case None => stageState.failures(partition) = mutable.Set(attemptNumber) - } + stageState.failures.getOrElseUpdate(partition, mutable.Set()) += attemptNumber if (stageState.authorizedCommitters(partition) == attemptNumber) { logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " + s"partition=$partition) failed; clearing lock") @@ -169,7 +167,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) attemptNumber: TaskAttemptNumber): Boolean = synchronized { stageStates.get(stage) match { case Some(state) if attemptFailed(stage, partition, attemptNumber) => - logWarning(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage," + + logInfo(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage," + s" partition=$partition as task attempt $attemptNumber has already failed.") false case Some(state) => @@ -200,9 +198,10 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) } } - private def attemptFailed(stage: StageId, - partition: PartitionId, - attempt: TaskAttemptNumber): Boolean = synchronized { + private def attemptFailed( + stage: StageId, + partition: PartitionId, + attempt: TaskAttemptNumber): Boolean = synchronized { stageStates.get(stage).exists { state => state.failures.get(partition).exists(_.contains(attempt)) } From 20f028ad5e6f746842ca3dd10ea12811a4a699a4 Mon Sep 17 00:00:00 2001 From: Patrick Woody Date: Mon, 27 Feb 2017 19:58:25 -0500 Subject: [PATCH 4/7] feedback --- .../apache/spark/scheduler/OutputCommitCoordinator.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index 8856cfa6e588..42d276a92085 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -48,9 +48,9 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) private type StageId = Int private type PartitionId = Int private type TaskAttemptNumber = Int - private case class StageState( - authorizedCommitters: Array[TaskAttemptNumber], - failures: mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]]) + private case class StageState(authorizedCommitters: Array[TaskAttemptNumber]) { + val failures = mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]]() + } private val NO_AUTHORIZED_COMMITTER: TaskAttemptNumber = -1 @@ -114,9 +114,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) maxPartitionId: Int): Unit = { val arr = new Array[TaskAttemptNumber](maxPartitionId + 1) java.util.Arrays.fill(arr, NO_AUTHORIZED_COMMITTER) - val failures = mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]]() synchronized { - stageStates(stage) = new StageState(arr, failures) + stageStates(stage) = new StageState(arr) } } From 1fcbd5d0413f15332343b933047cabde4b787731 Mon Sep 17 00:00:00 2001 From: Patrick Woody Date: Mon, 27 Feb 2017 20:34:40 -0500 Subject: [PATCH 5/7] refactor constructor --- .../scheduler/OutputCommitCoordinator.scala | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index 42d276a92085..6ad11b46e493 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -48,12 +48,12 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) private type StageId = Int private type PartitionId = Int private type TaskAttemptNumber = Int - private case class StageState(authorizedCommitters: Array[TaskAttemptNumber]) { + private val NO_AUTHORIZED_COMMITTER: TaskAttemptNumber = -1 + private case class StageState(numPartitions: Int) { + val authorizedCommitters = Array.fill[TaskAttemptNumber](numPartitions)(NO_AUTHORIZED_COMMITTER) val failures = mutable.Map[PartitionId, mutable.Set[TaskAttemptNumber]]() } - private val NO_AUTHORIZED_COMMITTER: TaskAttemptNumber = -1 - /** * Map from active stages's id => authorized task attempts for each partition id, which hold an * exclusive lock on committing task output for that partition, as well as any known failed @@ -109,14 +109,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) * @param maxPartitionId the maximum partition id that could appear in this stage's tasks (i.e. * the maximum possible value of `context.partitionId`). */ - private[scheduler] def stageStart( - stage: StageId, - maxPartitionId: Int): Unit = { - val arr = new Array[TaskAttemptNumber](maxPartitionId + 1) - java.util.Arrays.fill(arr, NO_AUTHORIZED_COMMITTER) - synchronized { - stageStates(stage) = new StageState(arr) - } + private[scheduler] def stageStart(stage: StageId, maxPartitionId: Int): Unit = synchronized { + stageStates(stage) = new StageState(maxPartitionId + 1) } // Called by DAGScheduler From 7d12b7883c1f786655d39ad59b3c7eaea9b1da42 Mon Sep 17 00:00:00 2001 From: Patrick Woody Date: Tue, 28 Feb 2017 17:09:53 -0500 Subject: [PATCH 6/7] pass state into helper --- .../apache/spark/scheduler/OutputCommitCoordinator.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index 6ad11b46e493..942804a9da65 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -159,7 +159,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) partition: PartitionId, attemptNumber: TaskAttemptNumber): Boolean = synchronized { stageStates.get(stage) match { - case Some(state) if attemptFailed(stage, partition, attemptNumber) => + case Some(state) if attemptFailed(state, partition, attemptNumber) => logInfo(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage," + s" partition=$partition as task attempt $attemptNumber has already failed.") false @@ -192,12 +192,10 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) } private def attemptFailed( - stage: StageId, + stageState: StageState, partition: PartitionId, attempt: TaskAttemptNumber): Boolean = synchronized { - stageStates.get(stage).exists { state => - state.failures.get(partition).exists(_.contains(attempt)) - } + stageState.failures.get(partition).exists(_.contains(attempt)) } } From 9e3c8fe4855d129c47547112a3389899d0b240ce Mon Sep 17 00:00:00 2001 From: Patrick Woody Date: Thu, 2 Mar 2017 10:40:38 -0500 Subject: [PATCH 7/7] style --- .../org/apache/spark/scheduler/OutputCommitCoordinator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index 942804a9da65..83d87b548a43 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -161,7 +161,7 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) stageStates.get(stage) match { case Some(state) if attemptFailed(state, partition, attemptNumber) => logInfo(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage," + - s" partition=$partition as task attempt $attemptNumber has already failed.") + s" partition=$partition as task attempt $attemptNumber has already failed.") false case Some(state) => state.authorizedCommitters(partition) match {