diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c3e1cd8b23f1..c70e0a36a99a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -175,6 +175,10 @@ private[spark] class DAGScheduler(
   // TODO: Garbage collect information about failure epochs when we know there are no more
   // stray messages to detect.
   private val failedEpoch = new HashMap[String, Long]
+  // There would be a regression if an executor is lost and then causes a fetch failure,
+  // so we must distinguish the failedEpoch of 'executor lost' from the fetchFailedEpoch
+  // of 'fetch failed'.
+  private val fetchFailedEpoch = new HashMap[String, Long]
 
   private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator
 
@@ -1682,6 +1686,7 @@ private[spark] class DAGScheduler(
             execId = bmAddress.executorId,
             fileLost = true,
             hostToUnregisterOutputs = hostToUnregisterOutputs,
+            fetchFailed = true,
             maybeEpoch = Some(task.epoch))
         }
       }
@@ -1838,12 +1843,21 @@ private[spark] class DAGScheduler(
       execId: String,
       fileLost: Boolean,
       hostToUnregisterOutputs: Option[String],
+      fetchFailed: Boolean = false,
       maybeEpoch: Option[Long] = None): Unit = {
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
-    if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
+    val executorNewlyFailed = !failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch
+    val executorNewlyFetchFailed =
+      !fetchFailedEpoch.contains(execId) || fetchFailedEpoch(execId) < currentEpoch
+    if (executorNewlyFailed || executorNewlyFetchFailed) {
+      if (executorNewlyFailed) {
+        blockManagerMaster.removeExecutor(execId)
+      }
+      if (fetchFailed) {
+        fetchFailedEpoch(execId) = currentEpoch
+      }
       failedEpoch(execId) = currentEpoch
       logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
-      blockManagerMaster.removeExecutor(execId)
       if (fileLost) {
         hostToUnregisterOutputs match {
           case Some(host) =>
@@ -1887,6 +1901,11 @@ private[spark] class DAGScheduler(
       logInfo("Host added was in lost list earlier: " + host)
       failedEpoch -= execId
     }
+    // remove from fetchFailedEpoch(execId) as well
+    if (fetchFailedEpoch.contains(execId)) {
+      logInfo("Host added was in fetch failed list earlier: " + host)
+      fetchFailedEpoch -= execId
+    }
   }
 
   private[scheduler] def handleStageCancellation(stageId: Int, reason: Option[String]): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index c27d50ab66e6..67c576496a52 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -537,6 +537,167 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assert(mapStatus2(2).location.host === "hostB")
   }
 
+  test("[SPARK-29551] All shuffle files on the executor should be cleaned up when " +
+    "executor lost and then causes 'fetch failed'") {
+    // whether to kill Executor or not before FetchFailed
+    Seq(true, false).foreach { killExecutor =>
+      afterEach()
+      val conf = new SparkConf()
+      conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
+      conf.set(config.UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE.key, "false")
+      init(conf)
+      runEvent(ExecutorAdded("exec-hostA1", "hostA"))
+      runEvent(ExecutorAdded("exec-hostA2", "hostA"))
runEvent(ExecutorAdded("exec-hostB", "hostB")) + val firstRDD = new MyRDD(sc, 3, Nil) + val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(3)) + val firstShuffleId = firstShuffleDep.shuffleId + val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep)) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3)) + val secondShuffleId = shuffleDep.shuffleId + val reduceRdd = new MyRDD(sc, 1, List(shuffleDep)) + + submit(reduceRdd, Array(0)) + // map stage1 completes successfully, with one task on each executor + complete(taskSets(0), Seq( + (Success, + MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), + Array.fill[Long](1)(2), mapTaskId = 5)), + (Success, + MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), + Array.fill[Long](1)(2), mapTaskId = 6)), + (Success, makeMapStatus("hostB", 1, mapTaskId = 7)) + )) + // map stage2 completes successfully, with one task on each executor + complete(taskSets(1), Seq( + (Success, + MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), + Array.fill[Long](1)(2), mapTaskId = 8)), + (Success, + MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), + Array.fill[Long](1)(2), mapTaskId = 9)), + (Success, makeMapStatus("hostB", 1)) + )) + // make sure our test setup is correct + val initialMapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses + // val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get + assert(initialMapStatus1.count(_ != null) === 3) + assert(initialMapStatus1.map { + _.location.executorId + }.toSet === + Set("exec-hostA1", "exec-hostA2", "exec-hostB")) + val initialMapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses + assert(initialMapStatus2.count(_ != null) === 3) + assert(initialMapStatus2.map { + _.location.executorId + }.toSet === + Set("exec-hostA1", "exec-hostA2", "exec-hostB")) + // kill exec-hostA2 + if (killExecutor) { + runEvent(ExecutorLost("exec-hostA2", ExecutorKilled)) + } + // reduce stage fails with a fetch failure from one host + complete(taskSets(2), Seq( + (FetchFailed(BlockManagerId("exec-hostA2", "hostA", 12345), + secondShuffleId, 0L, 0, 0, "ignored"), null) + )) + // Here is the main assertion -- make sure that we de-register + // the map outputs for exec-hostA2 + val mapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses + assert(mapStatus1.count(_ != null) === 2) + assert(mapStatus1(0).location.executorId === "exec-hostA1") + assert(mapStatus1(0).location.host === "hostA") + assert(mapStatus1(2).location.executorId === "exec-hostB") + assert(mapStatus1(2).location.host === "hostB") + + val mapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses + assert(mapStatus2.count(_ != null) === 2) + assert(mapStatus2(0).location.executorId === "exec-hostA1") + assert(mapStatus2(0).location.host === "hostA") + assert(mapStatus2(2).location.executorId === "exec-hostB") + assert(mapStatus2(2).location.host === "hostB") + } + } + + test("[SPARK-29551] All shuffle files on the host should be cleaned up when host lost") { + // whether to kill Executor or not before FetchFailed + Seq(true, false).foreach { killExecutor => + afterEach() + val conf = new SparkConf() + conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true") + conf.set(config.UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE.key, "true") + init(conf) + runEvent(ExecutorAdded("exec-hostA1", "hostA")) + runEvent(ExecutorAdded("exec-hostA2", "hostA")) + runEvent(ExecutorAdded("exec-hostB", "hostB")) + val firstRDD = new MyRDD(sc, 3, 
+      val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(3))
+      val firstShuffleId = firstShuffleDep.shuffleId
+      val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
+      val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3))
+      val secondShuffleId = shuffleDep.shuffleId
+      val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+
+      submit(reduceRdd, Array(0))
+      // map stage1 completes successfully, with one task on each executor
+      complete(taskSets(0), Seq(
+        (Success,
+          MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345),
+            Array.fill[Long](1)(2), mapTaskId = 5)),
+        (Success,
+          MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345),
+            Array.fill[Long](1)(2), mapTaskId = 6)),
+        (Success, makeMapStatus("hostB", 1, mapTaskId = 7))
+      ))
+      // map stage2 completes successfully, with one task on each executor
+      complete(taskSets(1), Seq(
+        (Success,
+          MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345),
+            Array.fill[Long](1)(2), mapTaskId = 8)),
+        (Success,
+          MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345),
+            Array.fill[Long](1)(2), mapTaskId = 9)),
+        (Success, makeMapStatus("hostB", 1))
+      ))
+      // make sure our test setup is correct
+      val initialMapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses
+      assert(initialMapStatus1.count(_ != null) === 3)
+      assert(initialMapStatus1.map {
+        _.location.executorId
+      }.toSet ===
+        Set("exec-hostA1", "exec-hostA2", "exec-hostB"))
+
+      val initialMapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses
+      assert(initialMapStatus2.count(_ != null) === 3)
+      assert(initialMapStatus2.map {
+        _.location.executorId
+      }.toSet ===
+        Set("exec-hostA1", "exec-hostA2", "exec-hostB"))
+      // kill exec-hostA2
+      if (killExecutor) {
+        runEvent(ExecutorLost("exec-hostA2", ExecutorKilled))
+      }
+      // reduce stage fails with a fetch failure from one host
+      complete(taskSets(2), Seq(
+        (FetchFailed(BlockManagerId("exec-hostA2", "hostA", 12345),
+          secondShuffleId, 0L, 0, 0, "ignored"),
+          null)
+      ))
+
+      // Here is the main assertion -- make sure that we de-register
+      // the map outputs for both map stages from both executors on hostA
+      val mapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses
+      assert(mapStatus1.count(_ != null) === 1)
+      assert(mapStatus1(2).location.executorId === "exec-hostB")
+      assert(mapStatus1(2).location.host === "hostB")
+
+      val mapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses
+      assert(mapStatus2.count(_ != null) === 1)
+      assert(mapStatus2(2).location.executorId === "exec-hostB")
+      assert(mapStatus2(2).location.host === "hostB")
+    }
+  }
+
   test("zero split job") {
     var numResults = 0
     var failureReason: Option[Exception] = None
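
Reviewer note (not part of the patch): below is a minimal, self-contained sketch of the
epoch-guard pattern the change relies on. failedEpoch already deduplicates stale
'executor lost' events per epoch; the patch adds fetchFailedEpoch so that a fetch failure
arriving after the executor was already marked lost at the same epoch is still handled and
its map outputs are unregistered. The object and method names below are illustrative only,
not Spark APIs.

import scala.collection.mutable.HashMap

// Illustrative sketch of the guard logic in removeExecutorAndUnregisterOutputs,
// outside of the DAGScheduler.
object EpochGuardSketch {
  private val failedEpoch = new HashMap[String, Long]      // last handled 'executor lost' epoch
  private val fetchFailedEpoch = new HashMap[String, Long] // last handled 'fetch failed' epoch

  // Returns true if the event is new for this epoch and should be acted on.
  def shouldHandle(execId: String, epoch: Long, fetchFailed: Boolean): Boolean = {
    val newlyFailed = !failedEpoch.contains(execId) || failedEpoch(execId) < epoch
    val newlyFetchFailed =
      !fetchFailedEpoch.contains(execId) || fetchFailedEpoch(execId) < epoch
    if (newlyFailed || newlyFetchFailed) {
      if (fetchFailed) fetchFailedEpoch(execId) = epoch
      failedEpoch(execId) = epoch
      true
    } else {
      false
    }
  }

  def main(args: Array[String]): Unit = {
    // An 'executor lost' at epoch 0 is handled once.
    assert(shouldHandle("exec-1", 0L, fetchFailed = false))
    // A later 'fetch failed' at the same epoch is still handled, which is what
    // allows the shuffle outputs to be unregistered (the SPARK-29551 fix).
    assert(shouldHandle("exec-1", 0L, fetchFailed = true))
    // A repeated 'fetch failed' at the same epoch is deduplicated.
    assert(!shouldHandle("exec-1", 0L, fetchFailed = true))
  }
}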