diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index e7645fc19b9f6..79a1afcad716d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -131,37 +131,17 @@ private[spark] class TaskSetManager( // same time for a barrier stage. private[scheduler] def isBarrier = taskSet.tasks.nonEmpty && taskSet.tasks(0).isBarrier - // Set of pending tasks for each executor. These collections are actually - // treated as stacks, in which new tasks are added to the end of the - // ArrayBuffer and removed from the end. This makes it faster to detect - // tasks that repeatedly fail because whenever a task failed, it is put - // back at the head of the stack. These collections may contain duplicates - // for two reasons: - // (1): Tasks are only removed lazily; when a task is launched, it remains - // in all the pending lists except the one that it was launched from. - // (2): Tasks may be re-added to these lists multiple times as a result - // of failures. - // Duplicates are handled in dequeueTaskFromList, which ensures that a - // task hasn't already started running before launching it. - private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]] - - // Set of pending tasks for each host. Similar to pendingTasksForExecutor, - // but at host level. - private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]] - - // Set of pending tasks for each rack -- similar to the above. - private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]] - - // Set containing pending tasks with no locality preferences. - private[scheduler] var pendingTasksWithNoPrefs = new ArrayBuffer[Int] - - // Set containing all pending tasks (also used as a stack, as above). - private val allPendingTasks = new ArrayBuffer[Int] + // Store tasks waiting to be scheduled by locality preferences + private[scheduler] val pendingTasks = new PendingTasksByLocality() // Tasks that can be speculated. Since these will be a small fraction of total - // tasks, we'll just hold them in a HashSet. + // tasks, we'll just hold them in a HashSet. The HashSet here ensures that we do not add + // duplicate speculatable tasks. private[scheduler] val speculatableTasks = new HashSet[Int] + // Store speculatable tasks by locality preferences + private[scheduler] val pendingSpeculatableTasks = new PendingTasksByLocality() + // Task index, start and finish time for each task attempt (indexed by task ID) private[scheduler] val taskInfos = new HashMap[Long, TaskInfo] @@ -197,11 +177,11 @@ private[spark] class TaskSetManager( } // Resolve the rack for each host. This can be slow, so de-dupe the list of hosts, // and assign the rack to all relevant task indices. - val (hosts, indicesForHosts) = pendingTasksForHost.toSeq.unzip + val (hosts, indicesForHosts) = pendingTasks.forHost.toSeq.unzip val racks = sched.getRacksForHosts(hosts) racks.zip(indicesForHosts).foreach { case (Some(rack), indices) => - pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) ++= indices + pendingTasks.forRack.getOrElseUpdate(rack, new ArrayBuffer) ++= indices case (None, _) => // no rack, nothing to do } } @@ -234,63 +214,41 @@ private[spark] class TaskSetManager( /** Add a task to all the pending-task lists that it should be on. 
*/ private[spark] def addPendingTask( index: Int, - resolveRacks: Boolean = true): Unit = { + resolveRacks: Boolean = true, + speculatable: Boolean = false): Unit = { + val pendingTaskSetToAddTo = if (speculatable) pendingSpeculatableTasks else pendingTasks for (loc <- tasks(index).preferredLocations) { loc match { case e: ExecutorCacheTaskLocation => - pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index + pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index case e: HDFSCacheTaskLocation => val exe = sched.getExecutorsAliveOnHost(loc.host) exe match { case Some(set) => for (e <- set) { - pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index + pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e, new ArrayBuffer) += index } logInfo(s"Pending task $index has a cached location at ${e.host} " + ", where there are executors " + set.mkString(",")) case None => logDebug(s"Pending task $index has a cached location at ${e.host} " + - ", but there are no executors alive there.") + ", but there are no executors alive there.") } case _ => } - pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index + pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index if (resolveRacks) { sched.getRackForHost(loc.host).foreach { rack => - pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index + pendingTaskSetToAddTo.forRack.getOrElseUpdate(rack, new ArrayBuffer) += index } } } if (tasks(index).preferredLocations == Nil) { - pendingTasksWithNoPrefs += index + pendingTaskSetToAddTo.noPrefs += index } - allPendingTasks += index // No point scanning this whole list to find the old task there - } - - /** - * Return the pending tasks list for a given executor ID, or an empty list if - * there is no map entry for that host - */ - private def getPendingTasksForExecutor(executorId: String): ArrayBuffer[Int] = { - pendingTasksForExecutor.getOrElse(executorId, ArrayBuffer()) - } - - /** - * Return the pending tasks list for a given host, or an empty list if - * there is no map entry for that host - */ - private def getPendingTasksForHost(host: String): ArrayBuffer[Int] = { - pendingTasksForHost.getOrElse(host, ArrayBuffer()) - } - - /** - * Return the pending rack-local task list for a given rack, or an empty list if - * there is no map entry for that rack - */ - private def getPendingTasksForRack(rack: String): ArrayBuffer[Int] = { - pendingTasksForRack.getOrElse(rack, ArrayBuffer()) + pendingTaskSetToAddTo.all += index } /** @@ -302,16 +260,24 @@ private[spark] class TaskSetManager( private def dequeueTaskFromList( execId: String, host: String, - list: ArrayBuffer[Int]): Option[Int] = { + list: ArrayBuffer[Int], + speculative: Boolean = false): Option[Int] = { var indexOffset = list.size while (indexOffset > 0) { indexOffset -= 1 val index = list(indexOffset) - if (!isTaskBlacklistedOnExecOrNode(index, execId, host)) { + if (!isTaskBlacklistedOnExecOrNode(index, execId, host) && + !(speculative && hasAttemptOnHost(index, host))) { // This should almost always be list.trimEnd(1) to remove tail list.remove(indexOffset) - if (copiesRunning(index) == 0 && !successful(index)) { - return Some(index) + // Speculatable task should only be launched when at most one copy of the + // original task is running + if (!successful(index)) { + if (copiesRunning(index) == 0) { + return Some(index) + } else if (speculative && copiesRunning(index) == 1) { + return Some(index) + } } } } @@ -331,127 +297,70 @@ 
private[spark] class TaskSetManager( } /** - * Return a speculative task for a given executor if any are available. The task should not have - * an attempt running on this host, in case the host is slow. In addition, the task should meet - * the given locality constraint. + * Dequeue a pending task for a given node and return its index and locality level. + * Only search for tasks matching the given locality constraint. + * + * @return An option containing (task index within the task set, locality, is speculative?) */ - // Labeled as protected to allow tests to override providing speculative tasks if necessary - protected def dequeueSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value) - : Option[(Int, TaskLocality.Value)] = - { - speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set + private def dequeueTask( + execId: String, + host: String, + maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = { + // Tries to schedule a regular task first; if it returns None, then schedules + // a speculative task + dequeueTaskHelper(execId, host, maxLocality, false).orElse( + dequeueTaskHelper(execId, host, maxLocality, true)) + } - def canRunOnHost(index: Int): Boolean = { - !hasAttemptOnHost(index, host) && - !isTaskBlacklistedOnExecOrNode(index, execId, host) + protected def dequeueTaskHelper( + execId: String, + host: String, + maxLocality: TaskLocality.Value, + speculative: Boolean): Option[(Int, TaskLocality.Value, Boolean)] = { + if (speculative && speculatableTasks.isEmpty) { + return None } - - if (!speculatableTasks.isEmpty) { - // Check for process-local tasks; note that tasks can be process-local - // on multiple nodes when we replicate cached blocks, as in Spark Streaming - for (index <- speculatableTasks if canRunOnHost(index)) { - val prefs = tasks(index).preferredLocations - val executors = prefs.flatMap(_ match { - case e: ExecutorCacheTaskLocation => Some(e.executorId) - case _ => None - }) - if (executors.contains(execId)) { - speculatableTasks -= index - return Some((index, TaskLocality.PROCESS_LOCAL)) - } - } - - // Check for node-local tasks - if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) { - for (index <- speculatableTasks if canRunOnHost(index)) { - val locations = tasks(index).preferredLocations.map(_.host) - if (locations.contains(host)) { - speculatableTasks -= index - return Some((index, TaskLocality.NODE_LOCAL)) - } - } - } - - // Check for no-preference tasks - if (TaskLocality.isAllowed(locality, TaskLocality.NO_PREF)) { - for (index <- speculatableTasks if canRunOnHost(index)) { - val locations = tasks(index).preferredLocations - if (locations.size == 0) { - speculatableTasks -= index - return Some((index, TaskLocality.PROCESS_LOCAL)) - } - } - } - - // Check for rack-local tasks - if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) { - for (rack <- sched.getRackForHost(host)) { - for (index <- speculatableTasks if canRunOnHost(index)) { - val racks = tasks(index).preferredLocations.map(_.host).flatMap(sched.getRackForHost) - if (racks.contains(rack)) { - speculatableTasks -= index - return Some((index, TaskLocality.RACK_LOCAL)) - } - } - } - } - - // Check for non-local tasks - if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) { - for (index <- speculatableTasks if canRunOnHost(index)) { - speculatableTasks -= index - return Some((index, TaskLocality.ANY)) - } + val pendingTaskSetToUse = if (speculative) pendingSpeculatableTasks else pendingTasks + def 
dequeue(list: ArrayBuffer[Int]): Option[Int] = { + val task = dequeueTaskFromList(execId, host, list, speculative) + if (speculative && task.isDefined) { + speculatableTasks -= task.get } + task } - None - } - - /** - * Dequeue a pending task for a given node and return its index and locality level. - * Only search for tasks matching the given locality constraint. - * - * @return An option containing (task index within the task set, locality, is speculative?) - */ - private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value) - : Option[(Int, TaskLocality.Value, Boolean)] = - { - for (index <- dequeueTaskFromList(execId, host, getPendingTasksForExecutor(execId))) { - return Some((index, TaskLocality.PROCESS_LOCAL, false)) + dequeue(pendingTaskSetToUse.forExecutor.getOrElse(execId, ArrayBuffer())).foreach { index => + return Some((index, TaskLocality.PROCESS_LOCAL, speculative)) } if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) { - for (index <- dequeueTaskFromList(execId, host, getPendingTasksForHost(host))) { - return Some((index, TaskLocality.NODE_LOCAL, false)) + dequeue(pendingTaskSetToUse.forHost.getOrElse(host, ArrayBuffer())).foreach { index => + return Some((index, TaskLocality.NODE_LOCAL, speculative)) } } + // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) { - // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic - for (index <- dequeueTaskFromList(execId, host, pendingTasksWithNoPrefs)) { - return Some((index, TaskLocality.PROCESS_LOCAL, false)) + dequeue(pendingTaskSetToUse.noPrefs).foreach { index => + return Some((index, TaskLocality.PROCESS_LOCAL, speculative)) } } if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) { for { rack <- sched.getRackForHost(host) - index <- dequeueTaskFromList(execId, host, getPendingTasksForRack(rack)) + index <- dequeue(pendingTaskSetToUse.forRack.getOrElse(rack, ArrayBuffer())) } { - return Some((index, TaskLocality.RACK_LOCAL, false)) + return Some((index, TaskLocality.RACK_LOCAL, speculative)) } } if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) { - for (index <- dequeueTaskFromList(execId, host, allPendingTasks)) { - return Some((index, TaskLocality.ANY, false)) + dequeue(pendingTaskSetToUse.all).foreach { index => + return Some((index, TaskLocality.ANY, speculative)) } } - - // find a speculative task if all others tasks have been scheduled - dequeueSpeculativeTask(execId, host, maxLocality).map { - case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)} + None } /** @@ -616,10 +525,10 @@ private[spark] class TaskSetManager( while (currentLocalityIndex < myLocalityLevels.length - 1) { val moreTasks = myLocalityLevels(currentLocalityIndex) match { - case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor) - case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost) - case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty - case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack) + case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasks.forExecutor) + case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasks.forHost) + case TaskLocality.NO_PREF => pendingTasks.noPrefs.nonEmpty + case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasks.forRack) } if (!moreTasks) { // This is a performance optimization: if there are no more tasks that can @@ -686,13 +595,13 @@ private[spark] class 
TaskSetManager( // from each list, we may need to go deeper in the list. We poll from the end because // failed tasks are put back at the end of allPendingTasks, so we're more likely to find // an unschedulable task this way. - val indexOffset = allPendingTasks.lastIndexWhere { indexInTaskSet => + val indexOffset = pendingTasks.all.lastIndexWhere { indexInTaskSet => copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet) } if (indexOffset == -1) { None } else { - Some(allPendingTasks(indexOffset)) + Some(pendingTasks.all(indexOffset)) } } @@ -1064,10 +973,12 @@ private[spark] class TaskSetManager( val info = taskInfos(tid) val index = info.index if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold && - !speculatableTasks.contains(index)) { + !speculatableTasks.contains(index)) { + addPendingTask(index, speculatable = true) logInfo( - "Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms" - .format(index, taskSet.id, info.host, threshold)) + ("Marking task %d in stage %s (on %s) as speculatable because it ran more" + + " than %.0f ms (%d speculatable tasks in this taskset now)") + .format(index, taskSet.id, info.host, threshold, speculatableTasks.size + 1)) speculatableTasks += index sched.dagScheduler.speculativeTaskSubmitted(tasks(index)) foundTasks = true @@ -1100,19 +1011,19 @@ private[spark] class TaskSetManager( private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = { import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY} val levels = new ArrayBuffer[TaskLocality.TaskLocality] - if (!pendingTasksForExecutor.isEmpty && - pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) { + if (!pendingTasks.forExecutor.isEmpty && + pendingTasks.forExecutor.keySet.exists(sched.isExecutorAlive(_))) { levels += PROCESS_LOCAL } - if (!pendingTasksForHost.isEmpty && - pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) { + if (!pendingTasks.forHost.isEmpty && + pendingTasks.forHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) { levels += NODE_LOCAL } - if (!pendingTasksWithNoPrefs.isEmpty) { + if (!pendingTasks.noPrefs.isEmpty) { levels += NO_PREF } - if (!pendingTasksForRack.isEmpty && - pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) { + if (!pendingTasks.forRack.isEmpty && + pendingTasks.forRack.keySet.exists(sched.hasHostAliveOnRack(_))) { levels += RACK_LOCAL } levels += ANY @@ -1137,3 +1048,32 @@ private[spark] object TaskSetManager { // this. val TASK_SIZE_TO_WARN_KIB = 1000 } + +/** + * Set of pending tasks for various levels of locality: executor, host, rack, + * noPrefs and all. These collections are actually + * treated as stacks, in which new tasks are added to the end of the + * ArrayBuffer and removed from the end. This makes it faster to detect + * tasks that repeatedly fail because whenever a task fails, it is put + * back at the head of the stack. These collections may contain duplicates + * for two reasons: + * (1): Tasks are only removed lazily; when a task is launched, it remains + * in all the pending lists except the one that it was launched from. + * (2): Tasks may be re-added to these lists multiple times as a result + * of failures. + * Duplicates are handled in dequeueTaskFromList, which ensures that a + * task hasn't already started running before launching it. + */ +private[scheduler] class PendingTasksByLocality { + + // Set of pending tasks for each executor.
+ val forExecutor = new HashMap[String, ArrayBuffer[Int]] + // Set of pending tasks for each host. Similar to pendingTasksForExecutor, but at host level. + val forHost = new HashMap[String, ArrayBuffer[Int]] + // Set containing pending tasks with no locality preferences. + val noPrefs = new ArrayBuffer[Int] + // Set of pending tasks for each rack -- similar to the above. + val forRack = new HashMap[String, ArrayBuffer[Int]] + // Set containing all pending tasks (also used as a stack, as above). + val all = new ArrayBuffer[Int] +} diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index f582ef54dd1f8..d6964063c118e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -107,14 +107,18 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { val taskSet = invoke.getArguments()(0).asInstanceOf[TaskSet] new TaskSetManager(mockTaskScheduler, taskSet, 4) { private var hasDequeuedSpeculatedTask = false - override def dequeueSpeculativeTask(execId: String, + override def dequeueTaskHelper( + execId: String, host: String, - locality: TaskLocality.Value): Option[(Int, TaskLocality.Value)] = { - if (hasDequeuedSpeculatedTask) { + locality: TaskLocality.Value, + speculative: Boolean): Option[(Int, TaskLocality.Value, Boolean)] = { + if (!speculative) { + super.dequeueTaskHelper(execId, host, locality, speculative) + } else if (hasDequeuedSpeculatedTask) { None } else { hasDequeuedSpeculatedTask = true - Some((0, TaskLocality.PROCESS_LOCAL)) + Some((0, TaskLocality.PROCESS_LOCAL, true)) } } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index da566dd82bcec..4bc8ee450e929 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -740,6 +740,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // Mark the task as available for speculation, and then offer another resource, // which should be used to launch a speculative copy of the task. 
manager.speculatableTasks += singleTask.partitionId + manager.addPendingTask(singleTask.partitionId, speculatable = true) val task2 = manager.resourceOffer("execB", "host2", TaskLocality.ANY).get assert(manager.runningTasks === 2) @@ -885,6 +886,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index == 1) manager.speculatableTasks += 1 + manager.addPendingTask(1, speculatable = true) clock.advance(LOCALITY_WAIT_MS) // schedule the nonPref task assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 2) @@ -975,7 +977,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg sched.addExecutor("execA", "host1") sched.addExecutor("execB.2", "host2") manager.executorAdded() - assert(manager.pendingTasksWithNoPrefs.size === 0) + assert(manager.pendingTasks.noPrefs.size === 0) // Valid locality should contain PROCESS_LOCAL, NODE_LOCAL and ANY assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY))) assert(manager.resourceOffer("execA", "host1", ANY) !== None) @@ -1166,7 +1168,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // Because the SchedulerBackend was a mock, the 2nd copy of the task won't actually be // killed, so the FakeTaskScheduler is only told about the successful completion // of the speculated task. - assert(sched.endedTasks(3) === Success) + assert(sched.endedTasks(4) === Success) // also because the scheduler is a mock, our manager isn't notified about the task killed event, // so we do that manually manager.handleFailedTask(origTask.taskId, TaskState.KILLED, TaskKilled("test")) @@ -1327,7 +1329,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg val taskDesc = taskSetManagerSpy.resourceOffer(exec, host, TaskLocality.ANY) // Assert the task has been black listed on the executor it was last executed on. - when(taskSetManagerSpy.addPendingTask(anyInt(), anyBoolean())).thenAnswer( + when(taskSetManagerSpy.addPendingTask(anyInt(), anyBoolean(), anyBoolean())).thenAnswer( (invocationOnMock: InvocationOnMock) => { val task: Int = invocationOnMock.getArgument(0) assert(taskSetManager.taskSetBlacklistHelperOpt.get. @@ -1339,7 +1341,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg val e = new ExceptionFailure("a", "b", Array(), "c", None) taskSetManagerSpy.handleFailedTask(taskDesc.get.taskId, TaskState.FAILED, e) - verify(taskSetManagerSpy, times(1)).addPendingTask(0, false) + verify(taskSetManagerSpy, times(1)).addPendingTask(0, false, false) } test("SPARK-21563 context's added jars shouldn't change mid-TaskSet") { @@ -1655,4 +1657,116 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // get removed inside TaskSchedulerImpl later. 
assert(availableResources(GPU) sameElements Array("0", "1", "2", "3")) } + + test("SPARK-26755 Ensure that a speculative task is submitted only once for execution") { + sc = new SparkContext("local", "test") + sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + val taskSet = FakeTask.createTaskSet(4) + // Set the speculation multiplier to be 0 so speculative tasks are launched immediately + sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0) + sc.conf.set(config.SPECULATION_ENABLED, true) + sc.conf.set(config.SPECULATION_QUANTILE, 0.5) + val clock = new ManualClock() + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) + val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => + task.metrics.internalAccums + } + // Offer resources for 4 tasks to start, 2 on each exec + Seq("exec1" -> "host1", "exec2" -> "host2").foreach { case (exec, host) => + (0 until 2).foreach { _ => + val taskOption = manager.resourceOffer(exec, host, NO_PREF) + assert(taskOption.isDefined) + assert(taskOption.get.executorId === exec) + } + } + assert(sched.startedTasks.toSet === Set(0, 1, 2, 3)) + clock.advance(1) + // Complete the first 2 tasks and leave the other 2 tasks in running + for (id <- Set(0, 2)) { + manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id))) + assert(sched.endedTasks(id) === Success) + } + // checkSpeculatableTasks checks that the task runtime is greater than the threshold for + // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for + // > 0ms, so advance the clock by 1ms here. + clock.advance(1) + assert(manager.checkSpeculatableTasks(0)) + assert(sched.speculativeTasks.toSet === Set(1, 3)) + assert(manager.copiesRunning(1) === 1) + assert(manager.copiesRunning(3) === 1) + + // Offer resource to start the speculative attempt for the running task. 
We offer more + // resources, and ensure that speculative tasks get scheduled appropriately -- only one extra + // copy per speculatable task + val taskOption2 = manager.resourceOffer("exec1", "host1", NO_PREF) + val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF) + assert(taskOption2.isDefined) + val task2 = taskOption2.get + // Ensure that task index 3 is launched on host1 and task index 1 on host2 + assert(task2.index === 3) + assert(task2.taskId === 4) + assert(task2.executorId === "exec1") + assert(task2.attemptNumber === 1) + assert(taskOption3.isDefined) + val task3 = taskOption3.get + assert(task3.index === 1) + assert(task3.taskId === 5) + assert(task3.executorId === "exec2") + assert(task3.attemptNumber === 1) + clock.advance(1) + // Running checkSpeculatableTasks again should return false + assert(!manager.checkSpeculatableTasks(0)) + assert(manager.copiesRunning(1) === 2) + assert(manager.copiesRunning(3) === 2) + // Offering additional resources should not lead to any speculative tasks being respawned + assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty) + assert(manager.resourceOffer("exec2", "host2", ANY).isEmpty) + assert(manager.resourceOffer("exec3", "host3", ANY).isEmpty) + } + + test("SPARK-26755 Ensure that a speculative task obeys original locality preferences") { + sc = new SparkContext("local", "test") + // Set the speculation multiplier to be 0 so speculative tasks are launched immediately + sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0) + sc.conf.set(config.SPECULATION_ENABLED, true) + sc.conf.set(config.SPECULATION_QUANTILE, 0.5) + // Launch a new set of tasks with locality preferences + sched = new FakeTaskScheduler(sc, ("exec1", "host1"), + ("exec2", "host2"), ("exec3", "host3"), ("exec4", "host4")) + val taskSet = FakeTask.createTaskSet(3, + Seq(TaskLocation("host1"), TaskLocation("host3")), + Seq(TaskLocation("host2")), + Seq(TaskLocation("host3"))) + val clock = new ManualClock() + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) + val accumUpdatesByTask2: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => + task.metrics.internalAccums + } + // Offer resources for 3 tasks to start + Seq("exec1" -> "host1", "exec2" -> "host2", "exec3" -> "host3").foreach { case (exec, host) => + val taskOption = manager.resourceOffer(exec, host, NO_PREF) + assert(taskOption.isDefined) + assert(taskOption.get.executorId === exec) + } + assert(sched.startedTasks.toSet === Set(0, 1, 2)) + clock.advance(1) + // Finish one task and mark the others as speculatable + manager.handleSuccessfulTask(2, createTaskResult(2, accumUpdatesByTask2(2))) + assert(sched.endedTasks(2) === Success) + clock.advance(1) + assert(manager.checkSpeculatableTasks(0)) + assert(sched.speculativeTasks.toSet === Set(0, 1)) + // Ensure that the speculatable tasks obey the original locality preferences + assert(manager.resourceOffer("exec4", "host4", NODE_LOCAL).isEmpty) + // task 1 does have a node-local preference for host2 -- but we've already got a regular + // task running there, so we should not schedule a speculative copy there as well.
+ assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL).isEmpty) + assert(manager.resourceOffer("exec3", "host3", NODE_LOCAL).isDefined) + assert(manager.resourceOffer("exec4", "host4", ANY).isDefined) + // Since all speculatable tasks have been launched, making another offer + // should not schedule any more tasks + assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty) + assert(!manager.checkSpeculatableTasks(0)) + assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty) + } }
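For illustration only (not part of the patch): a minimal, self-contained Scala sketch of the stack-with-lazy-deduplication behaviour that the PendingTasksByLocality buckets and dequeueTaskFromList rely on. The object and method names (PendingTasksSketch, addPending, dequeue) are made up for this sketch; the real code additionally checks blacklisting and, for speculative candidates, whether an attempt is already running on the offered host.

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

// Illustrative sketch: pending task indices are appended to a per-host bucket and
// popped from the end (stack order); duplicate entries are tolerated because they
// are filtered out lazily at dequeue time, mirroring dequeueTaskFromList above.
object PendingTasksSketch {
  private val forHost = new HashMap[String, ArrayBuffer[Int]]
  private val copiesRunning = new HashMap[Int, Int]
  private val successful = new HashSet[Int]

  def addPending(host: String, index: Int): Unit =
    forHost.getOrElseUpdate(host, new ArrayBuffer[Int]) += index

  def dequeue(host: String): Option[Int] = {
    val list = forHost.getOrElse(host, ArrayBuffer[Int]())
    var offset = list.size
    while (offset > 0) {
      offset -= 1
      val index = list.remove(offset)  // lazy removal of stale/duplicate entries
      if (copiesRunning.getOrElse(index, 0) == 0 && !successful.contains(index)) {
        return Some(index)
      }
    }
    None
  }

  def main(args: Array[String]): Unit = {
    Seq(0, 1, 1).foreach(addPending("host1", _))  // task 1 is queued twice
    copiesRunning(1) = 1                          // task 1 already launched elsewhere
    println(dequeue("host1"))                     // Some(0): running/duplicate entries are skipped
    println(dequeue("host1"))                     // None: nothing schedulable remains
  }
}

The patch layers two more guards on top of this shape: the per-exec/per-node blacklist check, and, for speculative tasks, the requirement that at most one copy of the original task is running and that no attempt of it is already on the offered host.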