diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f771dbf1c0133..dc83ac7ae639f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -276,7 +276,12 @@ class SparkContext(config: SparkConf) extends Logging {
       conf: SparkConf,
       isLocal: Boolean,
       listenerBus: LiveListenerBus): SparkEnv = {
-    SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
+    SparkEnv.createDriverEnv(
+      conf,
+      isLocal,
+      listenerBus,
+      SparkContext.numDriverCores(master, conf),
+      this)
   }
 
   private[spark] def env: SparkEnv = _env
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 66ee959dbd831..2f9152d31a548 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -169,6 +169,7 @@ object SparkEnv extends Logging {
       isLocal: Boolean,
       listenerBus: LiveListenerBus,
       numCores: Int,
+      sparkContext: SparkContext,
       mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
     assert(conf.contains(DRIVER_HOST_ADDRESS),
       s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
@@ -191,6 +192,7 @@ object SparkEnv extends Logging {
       numCores,
       ioEncryptionKey,
       listenerBus = listenerBus,
+      Option(sparkContext),
      mockOutputCommitCoordinator = mockOutputCommitCoordinator
     )
   }
@@ -235,6 +237,7 @@ object SparkEnv extends Logging {
   /**
    * Helper method to create a SparkEnv for a driver or an executor.
    */
+  // scalastyle:off argcount
   private def create(
       conf: SparkConf,
       executorId: String,
@@ -245,7 +248,9 @@ object SparkEnv extends Logging {
       numUsableCores: Int,
       ioEncryptionKey: Option[Array[Byte]],
       listenerBus: LiveListenerBus = null,
+      sc: Option[SparkContext] = None,
       mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
+    // scalastyle:on argcount
 
     val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
 
@@ -391,7 +396,12 @@
     }
 
     val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
-      new OutputCommitCoordinator(conf, isDriver)
+      if (isDriver) {
+        new OutputCommitCoordinator(conf, isDriver, sc)
+      } else {
+        new OutputCommitCoordinator(conf, isDriver)
+      }
+
     }
     val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
       new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 289296f6fdb1b..9bd4a6f4478b6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1175,6 +1175,13 @@
     listenerBus.post(SparkListenerUnschedulableTaskSetRemoved(stageId, stageAttemptId))
   }
 
+  private[scheduler] def handleStageFailed(
+      stageId: Int,
+      reason: String,
+      exception: Option[Throwable]): Unit = {
+    stageIdToStage.get(stageId).foreach { abortStage(_, reason, exception) }
+  }
+
   private[scheduler] def handleTaskSetFailed(
       taskSet: TaskSet,
       reason: String,
@@ -2608,6 +2615,13 @@
     runningStages -= stage
   }
 
+  /**
+   * Called by the OutputCommitCoordinator to cancel a stage when data duplication may occur.
+   */
+  private[scheduler] def stageFailed(stageId: Int, reason: String): Unit = {
+    eventProcessLoop.post(StageFailed(stageId, reason, None))
+  }
+
   /**
    * Aborts all jobs depending on a particular Stage. This is called in response to a task set
    * being canceled by the TaskScheduler. Use taskSetFailed() to inject this event from outside.
@@ -2876,6 +2890,9 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
     case completion: CompletionEvent =>
       dagScheduler.handleTaskCompletion(completion)
 
+    case StageFailed(stageId, reason, exception) =>
+      dagScheduler.handleStageFailed(stageId, reason, exception)
+
     case TaskSetFailed(taskSet, reason, exception) =>
       dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index f3798da5aa1d8..f9df8de620ff8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -88,6 +88,10 @@ private[scheduler] case class ExecutorLost(execId: String, reason: ExecutorLossR
 private[scheduler] case class WorkerRemoved(workerId: String, host: String, message: String)
   extends DAGSchedulerEvent
 
+private[scheduler]
+case class StageFailed(stageId: Int, reason: String, exception: Option[Throwable])
+  extends DAGSchedulerEvent
+
 private[scheduler]
 case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable])
   extends DAGSchedulerEvent
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index a5858ebf9cdcc..a33c2bb93bcab 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -44,7 +44,10 @@ private case class AskPermissionToCommitOutput(
  * This class was introduced in SPARK-4879; see that JIRA issue (and the associated pull requests)
  * for an extensive design discussion.
  */
-private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) extends Logging {
+private[spark] class OutputCommitCoordinator(
+    conf: SparkConf,
+    isDriver: Boolean,
+    sc: Option[SparkContext] = None) extends Logging {
 
   // Initialized by SparkEnv
   var coordinatorRef: Option[RpcEndpointRef] = None
@@ -155,9 +158,9 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
         val taskId = TaskIdentifier(stageAttempt, attemptNumber)
         stageState.failures.getOrElseUpdate(partition, mutable.Set()) += taskId
         if (stageState.authorizedCommitters(partition) == taskId) {
-          logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
-            s"partition=$partition) failed; clearing lock")
-          stageState.authorizedCommitters(partition) = null
+          sc.foreach(_.dagScheduler.stageFailed(stage, s"Authorized committer " +
+            s"(attemptNumber=$attemptNumber, stage=$stage, partition=$partition) failed; " +
+            s"but task commit success, data duplication may happen."))
         }
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
index 7d063c3b3ac53..66b13be4f7a5e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
@@ -19,9 +19,8 @@ package org.apache.spark.scheduler
 
 import org.apache.hadoop.mapred.{FileOutputCommitter, TaskAttemptContext}
 import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
-import org.scalatest.time.{Seconds, Span}
 
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TaskContext}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite, TaskContext}
 
 /**
  * Integration tests for the OutputCommitCoordinator.
@@ -45,13 +44,14 @@ class OutputCommitCoordinatorIntegrationSuite
     sc = new SparkContext("local[2, 4]", "test", conf)
   }
 
-  test("exception thrown in OutputCommitter.commitTask()") {
+  test("SPARK-39195: exception thrown in OutputCommitter.commitTask()") {
     // Regression test for SPARK-10381
-    failAfter(Span(60, Seconds)) {
+    val e = intercept[SparkException] {
       withTempDir { tempDir =>
         sc.parallelize(1 to 4, 2).map(_.toString).saveAsTextFile(tempDir.getAbsolutePath + "/out")
       }
-    }
+    }.getCause.getMessage
+    assert(e.endsWith("failed; but task commit success, data duplication may happen."))
   }
 }
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index 360f7e1b4e441..62c559b52ffdd 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -86,11 +86,12 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
           conf: SparkConf,
           isLocal: Boolean,
           listenerBus: LiveListenerBus): SparkEnv = {
-        outputCommitCoordinator = spy(new OutputCommitCoordinator(conf, isDriver = true))
+        outputCommitCoordinator =
+          spy(new OutputCommitCoordinator(conf, isDriver = true, Option(this)))
         // Use Mockito.spy() to maintain the default infrastructure everywhere else.
         // This mocking allows us to control the coordinator responses in test cases.
         SparkEnv.createDriverEnv(conf, isLocal, listenerBus,
-          SparkContext.numDriverCores(master), Some(outputCommitCoordinator))
+          SparkContext.numDriverCores(master), this, Some(outputCommitCoordinator))
       }
     }
     // Use Mockito.spy() to maintain the default infrastructure everywhere else
@@ -187,12 +188,9 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     // The authorized committer now fails, clearing the lock
     outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition,
       attemptNumber = authorizedCommitter, reason = TaskKilled("test"))
-    // A new task should now be allowed to become the authorized committer
-    assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
-      nonAuthorizedCommitter + 2))
-    // There can only be one authorized committer
+    // No new task may commit because the stage has been failed due to potential data duplication
     assert(!outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
-      nonAuthorizedCommitter + 3))
+      nonAuthorizedCommitter + 2))
   }
 
   test("SPARK-19631: Do not allow failed attempts to be authorized for committing") {
@@ -226,7 +224,8 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(outputCommitCoordinator.canCommit(stage, 2, partition, taskAttempt))
 
     // Commit the 1st attempt, fail the 2nd attempt, make sure 3rd attempt cannot commit,
-    // then fail the 1st attempt and make sure the 4th one can commit again.
+    // then fail the 1st attempt; since the stage is failed due to potential data duplication,
+    // make sure the 4th attempt cannot commit either.
     stage += 1
     outputCommitCoordinator.stageStart(stage, maxPartitionId = 1)
     assert(outputCommitCoordinator.canCommit(stage, 1, partition, taskAttempt))
@@ -235,7 +234,9 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(!outputCommitCoordinator.canCommit(stage, 3, partition, taskAttempt))
     outputCommitCoordinator.taskCompleted(stage, 1, partition, taskAttempt,
       ExecutorLostFailure("0", exitCausedByApp = true, None))
-    assert(outputCommitCoordinator.canCommit(stage, 4, partition, taskAttempt))
+    // A new task should not be allowed to become the authorized committer since the stage failed
+    // because of potential data duplication
+    assert(!outputCommitCoordinator.canCommit(stage, 4, partition, taskAttempt))
   }
 
   test("SPARK-24589: Make sure stage state is cleaned up") {
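
Note (not part of the patch): the following is a minimal, self-contained Scala sketch of the behavioral change made above, for illustration only. The names here (CommitCoordinatorSketch, SimpleCommitCoordinator, StageAborter, TaskIdentifier) are invented for the sketch and are not Spark's real OutputCommitCoordinator or DAGScheduler APIs. It models the new policy: when the attempt that currently holds commit authorization for a partition fails, the coordinator no longer clears the lock so a later attempt can commit; it instead asks the scheduler to fail the whole stage, because the failed committer may already have written its output and a second commit could duplicate data.

object CommitCoordinatorSketch {

  // One task attempt of a given stage attempt (mirrors the idea of a task identifier).
  final case class TaskIdentifier(stageAttempt: Int, attemptNumber: Int)

  // Hypothetical callback standing in for "tell the scheduler to fail the stage".
  trait StageAborter {
    def stageFailed(stageId: Int, reason: String): Unit
  }

  final class SimpleCommitCoordinator(aborter: StageAborter) {
    // partition -> the attempt currently authorized to commit that partition's output
    private val authorized = scala.collection.mutable.Map[Int, TaskIdentifier]()
    // stages that have been failed and must not accept any further commits
    private val failedStages = scala.collection.mutable.Set[Int]()

    def canCommit(stage: Int, partition: Int, task: TaskIdentifier): Boolean = {
      if (failedStages.contains(stage)) {
        false // the stage was aborted; nobody may commit any more
      } else {
        authorized.get(partition) match {
          case Some(holder) => holder == task // only the current holder may commit
          case None =>
            authorized(partition) = task // first claimant becomes the authorized committer
            true
        }
      }
    }

    // Called when a task attempt finishes unsuccessfully.
    def taskFailed(stage: Int, partition: Int, task: TaskIdentifier): Unit = {
      if (authorized.get(partition).contains(task)) {
        // Old policy: drop the authorization so another attempt can commit, which risks
        // duplicated output if this attempt's commit actually reached storage.
        // New policy: fail the whole stage instead.
        aborter.stageFailed(stage, s"authorized committer $task failed; data may be duplicated")
        failedStages += stage
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val aborter = new StageAborter {
      def stageFailed(stageId: Int, reason: String): Unit =
        println(s"aborting stage $stageId: $reason")
    }
    val coordinator = new SimpleCommitCoordinator(aborter)
    val first = TaskIdentifier(stageAttempt = 0, attemptNumber = 0)
    val retry = TaskIdentifier(stageAttempt = 0, attemptNumber = 1)

    assert(coordinator.canCommit(1, 0, first))   // first attempt takes the commit lock
    coordinator.taskFailed(1, 0, first)          // the holder fails -> the stage is failed
    assert(!coordinator.canCommit(1, 0, retry))  // a retry may no longer commit
  }
}

Run as a plain Scala application; the assertions mirror the updated expectations in OutputCommitCoordinatorSuite above: once the authorized committer fails, no later attempt for that stage is allowed to commit.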