diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index f1339d530ad4..ddc59b5721ef 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -780,7 +780,8 @@ private[spark] class TaskSetManager(
     // and we are not using an external shuffle server which could serve the shuffle outputs.
     // The reason is the next stage wouldn't be able to fetch the data from this dead executor
     // so we would need to rerun these tasks on other executors.
-    if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled) {
+    if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled
+        && !isZombie) {
       for ((tid, info) <- taskInfos if info.executorId == execId) {
         val index = taskInfos(tid).index
         if (successful(index)) {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index d35ca411f408..c9b927ce1b98 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -635,6 +635,52 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(thrown2.getMessage().contains("bigger than spark.driver.maxResultSize"))
   }
 
+  test("taskSetManager should not send Resubmitted tasks after being a zombie") {
+    // Regression test for SPARK-13931
+    val conf = new SparkConf().set("spark.speculation", "true")
+    sc = new SparkContext("local", "test", conf)
+
+    val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
+
+    // Keep track of the number of resubmitted tasks.
+    var resubmittedTasks = 0
+    val dagScheduler = new FakeDAGScheduler(sc, sched) {
+      override def taskEnded(task: Task[_], reason: TaskEndReason, result: Any,
+          accumUpdates: Seq[AccumulableInfo], taskInfo: TaskInfo) {
+        sched.endedTasks(taskInfo.index) = reason
+        reason match {
+          case Resubmitted => resubmittedTasks += 1
+          case _ =>
+        }
+      }
+    }
+    sched.setDAGScheduler(dagScheduler)
+
+    val tasks = Array.tabulate[Task[_]](1) { i =>
+      new ShuffleMapTask(i, 0, null, new Partition {
+        override def index: Int = 0
+      }, Seq(TaskLocation("host1", "execA")), Seq.empty)
+    }
+    val taskSet = new TaskSet(tasks, 0, 0, 0, null)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
+    manager.speculatableTasks += 0
+    val task1 = manager.resourceOffer("execA", "host1", TaskLocality.PROCESS_LOCAL).get
+    val task2 = manager.resourceOffer("execB", "host2", TaskLocality.ANY).get
+
+    assert(manager.runningTasks == 2)
+    assert(manager.isZombie == false)
+
+    val directTaskResult = new DirectTaskResult[String](null, Seq()) {
+      override def value(): String = ""
+    }
+    manager.handleSuccessfulTask(task1.taskId, directTaskResult)
+    assert(manager.isZombie == true)
+    assert(resubmittedTasks == 0)
+
+    manager.executorLost("execB", "host2", new SlaveLost())
+    assert(resubmittedTasks == 0)
+  }
+
   test("speculative and noPref task should be scheduled after node-local") {
     sc = new SparkContext("local", "test")
     val sched = new FakeTaskScheduler(
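
For readers skimming the patch, the essence of the fix is the `!isZombie` guard in `executorLost`. Below is a minimal, self-contained Scala sketch of that behavior; `SketchTaskSetManager`, `resubmittedCount`, and the demo object are hypothetical stand-ins invented for illustration, and only the zombie flag and the resubmit-on-executor-loss logic mirror the real `TaskSetManager` above.

```scala
// Hypothetical sketch, not Spark code: shows why the `!isZombie` guard
// prevents spurious resubmission after the stage has already finished.
class SketchTaskSetManager {
  // A task set becomes a "zombie" once every partition has a successful
  // task (or the set is aborted); speculative copies may still be running,
  // but the manager must not generate new work from that point on.
  var isZombie: Boolean = false

  // One ShuffleMapTask per partition; `true` means its output is recorded.
  private val successful = Array.fill(1)(true)

  private var resubmitted = 0
  def resubmittedCount: Int = resubmitted

  def executorLost(execId: String): Unit = {
    // The fix: skip resubmission entirely once the task set is a zombie.
    // Without `!isZombie`, losing the executor that still runs a speculative
    // copy of an already-finished task would mark the finished partition as
    // lost and re-queue it, even though the stage has already completed.
    if (!isZombie) {
      for (index <- successful.indices if successful(index)) {
        successful(index) = false // shuffle output gone; partition must rerun
        resubmitted += 1
      }
    }
  }
}

object SketchDemo extends App {
  val manager = new SketchTaskSetManager
  manager.isZombie = true       // stage done; a speculative copy may linger
  manager.executorLost("execB") // zombie: nothing is resubmitted
  assert(manager.resubmittedCount == 0)
}
```

The real change leans on state the scheduler already maintains: `isZombie` flips to true as soon as all partitions have a successful task (the test above checks exactly this after `handleSuccessfulTask`), so a one-line condition is enough to stop `Resubmitted` events from reaching the DAGScheduler for a finished stage.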