From 2d87a62438e8f3a3267bcd7ab15a213a98ca61eb Mon Sep 17 00:00:00 2001 From: pgandhi Date: Mon, 28 Jan 2019 17:01:49 -0600 Subject: [PATCH 01/13] [SPARK-26755] : Optimize Spark Scheduler to dequeue speculative tasks more efficiently Have split the main queue "speculatableTasks" into 5 separate queues based on locality preference similar to how normal tasks are enqueued. --- .../spark/scheduler/TaskSetManager.scala | 152 +++++++++++------- .../spark/scheduler/TaskSetManagerSuite.scala | 6 +- 2 files changed, 95 insertions(+), 63 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index b7bf06974fd5..a8a39879378f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -155,9 +155,21 @@ private[spark] class TaskSetManager( // Set containing all pending tasks (also used as a stack, as above). private val allPendingTasks = new ArrayBuffer[Int] - // Tasks that can be speculated. Since these will be a small fraction of total - // tasks, we'll just hold them in a HashSet. - private[scheduler] val speculatableTasks = new HashSet[Int] + // Set of pending tasks that can be speculated for each executor. + private[scheduler] var pendingSpeculatableTasksForExecutor = + new HashMap[String, ArrayBuffer[Int]] + + // Set of pending tasks that can be speculated for each host. + private[scheduler] var pendingSpeculatableTasksForHost = new HashMap[String, ArrayBuffer[Int]] + + // Set of pending tasks that can be speculated with no locality preferences. + private[scheduler] val pendingSpeculatableTasksWithNoPrefs = new ArrayBuffer[Int] + + // Set of pending tasks that can be speculated for each rack. + private[scheduler] var pendingSpeculatableTasksForRack = new HashMap[String, ArrayBuffer[Int]] + + // Set of all pending tasks that can be speculated. + private[scheduler] val allPendingSpeculatableTasks = new ArrayBuffer[Int] // Task index, start and finish time for each task attempt (indexed by task ID) private[scheduler] val taskInfos = new HashMap[Long, TaskInfo] @@ -245,6 +257,28 @@ private[spark] class TaskSetManager( allPendingTasks += index // No point scanning this whole list to find the old task there } + private[spark] def addPendingSpeculativeTask(index: Int) { + for (loc <- tasks(index).preferredLocations) { + loc match { + case e: ExecutorCacheTaskLocation => + pendingSpeculatableTasksForExecutor.getOrElseUpdate( + e.executorId, new ArrayBuffer) += index + case _ => + } + pendingSpeculatableTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index + for (rack <- sched.getRackForHost(loc.host)) { + pendingSpeculatableTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index + } + } + + if (tasks(index).preferredLocations == Nil) { + pendingSpeculatableTasksWithNoPrefs += index + } + + // No point scanning this whole list to find the old task there + allPendingSpeculatableTasks += index + } + /** * Return the pending tasks list for a given executor ID, or an empty list if * there is no map entry for that host @@ -294,6 +328,30 @@ private[spark] class TaskSetManager( None } + /** + * Dequeue a pending speculative task from the given list and return its index. Runs similar + * to the method 'dequeueTaskFromList' with additional constraints. Return None if the + * list is empty. 
+ */ + private def dequeueSpeculativeTaskFromList( + execId: String, + host: String, + list: ArrayBuffer[Int]): Option[Int] = { + var indexOffset = list.size + while (indexOffset > 0) { + indexOffset -= 1 + val index = list(indexOffset) + if (!isTaskBlacklistedOnExecOrNode(index, execId, host) && !hasAttemptOnHost(index, host)) { + // This should almost always be list.trimEnd(1) to remove tail + list.remove(indexOffset) + if (!successful(index)) { + return Some(index) + } + } + } + None + } + /** Check whether a task once ran an attempt on a given host */ private def hasAttemptOnHost(taskIndex: Int, host: String): Boolean = { taskAttempts(taskIndex).exists(_.host == host) @@ -315,69 +373,44 @@ private[spark] class TaskSetManager( protected def dequeueSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value) : Option[(Int, TaskLocality.Value)] = { - speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set - - def canRunOnHost(index: Int): Boolean = { - !hasAttemptOnHost(index, host) && - !isTaskBlacklistedOnExecOrNode(index, execId, host) - } - - if (!speculatableTasks.isEmpty) { // Check for process-local tasks; note that tasks can be process-local // on multiple nodes when we replicate cached blocks, as in Spark Streaming - for (index <- speculatableTasks if canRunOnHost(index)) { - val prefs = tasks(index).preferredLocations - val executors = prefs.flatMap(_ match { - case e: ExecutorCacheTaskLocation => Some(e.executorId) - case _ => None - }); - if (executors.contains(execId)) { - speculatableTasks -= index - return Some((index, TaskLocality.PROCESS_LOCAL)) - } - } + for (index <- dequeueSpeculativeTaskFromList( + execId, host, pendingSpeculatableTasksForExecutor.getOrElse(execId, ArrayBuffer()))) { + return Some((index, TaskLocality.PROCESS_LOCAL)) + } - // Check for node-local tasks - if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) { - for (index <- speculatableTasks if canRunOnHost(index)) { - val locations = tasks(index).preferredLocations.map(_.host) - if (locations.contains(host)) { - speculatableTasks -= index - return Some((index, TaskLocality.NODE_LOCAL)) - } - } + // Check for node-local tasks + if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) { + for (index <- dequeueSpeculativeTaskFromList( + execId, host, pendingSpeculatableTasksForHost.getOrElse(host, ArrayBuffer()))) { + return Some((index, TaskLocality.NODE_LOCAL)) } + } - // Check for no-preference tasks - if (TaskLocality.isAllowed(locality, TaskLocality.NO_PREF)) { - for (index <- speculatableTasks if canRunOnHost(index)) { - val locations = tasks(index).preferredLocations - if (locations.size == 0) { - speculatableTasks -= index - return Some((index, TaskLocality.PROCESS_LOCAL)) - } - } + // Check for no-preference tasks + if (TaskLocality.isAllowed(locality, TaskLocality.NO_PREF)) { + for (index <- dequeueSpeculativeTaskFromList( + execId, host, pendingSpeculatableTasksWithNoPrefs)) { + return Some((index, TaskLocality.PROCESS_LOCAL)) } + } - // Check for rack-local tasks - if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) { - for (rack <- sched.getRackForHost(host)) { - for (index <- speculatableTasks if canRunOnHost(index)) { - val racks = tasks(index).preferredLocations.map(_.host).flatMap(sched.getRackForHost) - if (racks.contains(rack)) { - speculatableTasks -= index - return Some((index, TaskLocality.RACK_LOCAL)) - } - } - } + // Check for rack-local tasks + if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) 
{ + for { + rack <- sched.getRackForHost(host) + index <- dequeueSpeculativeTaskFromList( + execId, host, pendingSpeculatableTasksForRack.getOrElse(rack, ArrayBuffer())) + } { + return Some((index, TaskLocality.RACK_LOCAL)) } + } - // Check for non-local tasks - if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) { - for (index <- speculatableTasks if canRunOnHost(index)) { - speculatableTasks -= index - return Some((index, TaskLocality.ANY)) - } + // Check for non-local tasks + if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) { + for (index <- dequeueSpeculativeTaskFromList(execId, host, allPendingSpeculatableTasks)) { + return Some((index, TaskLocality.ANY)) } } @@ -1029,12 +1062,11 @@ private[spark] class TaskSetManager( for (tid <- runningTasksSet) { val info = taskInfos(tid) val index = info.index - if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold && - !speculatableTasks.contains(index)) { + if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold) { + addPendingSpeculativeTask(index) logInfo( "Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms" .format(index, taskSet.id, info.host, threshold)) - speculatableTasks += index sched.dagScheduler.speculativeTaskSubmitted(tasks(index)) foundTasks = true } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 60acd3ed4cd4..103d3ee18cac 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -724,7 +724,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // Mark the task as available for speculation, and then offer another resource, // which should be used to launch a speculative copy of the task. - manager.speculatableTasks += singleTask.partitionId + manager.addPendingSpeculativeTask(singleTask.partitionId) val task2 = manager.resourceOffer("execB", "host2", TaskLocality.ANY).get assert(manager.runningTasks === 2) @@ -869,7 +869,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None) assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index == 1) - manager.speculatableTasks += 1 + manager.addPendingSpeculativeTask(1) clock.advance(LOCALITY_WAIT_MS) // schedule the nonPref task assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 2) @@ -1151,7 +1151,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // Because the SchedulerBackend was a mock, the 2nd copy of the task won't actually be // killed, so the FakeTaskScheduler is only told about the successful completion // of the speculated task. 
- assert(sched.endedTasks(3) === Success) + assert(sched.endedTasks(4) === Success) // also because the scheduler is a mock, our manager isn't notified about the task killed event, // so we do that manually manager.handleFailedTask(origTask.taskId, TaskState.KILLED, TaskKilled("test")) From 65d09265bc6ae16cf595798e2a24e3d61c0ff0e9 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Tue, 29 Jan 2019 11:12:32 -0600 Subject: [PATCH 02/13] [SPARK-26755] : Using HashSet instead of ArrayBuffer to reject duplicate task index --- .../spark/scheduler/TaskSetManager.scala | 41 +++++++++---------- .../spark/scheduler/TaskSetManagerSuite.scala | 2 +- 2 files changed, 21 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index a8a39879378f..7faede4d5c7a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -157,19 +157,19 @@ private[spark] class TaskSetManager( // Set of pending tasks that can be speculated for each executor. private[scheduler] var pendingSpeculatableTasksForExecutor = - new HashMap[String, ArrayBuffer[Int]] + new HashMap[String, HashSet[Int]] // Set of pending tasks that can be speculated for each host. - private[scheduler] var pendingSpeculatableTasksForHost = new HashMap[String, ArrayBuffer[Int]] + private[scheduler] var pendingSpeculatableTasksForHost = new HashMap[String, HashSet[Int]] // Set of pending tasks that can be speculated with no locality preferences. - private[scheduler] val pendingSpeculatableTasksWithNoPrefs = new ArrayBuffer[Int] + private[scheduler] val pendingSpeculatableTasksWithNoPrefs = new HashSet[Int] // Set of pending tasks that can be speculated for each rack. - private[scheduler] var pendingSpeculatableTasksForRack = new HashMap[String, ArrayBuffer[Int]] + private[scheduler] var pendingSpeculatableTasksForRack = new HashMap[String, HashSet[Int]] // Set of all pending tasks that can be speculated. 
- private[scheduler] val allPendingSpeculatableTasks = new ArrayBuffer[Int] + private[scheduler] val allPendingSpeculatableTasks = new HashSet[Int] // Task index, start and finish time for each task attempt (indexed by task ID) private[scheduler] val taskInfos = new HashMap[Long, TaskInfo] @@ -262,12 +262,12 @@ private[spark] class TaskSetManager( loc match { case e: ExecutorCacheTaskLocation => pendingSpeculatableTasksForExecutor.getOrElseUpdate( - e.executorId, new ArrayBuffer) += index + e.executorId, new HashSet) += index case _ => } - pendingSpeculatableTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index + pendingSpeculatableTasksForHost.getOrElseUpdate(loc.host, new HashSet) += index for (rack <- sched.getRackForHost(loc.host)) { - pendingSpeculatableTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index + pendingSpeculatableTasksForRack.getOrElseUpdate(rack, new HashSet) += index } } @@ -336,16 +336,15 @@ private[spark] class TaskSetManager( private def dequeueSpeculativeTaskFromList( execId: String, host: String, - list: ArrayBuffer[Int]): Option[Int] = { - var indexOffset = list.size - while (indexOffset > 0) { - indexOffset -= 1 - val index = list(indexOffset) - if (!isTaskBlacklistedOnExecOrNode(index, execId, host) && !hasAttemptOnHost(index, host)) { - // This should almost always be list.trimEnd(1) to remove tail - list.remove(indexOffset) - if (!successful(index)) { - return Some(index) + list: HashSet[Int]): Option[Int] = { + if (!list.isEmpty) { + for (index <- list) { + if (!isTaskBlacklistedOnExecOrNode(index, execId, host) && !hasAttemptOnHost(index, host)) { + // This should almost always be list.trimEnd(1) to remove tail + list -= index + if (!successful(index)) { + return Some(index) + } } } } @@ -376,14 +375,14 @@ private[spark] class TaskSetManager( // Check for process-local tasks; note that tasks can be process-local // on multiple nodes when we replicate cached blocks, as in Spark Streaming for (index <- dequeueSpeculativeTaskFromList( - execId, host, pendingSpeculatableTasksForExecutor.getOrElse(execId, ArrayBuffer()))) { + execId, host, pendingSpeculatableTasksForExecutor.getOrElse(execId, HashSet()))) { return Some((index, TaskLocality.PROCESS_LOCAL)) } // Check for node-local tasks if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) { for (index <- dequeueSpeculativeTaskFromList( - execId, host, pendingSpeculatableTasksForHost.getOrElse(host, ArrayBuffer()))) { + execId, host, pendingSpeculatableTasksForHost.getOrElse(host, HashSet()))) { return Some((index, TaskLocality.NODE_LOCAL)) } } @@ -401,7 +400,7 @@ private[spark] class TaskSetManager( for { rack <- sched.getRackForHost(host) index <- dequeueSpeculativeTaskFromList( - execId, host, pendingSpeculatableTasksForRack.getOrElse(rack, ArrayBuffer())) + execId, host, pendingSpeculatableTasksForRack.getOrElse(rack, HashSet())) } { return Some((index, TaskLocality.RACK_LOCAL)) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 103d3ee18cac..ba06c9a72ff1 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1151,7 +1151,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // Because the SchedulerBackend was a mock, the 2nd copy of the task won't actually be // killed, so the FakeTaskScheduler is only told about the 
successful completion // of the speculated task. - assert(sched.endedTasks(4) === Success) + assert(sched.endedTasks(3) === Success) // also because the scheduler is a mock, our manager isn't notified about the task killed event, // so we do that manually manager.handleFailedTask(origTask.taskId, TaskState.KILLED, TaskKilled("test")) From 41ddf23f96b11afd3468fe358b4c16b3d4622d70 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Mon, 1 Jul 2019 13:14:00 -0500 Subject: [PATCH 03/13] [SPARK-26755] : Addressing Reviews July 1, 2019 Adding Unit test, refactoring code to remove duplicate methods, refactoring comments etc. --- .../spark/scheduler/TaskSetManager.scala | 265 +++++++++--------- .../spark/scheduler/TaskSetManagerSuite.scala | 89 +++++- 2 files changed, 215 insertions(+), 139 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index c0c79e6db893..aeab6a34810d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -131,7 +131,8 @@ private[spark] class TaskSetManager( // same time for a barrier stage. private[scheduler] def isBarrier = taskSet.tasks.nonEmpty && taskSet.tasks(0).isBarrier - // Set of pending tasks for each executor. These collections are actually + // Set of pending tasks for various levels of locality: executor, host, rack, + // noPrefs and anyPrefs. These collections are actually // treated as stacks, in which new tasks are added to the end of the // ArrayBuffer and removed from the end. This makes it faster to detect // tasks that repeatedly fail because whenever a task failed, it is put @@ -143,36 +144,25 @@ private[spark] class TaskSetManager( // of failures. // Duplicates are handled in dequeueTaskFromList, which ensures that a // task hasn't already started running before launching it. - private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]] - // Set of pending tasks for each host. Similar to pendingTasksForExecutor, - // but at host level. - private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]] - - // Set of pending tasks for each rack -- similar to the above. - private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]] - - // Set containing pending tasks with no locality preferences. - private[scheduler] var pendingTasksWithNoPrefs = new ArrayBuffer[Int] - - // Set containing all pending tasks (also used as a stack, as above). - private val allPendingTasks = new ArrayBuffer[Int] - - // Set of pending tasks that can be speculated for each executor. - private[scheduler] var pendingSpeculatableTasksForExecutor = - new HashMap[String, HashSet[Int]] - - // Set of pending tasks that can be speculated for each host. - private[scheduler] var pendingSpeculatableTasksForHost = new HashMap[String, HashSet[Int]] - - // Set of pending tasks that can be speculated with no locality preferences. - private[scheduler] val pendingSpeculatableTasksWithNoPrefs = new HashSet[Int] - - // Set of pending tasks that can be speculated for each rack. - private[scheduler] var pendingSpeculatableTasksForRack = new HashMap[String, HashSet[Int]] - - // Set of all pending tasks that can be speculated. 
- private[scheduler] val allPendingSpeculatableTasks = new HashSet[Int] + private[scheduler] val pendingTasks = PendingTasksByLocality( + forExecutor = new HashMap[String, ArrayBuffer[Int]], + forHost = new HashMap[String, ArrayBuffer[Int]], + noPrefs = new ArrayBuffer[Int], + forRack = new HashMap[String, ArrayBuffer[Int]], + anyPrefs = new ArrayBuffer[Int]) + + // The HashSet here ensures that we do not add duplicate speculative tasks + private[scheduler] val speculatableTasks = new HashSet[Int] + + // Set of pending tasks marked as speculative for various levels of locality: executor, host, + // rack, noPrefs and anyPrefs + private[scheduler] val pendingSpeculatableTasks = PendingTasksByLocality( + forExecutor = new HashMap[String, ArrayBuffer[Int]], + forHost = new HashMap[String, ArrayBuffer[Int]], + noPrefs = new ArrayBuffer[Int], + forRack = new HashMap[String, ArrayBuffer[Int]], + anyPrefs = new ArrayBuffer[Int]) // Task index, start and finish time for each task attempt (indexed by task ID) private[scheduler] val taskInfos = new HashMap[Long, TaskInfo] @@ -209,11 +199,11 @@ private[spark] class TaskSetManager( } // Resolve the rack for each host. This can be slow, so de-dupe the list of hosts, // and assign the rack to all relevant task indices. - val (hosts, indicesForHosts) = pendingTasksForHost.toSeq.unzip + val (hosts, indicesForHosts) = pendingTasks.forHost.toSeq.unzip val racks = sched.getRacksForHosts(hosts) racks.zip(indicesForHosts).foreach { case (Some(rack), indices) => - pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) ++= indices + pendingTasks.forRack.getOrElseUpdate(rack, new ArrayBuffer) ++= indices case (None, _) => // no rack, nothing to do } } @@ -246,61 +236,62 @@ private[spark] class TaskSetManager( /** Add a task to all the pending-task lists that it should be on. 
*/ private[spark] def addPendingTask( index: Int, - resolveRacks: Boolean = true): Unit = { - for (loc <- tasks(index).preferredLocations) { - loc match { - case e: ExecutorCacheTaskLocation => - pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index - case e: HDFSCacheTaskLocation => - val exe = sched.getExecutorsAliveOnHost(loc.host) - exe match { - case Some(set) => - for (e <- set) { - pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index - } - logInfo(s"Pending task $index has a cached location at ${e.host} " + - ", where there are executors " + set.mkString(",")) - case None => logDebug(s"Pending task $index has a cached location at ${e.host} " + - ", but there are no executors alive there.") - } - case _ => + resolveRacks: Boolean = true, + speculative: Boolean = false): Unit = { + if (speculative) { + for (loc <- tasks(index).preferredLocations) { + loc match { + case e: ExecutorCacheTaskLocation => + pendingSpeculatableTasks.forExecutor.getOrElseUpdate( + e.executorId, new ArrayBuffer) += index + case _ => + } + pendingSpeculatableTasks.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index + for (rack <- sched.getRackForHost(loc.host)) { + pendingSpeculatableTasks.forRack.getOrElseUpdate(rack, new ArrayBuffer) += index + } } - pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index - if (resolveRacks) { - sched.getRackForHost(loc.host).foreach { rack => - pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index - } + if (tasks(index).preferredLocations == Nil) { + pendingSpeculatableTasks.noPrefs += index } - } - if (tasks(index).preferredLocations == Nil) { - pendingTasksWithNoPrefs += index - } + pendingSpeculatableTasks.anyPrefs += index + } else { - allPendingTasks += index // No point scanning this whole list to find the old task there - } + for (loc <- tasks(index).preferredLocations) { + loc match { + case e: ExecutorCacheTaskLocation => + pendingTasks.forExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index + case e: HDFSCacheTaskLocation => + val exe = sched.getExecutorsAliveOnHost(loc.host) + exe match { + case Some(set) => + for (e <- set) { + pendingTasks.forExecutor.getOrElseUpdate(e, new ArrayBuffer) += index + } + logInfo(s"Pending task $index has a cached location at ${e.host} " + + ", where there are executors " + set.mkString(",")) + case None => logDebug(s"Pending task $index has a cached location at ${e.host} " + + ", but there are no executors alive there.") + } + case _ => + } + pendingTasks.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index - private[spark] def addPendingSpeculativeTask(index: Int) { - for (loc <- tasks(index).preferredLocations) { - loc match { - case e: ExecutorCacheTaskLocation => - pendingSpeculatableTasksForExecutor.getOrElseUpdate( - e.executorId, new HashSet) += index - case _ => + if (resolveRacks) { + sched.getRackForHost(loc.host).foreach { rack => + pendingTasks.forRack.getOrElseUpdate(rack, new ArrayBuffer) += index + } + } } - pendingSpeculatableTasksForHost.getOrElseUpdate(loc.host, new HashSet) += index - for (rack <- sched.getRackForHost(loc.host)) { - pendingSpeculatableTasksForRack.getOrElseUpdate(rack, new HashSet) += index + + if (tasks(index).preferredLocations == Nil) { + pendingTasks.noPrefs += index } - } - if (tasks(index).preferredLocations == Nil) { - pendingSpeculatableTasksWithNoPrefs += index + pendingTasks.anyPrefs += index } - - // No point scanning this whole list to find the old task there - 
allPendingSpeculatableTasks += index } /** @@ -308,7 +299,7 @@ private[spark] class TaskSetManager( * there is no map entry for that host */ private def getPendingTasksForExecutor(executorId: String): ArrayBuffer[Int] = { - pendingTasksForExecutor.getOrElse(executorId, ArrayBuffer()) + pendingTasks.forExecutor.getOrElse(executorId, ArrayBuffer()) } /** @@ -316,7 +307,7 @@ private[spark] class TaskSetManager( * there is no map entry for that host */ private def getPendingTasksForHost(host: String): ArrayBuffer[Int] = { - pendingTasksForHost.getOrElse(host, ArrayBuffer()) + pendingTasks.forHost.getOrElse(host, ArrayBuffer()) } /** @@ -324,7 +315,7 @@ private[spark] class TaskSetManager( * there is no map entry for that rack */ private def getPendingTasksForRack(rack: String): ArrayBuffer[Int] = { - pendingTasksForRack.getOrElse(rack, ArrayBuffer()) + pendingTasks.forRack.getOrElse(rack, ArrayBuffer()) } /** @@ -336,37 +327,30 @@ private[spark] class TaskSetManager( private def dequeueTaskFromList( execId: String, host: String, - list: ArrayBuffer[Int]): Option[Int] = { - var indexOffset = list.size - while (indexOffset > 0) { - indexOffset -= 1 - val index = list(indexOffset) - if (!isTaskBlacklistedOnExecOrNode(index, execId, host)) { - // This should almost always be list.trimEnd(1) to remove tail - list.remove(indexOffset) - if (copiesRunning(index) == 0 && !successful(index)) { - return Some(index) + list: ArrayBuffer[Int], + speculative: Boolean = false): Option[Int] = { + if (speculative) { + if (!list.isEmpty) { + for (index <- list) { + if (!isTaskBlacklistedOnExecOrNode(index, execId, host) && + !hasAttemptOnHost(index, host)) { + // This should almost always be list.trimEnd(1) to remove tail + list -= index + if (!successful(index)) { + return Some(index) + } + } } } - } - None - } - - /** - * Dequeue a pending speculative task from the given list and return its index. Runs similar - * to the method 'dequeueTaskFromList' with additional constraints. Return None if the - * list is empty. 
- */ - private def dequeueSpeculativeTaskFromList( - execId: String, - host: String, - list: HashSet[Int]): Option[Int] = { - if (!list.isEmpty) { - for (index <- list) { - if (!isTaskBlacklistedOnExecOrNode(index, execId, host) && !hasAttemptOnHost(index, host)) { + } else { + var indexOffset = list.size + while (indexOffset > 0) { + indexOffset -= 1 + val index = list(indexOffset) + if (!isTaskBlacklistedOnExecOrNode(index, execId, host)) { // This should almost always be list.trimEnd(1) to remove tail - list -= index - if (!successful(index)) { + list.remove(indexOffset) + if (copiesRunning(index) == 0 && !successful(index)) { return Some(index) } } @@ -398,23 +382,25 @@ private[spark] class TaskSetManager( { // Check for process-local tasks; note that tasks can be process-local // on multiple nodes when we replicate cached blocks, as in Spark Streaming - for (index <- dequeueSpeculativeTaskFromList( - execId, host, pendingSpeculatableTasksForExecutor.getOrElse(execId, HashSet()))) { + for (index <- dequeueTaskFromList( + execId, host, pendingSpeculatableTasks.forExecutor.getOrElse(execId, ArrayBuffer()), + speculative = true)) { return Some((index, TaskLocality.PROCESS_LOCAL)) } // Check for node-local tasks if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) { - for (index <- dequeueSpeculativeTaskFromList( - execId, host, pendingSpeculatableTasksForHost.getOrElse(host, HashSet()))) { + for (index <- dequeueTaskFromList( + execId, host, pendingSpeculatableTasks.forHost.getOrElse(host, ArrayBuffer()), + speculative = true)) { return Some((index, TaskLocality.NODE_LOCAL)) } } // Check for no-preference tasks if (TaskLocality.isAllowed(locality, TaskLocality.NO_PREF)) { - for (index <- dequeueSpeculativeTaskFromList( - execId, host, pendingSpeculatableTasksWithNoPrefs)) { + for (index <- dequeueTaskFromList( + execId, host, pendingSpeculatableTasks.noPrefs, speculative = true)) { return Some((index, TaskLocality.PROCESS_LOCAL)) } } @@ -423,8 +409,9 @@ private[spark] class TaskSetManager( if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) { for { rack <- sched.getRackForHost(host) - index <- dequeueSpeculativeTaskFromList( - execId, host, pendingSpeculatableTasksForRack.getOrElse(rack, HashSet())) + index <- dequeueTaskFromList( + execId, host, pendingSpeculatableTasks.forRack.getOrElse(rack, ArrayBuffer()), + speculative = true) } { return Some((index, TaskLocality.RACK_LOCAL)) } @@ -432,7 +419,8 @@ private[spark] class TaskSetManager( // Check for non-local tasks if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) { - for (index <- dequeueSpeculativeTaskFromList(execId, host, allPendingSpeculatableTasks)) { + for (index <- dequeueTaskFromList(execId, host, pendingSpeculatableTasks.anyPrefs, + speculative = true)) { return Some((index, TaskLocality.ANY)) } } @@ -461,7 +449,7 @@ private[spark] class TaskSetManager( if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) { // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic - for (index <- dequeueTaskFromList(execId, host, pendingTasksWithNoPrefs)) { + for (index <- dequeueTaskFromList(execId, host, pendingTasks.noPrefs)) { return Some((index, TaskLocality.PROCESS_LOCAL, false)) } } @@ -476,7 +464,7 @@ private[spark] class TaskSetManager( } if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) { - for (index <- dequeueTaskFromList(execId, host, allPendingTasks)) { + for (index <- dequeueTaskFromList(execId, host, pendingTasks.anyPrefs)) { return Some((index, TaskLocality.ANY, 
false)) } } @@ -648,10 +636,10 @@ private[spark] class TaskSetManager( while (currentLocalityIndex < myLocalityLevels.length - 1) { val moreTasks = myLocalityLevels(currentLocalityIndex) match { - case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor) - case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost) - case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty - case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack) + case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasks.forExecutor) + case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasks.forHost) + case TaskLocality.NO_PREF => pendingTasks.noPrefs.nonEmpty + case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasks.forRack) } if (!moreTasks) { // This is a performance optimization: if there are no more tasks that can @@ -718,13 +706,13 @@ private[spark] class TaskSetManager( // from each list, we may need to go deeper in the list. We poll from the end because // failed tasks are put back at the end of allPendingTasks, so we're more likely to find // an unschedulable task this way. - val indexOffset = allPendingTasks.lastIndexWhere { indexInTaskSet => + val indexOffset = pendingTasks.anyPrefs.lastIndexWhere { indexInTaskSet => copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet) } if (indexOffset == -1) { None } else { - Some(allPendingTasks(indexOffset)) + Some(pendingTasks.anyPrefs(indexOffset)) } } @@ -1095,11 +1083,13 @@ private[spark] class TaskSetManager( for (tid <- runningTasksSet) { val info = taskInfos(tid) val index = info.index - if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold) { - addPendingSpeculativeTask(index) + if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold && + !speculatableTasks.contains(index)) { + addPendingTask(index, speculative = true) logInfo( "Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms" .format(index, taskSet.id, info.host, threshold)) + speculatableTasks += index sched.dagScheduler.speculativeTaskSubmitted(tasks(index)) foundTasks = true } @@ -1131,19 +1121,19 @@ private[spark] class TaskSetManager( private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = { import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY} val levels = new ArrayBuffer[TaskLocality.TaskLocality] - if (!pendingTasksForExecutor.isEmpty && - pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) { + if (!pendingTasks.forExecutor.isEmpty && + pendingTasks.forExecutor.keySet.exists(sched.isExecutorAlive(_))) { levels += PROCESS_LOCAL } - if (!pendingTasksForHost.isEmpty && - pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) { + if (!pendingTasks.forHost.isEmpty && + pendingTasks.forHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) { levels += NODE_LOCAL } - if (!pendingTasksWithNoPrefs.isEmpty) { + if (!pendingTasks.noPrefs.isEmpty) { levels += NO_PREF } - if (!pendingTasksForRack.isEmpty && - pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) { + if (!pendingTasks.forRack.isEmpty && + pendingTasks.forRack.keySet.exists(sched.hasHostAliveOnRack(_))) { levels += RACK_LOCAL } levels += ANY @@ -1168,3 +1158,10 @@ private[spark] object TaskSetManager { // this. 
val TASK_SIZE_TO_WARN_KIB = 1000 } + +case class PendingTasksByLocality( + forExecutor: HashMap[String, ArrayBuffer[Int]], + forHost: HashMap[String, ArrayBuffer[Int]], + noPrefs: ArrayBuffer[Int], + forRack: HashMap[String, ArrayBuffer[Int]], + anyPrefs: ArrayBuffer[Int]) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 4b32c40e0012..43aaeccfc167 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -739,7 +739,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // Mark the task as available for speculation, and then offer another resource, // which should be used to launch a speculative copy of the task. - manager.addPendingSpeculativeTask(singleTask.partitionId) + manager.speculatableTasks += singleTask.partitionId + manager.addPendingTask(singleTask.partitionId, speculative = true) val task2 = manager.resourceOffer("execB", "host2", TaskLocality.ANY).get assert(manager.runningTasks === 2) @@ -884,7 +885,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None) assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index == 1) - manager.addPendingSpeculativeTask(1) + manager.speculatableTasks += 1 + manager.addPendingTask(1, speculative = true) clock.advance(LOCALITY_WAIT_MS) // schedule the nonPref task assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 2) @@ -975,7 +977,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg sched.addExecutor("execA", "host1") sched.addExecutor("execB.2", "host2") manager.executorAdded() - assert(manager.pendingTasksWithNoPrefs.size === 0) + assert(manager.pendingTasks.noPrefs.size === 0) // Valid locality should contain PROCESS_LOCAL, NODE_LOCAL and ANY assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY))) assert(manager.resourceOffer("execA", "host1", ANY) !== None) @@ -1327,7 +1329,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg val taskDesc = taskSetManagerSpy.resourceOffer(exec, host, TaskLocality.ANY) // Assert the task has been black listed on the executor it was last executed on. - when(taskSetManagerSpy.addPendingTask(anyInt(), anyBoolean())).thenAnswer( + when(taskSetManagerSpy.addPendingTask(anyInt(), anyBoolean(), anyBoolean())).thenAnswer( (invocationOnMock: InvocationOnMock) => { val task: Int = invocationOnMock.getArgument(0) assert(taskSetManager.taskSetBlacklistHelperOpt.get. @@ -1339,7 +1341,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg val e = new ExceptionFailure("a", "b", Array(), "c", None) taskSetManagerSpy.handleFailedTask(taskDesc.get.taskId, TaskState.FAILED, e) - verify(taskSetManagerSpy, times(1)).addPendingTask(0, false) + verify(taskSetManagerSpy, times(1)).addPendingTask(0, false, false) } test("SPARK-21563 context's added jars shouldn't change mid-TaskSet") { @@ -1655,4 +1657,81 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // get removed inside TaskSchedulerImpl later. 
assert(availableResources(GPU) sameElements Array("0", "1", "2", "3")) } + + test("SPARK-26755 Ensure that a speculative task is submitted only once for execution") { + sc = new SparkContext("local", "test") + sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + val taskSet = FakeTask.createTaskSet(4) + // Set the speculation multiplier to be 0 so speculative tasks are launched immediately + sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0) + sc.conf.set(config.SPECULATION_ENABLED, true) + sc.conf.set(config.SPECULATION_QUANTILE, 0.5) + val clock = new ManualClock() + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) + val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => + task.metrics.internalAccums + } + // Offer resources for 4 tasks to start + for ((k, v) <- List( + "exec1" -> "host1", + "exec1" -> "host1", + "exec2" -> "host2", + "exec2" -> "host2")) { + val taskOption = manager.resourceOffer(k, v, NO_PREF) + assert(taskOption.isDefined) + val task = taskOption.get + assert(task.executorId === k) + } + assert(sched.startedTasks.toSet === Set(0, 1, 2, 3)) + clock.advance(1) + // Complete the first 2 tasks and leave the other 2 tasks in running + for (id <- Set(0, 1)) { + manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id))) + assert(sched.endedTasks(id) === Success) + } + // checkSpeculatableTasks checks that the task runtime is greater than the threshold for + // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for + // > 0ms, so advance the clock by 1ms here. + clock.advance(1) + assert(manager.checkSpeculatableTasks(0)) + assert(sched.speculativeTasks.toSet === Set(2, 3)) + assert(manager.pendingSpeculatableTasks.forExecutor.size === 0) + assert(manager.pendingSpeculatableTasks.forHost.size === 0) + assert(manager.pendingSpeculatableTasks.forRack.size === 0) + assert(manager.pendingSpeculatableTasks.anyPrefs.size === 2) + assert(manager.pendingSpeculatableTasks.noPrefs.size === 2) + + // Offer resource to start the speculative attempt for the running task + val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF) + val taskOption6 = manager.resourceOffer("exec1", "host1", NO_PREF) + assert(taskOption5.isDefined) + val task5 = taskOption5.get + assert(task5.index === 2) + assert(task5.taskId === 4) + assert(task5.executorId === "exec1") + assert(task5.attemptNumber === 1) + assert(taskOption6.isDefined) + val task6 = taskOption6.get + assert(task6.index === 3) + assert(task6.taskId === 5) + assert(task6.executorId === "exec1") + assert(task6.attemptNumber === 1) + sched.initialize(new FakeSchedulerBackend() { + override def killTask( + taskId: Long, + executorId: String, + interruptThread: Boolean, + reason: String): Unit = {} + }) + clock.advance(1) + // Running checkSpeculatableTasks again should return false + assert(!manager.checkSpeculatableTasks(0)) + assert(manager.pendingSpeculatableTasks.forExecutor.size === 0) + assert(manager.pendingSpeculatableTasks.forHost.size === 0) + assert(manager.pendingSpeculatableTasks.forRack.size === 0) + // allPendingSpeculativeTasks will still have two pending tasks but + // pendingSpeculatableTasksWithNoPrefs should have none + assert(manager.pendingSpeculatableTasks.anyPrefs.size === 2) + assert(manager.pendingSpeculatableTasks.noPrefs.size === 0) + } } From 1fa17ec35b8320c0c627b27abd650d75946d5c72 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Mon, 8 Jul 2019 11:11:11 -0500 Subject: 
[PATCH 04/13] [SPARK-26755] : Addressing Reviews July 8, 2019 Restructuring code and eliminating duplicate code --- .../spark/scheduler/TaskSetManager.scala | 267 ++++++------------ .../OutputCommitCoordinatorSuite.scala | 2 +- .../spark/scheduler/TaskSetManagerSuite.scala | 6 +- 3 files changed, 83 insertions(+), 192 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index aeab6a34810d..b2254d5dd736 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -145,24 +145,14 @@ private[spark] class TaskSetManager( // Duplicates are handled in dequeueTaskFromList, which ensures that a // task hasn't already started running before launching it. - private[scheduler] val pendingTasks = PendingTasksByLocality( - forExecutor = new HashMap[String, ArrayBuffer[Int]], - forHost = new HashMap[String, ArrayBuffer[Int]], - noPrefs = new ArrayBuffer[Int], - forRack = new HashMap[String, ArrayBuffer[Int]], - anyPrefs = new ArrayBuffer[Int]) + private[scheduler] val pendingTasks = new PendingTasksByLocality() // The HashSet here ensures that we do not add duplicate speculative tasks private[scheduler] val speculatableTasks = new HashSet[Int] // Set of pending tasks marked as speculative for various levels of locality: executor, host, // rack, noPrefs and anyPrefs - private[scheduler] val pendingSpeculatableTasks = PendingTasksByLocality( - forExecutor = new HashMap[String, ArrayBuffer[Int]], - forHost = new HashMap[String, ArrayBuffer[Int]], - noPrefs = new ArrayBuffer[Int], - forRack = new HashMap[String, ArrayBuffer[Int]], - anyPrefs = new ArrayBuffer[Int]) + private[scheduler] val pendingSpeculatableTasks = new PendingTasksByLocality() // Task index, start and finish time for each task attempt (indexed by task ID) private[scheduler] val taskInfos = new HashMap[Long, TaskInfo] @@ -238,84 +228,41 @@ private[spark] class TaskSetManager( index: Int, resolveRacks: Boolean = true, speculative: Boolean = false): Unit = { - if (speculative) { - for (loc <- tasks(index).preferredLocations) { - loc match { - case e: ExecutorCacheTaskLocation => - pendingSpeculatableTasks.forExecutor.getOrElseUpdate( - e.executorId, new ArrayBuffer) += index - case _ => - } - pendingSpeculatableTasks.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index - for (rack <- sched.getRackForHost(loc.host)) { - pendingSpeculatableTasks.forRack.getOrElseUpdate(rack, new ArrayBuffer) += index - } - } - - if (tasks(index).preferredLocations == Nil) { - pendingSpeculatableTasks.noPrefs += index - } - - pendingSpeculatableTasks.anyPrefs += index - } else { - - for (loc <- tasks(index).preferredLocations) { - loc match { - case e: ExecutorCacheTaskLocation => - pendingTasks.forExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index - case e: HDFSCacheTaskLocation => - val exe = sched.getExecutorsAliveOnHost(loc.host) - exe match { - case Some(set) => - for (e <- set) { - pendingTasks.forExecutor.getOrElseUpdate(e, new ArrayBuffer) += index - } - logInfo(s"Pending task $index has a cached location at ${e.host} " + - ", where there are executors " + set.mkString(",")) - case None => logDebug(s"Pending task $index has a cached location at ${e.host} " + - ", but there are no executors alive there.") - } - case _ => - } - pendingTasks.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index - - if (resolveRacks) { - 
sched.getRackForHost(loc.host).foreach { rack => - pendingTasks.forRack.getOrElseUpdate(rack, new ArrayBuffer) += index + val pendingTaskSetToAddTo = if (speculative) pendingSpeculatableTasks else pendingTasks + // ... mostly the original code from `addPendingTask` here, just adding + // into pendingTaskSetToAddTo + for (loc <- tasks(index).preferredLocations) { + loc match { + case e: ExecutorCacheTaskLocation => + pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index + case e: HDFSCacheTaskLocation => + val exe = sched.getExecutorsAliveOnHost(loc.host) + exe match { + case Some(set) => + for (e <- set) { + pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e, new ArrayBuffer) += index + } + logInfo(s"Pending task $index has a cached location at ${e.host} " + + ", where there are executors " + set.mkString(",")) + case None => logDebug(s"Pending task $index has a cached location at ${e.host} " + + ", but there are no executors alive there.") } - } + case _ => } + pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index - if (tasks(index).preferredLocations == Nil) { - pendingTasks.noPrefs += index + if (resolveRacks) { + sched.getRackForHost(loc.host).foreach { rack => + pendingTaskSetToAddTo.forRack.getOrElseUpdate(rack, new ArrayBuffer) += index + } } - - pendingTasks.anyPrefs += index } - } - - /** - * Return the pending tasks list for a given executor ID, or an empty list if - * there is no map entry for that host - */ - private def getPendingTasksForExecutor(executorId: String): ArrayBuffer[Int] = { - pendingTasks.forExecutor.getOrElse(executorId, ArrayBuffer()) - } - /** - * Return the pending tasks list for a given host, or an empty list if - * there is no map entry for that host - */ - private def getPendingTasksForHost(host: String): ArrayBuffer[Int] = { - pendingTasks.forHost.getOrElse(host, ArrayBuffer()) - } + if (tasks(index).preferredLocations == Nil) { + pendingTaskSetToAddTo.noPrefs += index + } - /** - * Return the pending rack-local task list for a given rack, or an empty list if - * there is no map entry for that rack - */ - private def getPendingTasksForRack(rack: String): ArrayBuffer[Int] = { - pendingTasks.forRack.getOrElse(rack, ArrayBuffer()) + pendingTaskSetToAddTo.anyPrefs += index } /** @@ -329,30 +276,16 @@ private[spark] class TaskSetManager( host: String, list: ArrayBuffer[Int], speculative: Boolean = false): Option[Int] = { - if (speculative) { - if (!list.isEmpty) { - for (index <- list) { - if (!isTaskBlacklistedOnExecOrNode(index, execId, host) && - !hasAttemptOnHost(index, host)) { - // This should almost always be list.trimEnd(1) to remove tail - list -= index - if (!successful(index)) { - return Some(index) - } - } - } - } - } else { - var indexOffset = list.size - while (indexOffset > 0) { - indexOffset -= 1 - val index = list(indexOffset) - if (!isTaskBlacklistedOnExecOrNode(index, execId, host)) { - // This should almost always be list.trimEnd(1) to remove tail - list.remove(indexOffset) - if (copiesRunning(index) == 0 && !successful(index)) { - return Some(index) - } + var indexOffset = list.size + while (indexOffset > 0) { + indexOffset -= 1 + val index = list(indexOffset) + if (!isTaskBlacklistedOnExecOrNode(index, execId, host) && + !(speculative && hasAttemptOnHost(index, host))) { + // This should almost always be list.trimEnd(1) to remove tail + list.remove(indexOffset) + if ((copiesRunning(index) == 0 || speculative) && !successful(index)) { + return Some(index) } } } @@ -371,63 
+304,6 @@ private[spark] class TaskSetManager( } } - /** - * Return a speculative task for a given executor if any are available. The task should not have - * an attempt running on this host, in case the host is slow. In addition, the task should meet - * the given locality constraint. - */ - // Labeled as protected to allow tests to override providing speculative tasks if necessary - protected def dequeueSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value) - : Option[(Int, TaskLocality.Value)] = - { - // Check for process-local tasks; note that tasks can be process-local - // on multiple nodes when we replicate cached blocks, as in Spark Streaming - for (index <- dequeueTaskFromList( - execId, host, pendingSpeculatableTasks.forExecutor.getOrElse(execId, ArrayBuffer()), - speculative = true)) { - return Some((index, TaskLocality.PROCESS_LOCAL)) - } - - // Check for node-local tasks - if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) { - for (index <- dequeueTaskFromList( - execId, host, pendingSpeculatableTasks.forHost.getOrElse(host, ArrayBuffer()), - speculative = true)) { - return Some((index, TaskLocality.NODE_LOCAL)) - } - } - - // Check for no-preference tasks - if (TaskLocality.isAllowed(locality, TaskLocality.NO_PREF)) { - for (index <- dequeueTaskFromList( - execId, host, pendingSpeculatableTasks.noPrefs, speculative = true)) { - return Some((index, TaskLocality.PROCESS_LOCAL)) - } - } - - // Check for rack-local tasks - if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) { - for { - rack <- sched.getRackForHost(host) - index <- dequeueTaskFromList( - execId, host, pendingSpeculatableTasks.forRack.getOrElse(rack, ArrayBuffer()), - speculative = true) - } { - return Some((index, TaskLocality.RACK_LOCAL)) - } - } - - // Check for non-local tasks - if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) { - for (index <- dequeueTaskFromList(execId, host, pendingSpeculatableTasks.anyPrefs, - speculative = true)) { - return Some((index, TaskLocality.ANY)) - } - } - - None - } - /** * Dequeue a pending task for a given node and return its index and locality level. * Only search for tasks matching the given locality constraint. @@ -435,43 +311,56 @@ private[spark] class TaskSetManager( * @return An option containing (task index within the task set, locality, is speculative?) 
*/ private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value) - : Option[(Int, TaskLocality.Value, Boolean)] = - { - for (index <- dequeueTaskFromList(execId, host, getPendingTasksForExecutor(execId))) { - return Some((index, TaskLocality.PROCESS_LOCAL, false)) + : Option[(Int, TaskLocality.Value, Boolean)] = { + // if we didn't schedule a regular task, try to schedule a speculative one + dequeueTaskHelper(execId, host, maxLocality, false).orElse(dequeueTaskHelper(execId, host, maxLocality, true)) + } + + private def dequeueTaskHelper( + execId: String, + host: String, + maxLocality: TaskLocality.Value, + speculative: Boolean): Option[(Int, TaskLocality.Value, Boolean)] = { + if (speculative && speculatableTasks.isEmpty) { + return None + } + val pendingTaskSetToUse = if (speculative) pendingSpeculatableTasks else pendingTasks + def dequeue(list: ArrayBuffer[Int]): Option[Int] = { + dequeueTaskFromList(execId, host, list, speculative) + } + + dequeue(pendingTaskSetToUse.forExecutor.getOrElse(execId, ArrayBuffer())).foreach { index => + return Some((index, TaskLocality.PROCESS_LOCAL, speculative)) } if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) { - for (index <- dequeueTaskFromList(execId, host, getPendingTasksForHost(host))) { - return Some((index, TaskLocality.NODE_LOCAL, false)) + dequeue(pendingTaskSetToUse.forHost.getOrElse(host, ArrayBuffer())).foreach { index => + return Some((index, TaskLocality.NODE_LOCAL, speculative)) } } + // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) { - // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic - for (index <- dequeueTaskFromList(execId, host, pendingTasks.noPrefs)) { - return Some((index, TaskLocality.PROCESS_LOCAL, false)) + dequeue(pendingTaskSetToUse.noPrefs).foreach { index => + return Some((index, TaskLocality.NO_PREF, speculative)) } } if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) { for { rack <- sched.getRackForHost(host) - index <- dequeueTaskFromList(execId, host, getPendingTasksForRack(rack)) + index <- dequeue(pendingTaskSetToUse.forRack.getOrElse(rack, ArrayBuffer())) } { - return Some((index, TaskLocality.RACK_LOCAL, false)) + return Some((index, TaskLocality.RACK_LOCAL, speculative)) } } if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) { - for (index <- dequeueTaskFromList(execId, host, pendingTasks.anyPrefs)) { - return Some((index, TaskLocality.ANY, false)) + dequeue(pendingTaskSetToUse.anyPrefs).foreach { index => + return Some((index, TaskLocality.ANY, speculative)) } } - - // find a speculative task if all others tasks have been scheduled - dequeueSpeculativeTask(execId, host, maxLocality).map { - case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)} + None } /** @@ -1084,7 +973,7 @@ private[spark] class TaskSetManager( val info = taskInfos(tid) val index = info.index if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold && - !speculatableTasks.contains(index)) { + !speculatableTasks.contains(index)) { addPendingTask(index, speculative = true) logInfo( "Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms" @@ -1159,9 +1048,11 @@ private[spark] object TaskSetManager { val TASK_SIZE_TO_WARN_KIB = 1000 } -case class PendingTasksByLocality( - forExecutor: HashMap[String, ArrayBuffer[Int]], - forHost: HashMap[String, ArrayBuffer[Int]], - noPrefs: 
ArrayBuffer[Int], - forRack: HashMap[String, ArrayBuffer[Int]], - anyPrefs: ArrayBuffer[Int]) +private[scheduler] class PendingTasksByLocality { + + val forExecutor: HashMap[String, ArrayBuffer[Int]] = new HashMap[String, ArrayBuffer[Int]] + val forHost: HashMap[String, ArrayBuffer[Int]] = new HashMap[String, ArrayBuffer[Int]] + val noPrefs: ArrayBuffer[Int] = new ArrayBuffer[Int] + val forRack: HashMap[String, ArrayBuffer[Int]] = new HashMap[String, ArrayBuffer[Int]] + val anyPrefs: ArrayBuffer[Int] = new ArrayBuffer[Int] +} diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index f582ef54dd1f..275e681257d6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -107,7 +107,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { val taskSet = invoke.getArguments()(0).asInstanceOf[TaskSet] new TaskSetManager(mockTaskScheduler, taskSet, 4) { private var hasDequeuedSpeculatedTask = false - override def dequeueSpeculativeTask(execId: String, + def dequeueSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value): Option[(Int, TaskLocality.Value)] = { if (hasDequeuedSpeculatedTask) { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 43aaeccfc167..aeaa0126da74 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1168,7 +1168,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // Because the SchedulerBackend was a mock, the 2nd copy of the task won't actually be // killed, so the FakeTaskScheduler is only told about the successful completion // of the speculated task. 
- assert(sched.endedTasks(3) === Success) + assert(sched.endedTasks(4) === Success) // also because the scheduler is a mock, our manager isn't notified about the task killed event, // so we do that manually manager.handleFailedTask(origTask.taskId, TaskState.KILLED, TaskKilled("test")) @@ -1706,13 +1706,13 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg val taskOption6 = manager.resourceOffer("exec1", "host1", NO_PREF) assert(taskOption5.isDefined) val task5 = taskOption5.get - assert(task5.index === 2) + assert(task5.index === 3) assert(task5.taskId === 4) assert(task5.executorId === "exec1") assert(task5.attemptNumber === 1) assert(taskOption6.isDefined) val task6 = taskOption6.get - assert(task6.index === 3) + assert(task6.index === 2) assert(task6.taskId === 5) assert(task6.executorId === "exec1") assert(task6.attemptNumber === 1) From 025e54840e69c030cb153d42aab316a942360cff Mon Sep 17 00:00:00 2001 From: pgandhi Date: Mon, 8 Jul 2019 11:29:56 -0500 Subject: [PATCH 05/13] [SPARK-26755] : Fixing Scalastyle test failures --- .../main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index b2254d5dd736..b28b7d9da21a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -313,7 +313,8 @@ private[spark] class TaskSetManager( private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value) : Option[(Int, TaskLocality.Value, Boolean)] = { // if we didn't schedule a regular task, try to schedule a speculative one - dequeueTaskHelper(execId, host, maxLocality, false).orElse(dequeueTaskHelper(execId, host, maxLocality, true)) + dequeueTaskHelper(execId, host, maxLocality, false).orElse( + dequeueTaskHelper(execId, host, maxLocality, true)) } private def dequeueTaskHelper( From b58c34f7ed7c0feb612818560a156a41823ab566 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Tue, 9 Jul 2019 14:21:13 -0500 Subject: [PATCH 06/13] [SPARK-26755] : Addressing Reviews July 9, 2019 Modifying test, adding extra condition to not let more than two speculative tasks being spawned --- .../spark/scheduler/TaskSetManager.scala | 16 +++++------ .../spark/scheduler/TaskSetManagerSuite.scala | 27 ++++++------------- 2 files changed, 16 insertions(+), 27 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index b28b7d9da21a..4be44ef770c9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -229,8 +229,6 @@ private[spark] class TaskSetManager( resolveRacks: Boolean = true, speculative: Boolean = false): Unit = { val pendingTaskSetToAddTo = if (speculative) pendingSpeculatableTasks else pendingTasks - // ... 
mostly the original code from `addPendingTask` here, just adding - // into pendingTaskSetToAddTo for (loc <- tasks(index).preferredLocations) { loc match { case e: ExecutorCacheTaskLocation => @@ -284,7 +282,8 @@ private[spark] class TaskSetManager( !(speculative && hasAttemptOnHost(index, host))) { // This should almost always be list.trimEnd(1) to remove tail list.remove(indexOffset) - if ((copiesRunning(index) == 0 || speculative) && !successful(index)) { + if (((copiesRunning(index) < 2 && speculative) || + (!speculative && copiesRunning(index) == 0)) && !successful(index)) { return Some(index) } } @@ -312,16 +311,17 @@ private[spark] class TaskSetManager( */ private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value) : Option[(Int, TaskLocality.Value, Boolean)] = { - // if we didn't schedule a regular task, try to schedule a speculative one + // Tries to schedule a regular task first; if it returns None, then schedules + // a speculative task dequeueTaskHelper(execId, host, maxLocality, false).orElse( dequeueTaskHelper(execId, host, maxLocality, true)) } private def dequeueTaskHelper( - execId: String, - host: String, - maxLocality: TaskLocality.Value, - speculative: Boolean): Option[(Int, TaskLocality.Value, Boolean)] = { + execId: String, + host: String, + maxLocality: TaskLocality.Value, + speculative: Boolean): Option[(Int, TaskLocality.Value, Boolean)] = { if (speculative && speculatableTasks.isEmpty) { return None } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index aeaa0126da74..f3f9c6af557e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1695,11 +1695,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg clock.advance(1) assert(manager.checkSpeculatableTasks(0)) assert(sched.speculativeTasks.toSet === Set(2, 3)) - assert(manager.pendingSpeculatableTasks.forExecutor.size === 0) - assert(manager.pendingSpeculatableTasks.forHost.size === 0) - assert(manager.pendingSpeculatableTasks.forRack.size === 0) - assert(manager.pendingSpeculatableTasks.anyPrefs.size === 2) - assert(manager.pendingSpeculatableTasks.noPrefs.size === 2) + assert(manager.copiesRunning(2) === 1) + assert(manager.copiesRunning(3) === 1) // Offer resource to start the speculative attempt for the running task val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF) @@ -1716,22 +1713,14 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(task6.taskId === 5) assert(task6.executorId === "exec1") assert(task6.attemptNumber === 1) - sched.initialize(new FakeSchedulerBackend() { - override def killTask( - taskId: Long, - executorId: String, - interruptThread: Boolean, - reason: String): Unit = {} - }) clock.advance(1) // Running checkSpeculatableTasks again should return false assert(!manager.checkSpeculatableTasks(0)) - assert(manager.pendingSpeculatableTasks.forExecutor.size === 0) - assert(manager.pendingSpeculatableTasks.forHost.size === 0) - assert(manager.pendingSpeculatableTasks.forRack.size === 0) - // allPendingSpeculativeTasks will still have two pending tasks but - // pendingSpeculatableTasksWithNoPrefs should have none - assert(manager.pendingSpeculatableTasks.anyPrefs.size === 2) - assert(manager.pendingSpeculatableTasks.noPrefs.size === 0) + 
assert(manager.copiesRunning(2) === 2) + assert(manager.copiesRunning(3) === 2) + // Offering additional resources should not lead to any speculative tasks being respawned + assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty) + assert(manager.resourceOffer("exec2", "host2", ANY).isEmpty) + assert(manager.resourceOffer("exec3", "host3", ANY).isEmpty) } } From 62a15d7eb17b21378448e6ab2277434285e75ba1 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Fri, 12 Jul 2019 14:26:49 -0500 Subject: [PATCH 07/13] [SPARK-26755] : Addressing Reviews July 12, 2019 Refactoring comments, changing NO_PREF to PROCESS_LOCAL in code etc. --- .../spark/scheduler/TaskSetManager.scala | 38 ++++++++++++------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 4be44ef770c9..4337c78cc727 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -147,7 +147,9 @@ private[spark] class TaskSetManager( private[scheduler] val pendingTasks = new PendingTasksByLocality() - // The HashSet here ensures that we do not add duplicate speculative tasks + // Tasks that can be speculated. Since these will be a small fraction of total + // tasks, we'll just hold them in a HashSet. The HashSet here ensures that we do not add + // duplicate speculative tasks. private[scheduler] val speculatableTasks = new HashSet[Int] // Set of pending tasks marked as speculative for various levels of locality: executor, host, @@ -260,7 +262,7 @@ private[spark] class TaskSetManager( pendingTaskSetToAddTo.noPrefs += index } - pendingTaskSetToAddTo.anyPrefs += index + pendingTaskSetToAddTo.all += index } /** @@ -282,9 +284,12 @@ private[spark] class TaskSetManager( !(speculative && hasAttemptOnHost(index, host))) { // This should almost always be list.trimEnd(1) to remove tail list.remove(indexOffset) - if (((copiesRunning(index) < 2 && speculative) || - (!speculative && copiesRunning(index) == 0)) && !successful(index)) { - return Some(index) + if (!successful(index)) { + if (copiesRunning(index) == 0) { + return Some(index) + } else if (speculative && copiesRunning(index) == 1) { + return Some(index) + } } } } @@ -343,7 +348,7 @@ private[spark] class TaskSetManager( // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) { dequeue(pendingTaskSetToUse.noPrefs).foreach { index => - return Some((index, TaskLocality.NO_PREF, speculative)) + return Some((index, TaskLocality.PROCESS_LOCAL, speculative)) } } @@ -357,7 +362,7 @@ private[spark] class TaskSetManager( } if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) { - dequeue(pendingTaskSetToUse.anyPrefs).foreach { index => + dequeue(pendingTaskSetToUse.all).foreach { index => return Some((index, TaskLocality.ANY, speculative)) } } @@ -596,13 +601,13 @@ private[spark] class TaskSetManager( // from each list, we may need to go deeper in the list. We poll from the end because // failed tasks are put back at the end of allPendingTasks, so we're more likely to find // an unschedulable task this way. 
- val indexOffset = pendingTasks.anyPrefs.lastIndexWhere { indexInTaskSet => + val indexOffset = pendingTasks.all.lastIndexWhere { indexInTaskSet => copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet) } if (indexOffset == -1) { None } else { - Some(pendingTasks.anyPrefs(indexOffset)) + Some(pendingTasks.all(indexOffset)) } } @@ -1051,9 +1056,14 @@ private[spark] object TaskSetManager { private[scheduler] class PendingTasksByLocality { - val forExecutor: HashMap[String, ArrayBuffer[Int]] = new HashMap[String, ArrayBuffer[Int]] - val forHost: HashMap[String, ArrayBuffer[Int]] = new HashMap[String, ArrayBuffer[Int]] - val noPrefs: ArrayBuffer[Int] = new ArrayBuffer[Int] - val forRack: HashMap[String, ArrayBuffer[Int]] = new HashMap[String, ArrayBuffer[Int]] - val anyPrefs: ArrayBuffer[Int] = new ArrayBuffer[Int] + // Set of pending tasks for each executor. + val forExecutor = new HashMap[String, ArrayBuffer[Int]] + // Set of pending tasks for each host. Similar to pendingTasksForExecutor, but at host level. + val forHost = new HashMap[String, ArrayBuffer[Int]] + // Set containing pending tasks with no locality preferences. + val noPrefs = new ArrayBuffer[Int] + // Set of pending tasks for each rack -- similar to the above. + val forRack = new HashMap[String, ArrayBuffer[Int]] + // Set containing all pending tasks (also used as a stack, as above). + val all = new ArrayBuffer[Int] } From 466849e3ebf8e39f994423947f21bcf390c26894 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Tue, 16 Jul 2019 16:18:57 -0500 Subject: [PATCH 08/13] [SPARK-26755] : Addressing Reviews July 16, 2019 Adding a unit test to check for locality prefs for speculatable tasks and other changes --- .../spark/scheduler/TaskSetManager.scala | 52 +++++----- .../spark/scheduler/TaskSetManagerSuite.scala | 97 +++++++++++++------ 2 files changed, 97 insertions(+), 52 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 4337c78cc727..94fa7dde09b7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -131,29 +131,15 @@ private[spark] class TaskSetManager( // same time for a barrier stage. private[scheduler] def isBarrier = taskSet.tasks.nonEmpty && taskSet.tasks(0).isBarrier - // Set of pending tasks for various levels of locality: executor, host, rack, - // noPrefs and anyPrefs. These collections are actually - // treated as stacks, in which new tasks are added to the end of the - // ArrayBuffer and removed from the end. This makes it faster to detect - // tasks that repeatedly fail because whenever a task failed, it is put - // back at the head of the stack. These collections may contain duplicates - // for two reasons: - // (1): Tasks are only removed lazily; when a task is launched, it remains - // in all the pending lists except the one that it was launched from. - // (2): Tasks may be re-added to these lists multiple times as a result - // of failures. - // Duplicates are handled in dequeueTaskFromList, which ensures that a - // task hasn't already started running before launching it. - + // Store tasks waiting to be scheduled by locality preferences private[scheduler] val pendingTasks = new PendingTasksByLocality() // Tasks that can be speculated. Since these will be a small fraction of total // tasks, we'll just hold them in a HashSet. 
The HashSet here ensures that we do not add - // duplicate speculative tasks. + // duplicate speculatable tasks. private[scheduler] val speculatableTasks = new HashSet[Int] - // Set of pending tasks marked as speculative for various levels of locality: executor, host, - // rack, noPrefs and anyPrefs + // Store speculatable tasks by locality preferences private[scheduler] val pendingSpeculatableTasks = new PendingTasksByLocality() // Task index, start and finish time for each task attempt (indexed by task ID) @@ -229,8 +215,8 @@ private[spark] class TaskSetManager( private[spark] def addPendingTask( index: Int, resolveRacks: Boolean = true, - speculative: Boolean = false): Unit = { - val pendingTaskSetToAddTo = if (speculative) pendingSpeculatableTasks else pendingTasks + speculatable: Boolean = false): Unit = { + val pendingTaskSetToAddTo = if (speculatable) pendingSpeculatableTasks else pendingTasks for (loc <- tasks(index).preferredLocations) { loc match { case e: ExecutorCacheTaskLocation => @@ -281,7 +267,7 @@ private[spark] class TaskSetManager( indexOffset -= 1 val index = list(indexOffset) if (!isTaskBlacklistedOnExecOrNode(index, execId, host) && - !(speculative && hasAttemptOnHost(index, host))) { + !(speculative && hasAttemptOnHost(index, host))) { // This should almost always be list.trimEnd(1) to remove tail list.remove(indexOffset) if (!successful(index)) { @@ -314,8 +300,10 @@ private[spark] class TaskSetManager( * * @return An option containing (task index within the task set, locality, is speculative?) */ - private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value) - : Option[(Int, TaskLocality.Value, Boolean)] = { + private def dequeueTask( + execId: String, + host: String, + maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = { // Tries to schedule a regular task first; if it returns None, then schedules // a speculative task dequeueTaskHelper(execId, host, maxLocality, false).orElse( @@ -980,10 +968,11 @@ private[spark] class TaskSetManager( val index = info.index if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold && !speculatableTasks.contains(index)) { - addPendingTask(index, speculative = true) + addPendingTask(index, speculatable = true) logInfo( - "Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms" - .format(index, taskSet.id, info.host, threshold)) + ("Marking task %d in stage %s (on %s) as speculatable because it ran more" + + " than %.0f ms(%d speculatable tasks in this taskset now)") + .format(index, taskSet.id, info.host, threshold, speculatableTasks.size + 1)) speculatableTasks += index sched.dagScheduler.speculativeTaskSubmitted(tasks(index)) foundTasks = true @@ -1054,6 +1043,19 @@ private[spark] object TaskSetManager { val TASK_SIZE_TO_WARN_KIB = 1000 } +// Set of pending tasks for various levels of locality: executor, host, rack, +// noPrefs and anyPrefs. These collections are actually +// treated as stacks, in which new tasks are added to the end of the +// ArrayBuffer and removed from the end. This makes it faster to detect +// tasks that repeatedly fail because whenever a task failed, it is put +// back at the head of the stack. These collections may contain duplicates +// for two reasons: +// (1): Tasks are only removed lazily; when a task is launched, it remains +// in all the pending lists except the one that it was launched from. +// (2): Tasks may be re-added to these lists multiple times as a result +// of failures. 
+// Duplicates are handled in dequeueTaskFromList, which ensures that a +// task hasn't already started running before launching it. private[scheduler] class PendingTasksByLocality { // Set of pending tasks for each executor. diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index f3f9c6af557e..f41ac69b7897 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -740,7 +740,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // Mark the task as available for speculation, and then offer another resource, // which should be used to launch a speculative copy of the task. manager.speculatableTasks += singleTask.partitionId - manager.addPendingTask(singleTask.partitionId, speculative = true) + manager.addPendingTask(singleTask.partitionId, speculatable = true) val task2 = manager.resourceOffer("execB", "host2", TaskLocality.ANY).get assert(manager.runningTasks === 2) @@ -886,7 +886,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index == 1) manager.speculatableTasks += 1 - manager.addPendingTask(1, speculative = true) + manager.addPendingTask(1, speculatable = true) clock.advance(LOCALITY_WAIT_MS) // schedule the nonPref task assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 2) @@ -1671,16 +1671,13 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => task.metrics.internalAccums } - // Offer resources for 4 tasks to start - for ((k, v) <- List( - "exec1" -> "host1", - "exec1" -> "host1", - "exec2" -> "host2", - "exec2" -> "host2")) { - val taskOption = manager.resourceOffer(k, v, NO_PREF) - assert(taskOption.isDefined) - val task = taskOption.get - assert(task.executorId === k) + // Offer resources for 4 tasks to start, 2 on each exec + Seq("exec1" -> "host1", "exec2" -> "host2").foreach { case (exec, host) => + (0 until 2).foreach { _ => + val taskOption = manager.resourceOffer(exec, host, NO_PREF) + assert(taskOption.isDefined) + assert(taskOption.get.executorId === exec) + } } assert(sched.startedTasks.toSet === Set(0, 1, 2, 3)) clock.advance(1) @@ -1698,21 +1695,23 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(manager.copiesRunning(2) === 1) assert(manager.copiesRunning(3) === 1) - // Offer resource to start the speculative attempt for the running task - val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF) - val taskOption6 = manager.resourceOffer("exec1", "host1", NO_PREF) - assert(taskOption5.isDefined) - val task5 = taskOption5.get - assert(task5.index === 3) - assert(task5.taskId === 4) - assert(task5.executorId === "exec1") - assert(task5.attemptNumber === 1) - assert(taskOption6.isDefined) - val task6 = taskOption6.get - assert(task6.index === 2) - assert(task6.taskId === 5) - assert(task6.executorId === "exec1") - assert(task6.attemptNumber === 1) + // Offer resource to start the speculative attempt for the running task. 
We offer more + // resources, and ensure that speculative tasks get scheduled appropriately -- only one extra + // copy per speculatable task + val taskOption2 = manager.resourceOffer("exec1", "host1", NO_PREF) + val taskOption3 = manager.resourceOffer("exec1", "host1", NO_PREF) + assert(taskOption2.isDefined) + val task2 = taskOption2.get + assert(task2.index === 3) + assert(task2.taskId === 4) + assert(task2.executorId === "exec1") + assert(task2.attemptNumber === 1) + assert(taskOption3.isDefined) + val task3 = taskOption3.get + assert(task3.index === 2) + assert(task3.taskId === 5) + assert(task3.executorId === "exec1") + assert(task3.attemptNumber === 1) clock.advance(1) // Running checkSpeculatableTasks again should return false assert(!manager.checkSpeculatableTasks(0)) @@ -1723,4 +1722,48 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(manager.resourceOffer("exec2", "host2", ANY).isEmpty) assert(manager.resourceOffer("exec3", "host3", ANY).isEmpty) } + + test("SPARK-26755 Ensure that a speculative task obeys the original locality preferences") { + sc = new SparkContext("local", "test") + sched = new FakeTaskScheduler(sc, ("exec1", "host1"), + ("exec2", "host2"), ("exec3", "host3"), ("exec4", "host4")) + // Create 3 tasks with locality preferences + val taskSet = FakeTask.createTaskSet(3, + Seq(TaskLocation("host1"), TaskLocation("host3")), + Seq(TaskLocation("host2")), + Seq(TaskLocation("host3"))) + // Set the speculation multiplier to be 0 so speculative tasks are launched immediately + sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0) + sc.conf.set(config.SPECULATION_ENABLED, true) + sc.conf.set(config.SPECULATION_QUANTILE, 0.5) + val clock = new ManualClock() + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) + val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => + task.metrics.internalAccums + } + // Offer resources for 3 tasks to start + Seq("exec1" -> "host1", "exec2" -> "host2", "exec3" -> "host3").foreach { case (exec, host) => + val taskOption = manager.resourceOffer(exec, host, NO_PREF) + assert(taskOption.isDefined) + assert(taskOption.get.executorId === exec) + } + assert(sched.startedTasks.toSet === Set(0, 1, 2)) + clock.advance(1) + // Finish one task and mark the others as speculatable + manager.handleSuccessfulTask(2, createTaskResult(2, accumUpdatesByTask(2))) + assert(sched.endedTasks(2) === Success) + clock.advance(1) + assert(manager.checkSpeculatableTasks(0)) + assert(sched.speculativeTasks.toSet === Set(0, 1)) + // Ensure that the speculatable tasks obey the original locality preferences + assert(manager.resourceOffer("exec4", "host4", NODE_LOCAL).isEmpty) + assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL).isEmpty) + assert(manager.resourceOffer("exec3", "host3", NODE_LOCAL).isDefined) + assert(manager.resourceOffer("exec4", "host4", ANY).isDefined) + // Since, all speculatable tasks have been launched, making another offer + // should not schedule any more tasks + assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty) + assert(!manager.checkSpeculatableTasks(0)) + assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty) + } } From 7b23ef37d0b38b6ee0623d894e5a9b5dbb2d4049 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Wed, 17 Jul 2019 10:22:25 -0500 Subject: [PATCH 09/13] [SPARK-26755] : Addressing Reviews July 17, 2019 Modifying comment in Scaladoc style and combining two unit tests into one --- 
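As a reading aid for this revision, here is a minimal, self-contained sketch of the locality-bucketed queue idea that pendingTasks and pendingSpeculatableTasks implement. It is an illustration only, not TaskSetManager code: the TaskPrefs case class and the enqueue/dequeueFor helpers are invented for the example, and the sketch leaves out the delay-scheduling (allowed-locality) gating and blacklist checks that the real dequeue path applies.

import scala.collection.mutable.{ArrayBuffer, HashMap}

final case class TaskPrefs(executors: Seq[String], hosts: Seq[String], racks: Seq[String])

class PendingByLocality {
  val forExecutor = new HashMap[String, ArrayBuffer[Int]]
  val forHost = new HashMap[String, ArrayBuffer[Int]]
  val forRack = new HashMap[String, ArrayBuffer[Int]]
  val noPrefs = new ArrayBuffer[Int]
  val all = new ArrayBuffer[Int]

  // Enqueue a task index into every bucket matching its preferences, plus `all`.
  def enqueue(index: Int, prefs: TaskPrefs): Unit = {
    prefs.executors.foreach(e => forExecutor.getOrElseUpdate(e, new ArrayBuffer[Int]) += index)
    prefs.hosts.foreach(h => forHost.getOrElseUpdate(h, new ArrayBuffer[Int]) += index)
    prefs.racks.foreach(r => forRack.getOrElseUpdate(r, new ArrayBuffer[Int]) += index)
    if (prefs.executors.isEmpty && prefs.hosts.isEmpty && prefs.racks.isEmpty) noPrefs += index
    all += index
  }

  // Pop from the end (LIFO) so recently re-added tasks are retried first, dropping
  // stale entries that `stillRunnable` rejects (e.g. duplicates or finished tasks).
  private def pop(list: ArrayBuffer[Int], stillRunnable: Int => Boolean): Option[Int] = {
    while (list.nonEmpty) {
      val index = list.remove(list.size - 1)
      if (stillRunnable(index)) return Some(index)
    }
    None
  }

  // Try the most specific bucket first, then progressively broader ones.
  def dequeueFor(
      execId: String,
      host: String,
      rack: Option[String],
      stillRunnable: Int => Boolean): Option[Int] = {
    pop(forExecutor.getOrElse(execId, new ArrayBuffer[Int]), stillRunnable)
      .orElse(pop(forHost.getOrElse(host, new ArrayBuffer[Int]), stillRunnable))
      .orElse(pop(noPrefs, stillRunnable))
      .orElse(rack.flatMap(r => pop(forRack.getOrElse(r, new ArrayBuffer[Int]), stillRunnable)))
      .orElse(pop(all, stillRunnable))
  }
}

For example, enqueue(7, TaskPrefs(Seq("exec1"), Seq("host1"), Seq("rack1"))) places index 7 in four buckets, and a later dequeueFor("exec1", "host1", Some("rack1"), _ => true) hands it out from the per-executor bucket first, mirroring the bucket ordering used by dequeueTaskHelper in this series (minus the locality-level gating).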
.../spark/scheduler/TaskSetManager.scala | 28 +++++---- .../spark/scheduler/TaskSetManagerSuite.scala | 60 +++++++++---------- 2 files changed, 43 insertions(+), 45 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 94fa7dde09b7..75779ce42df4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -1043,19 +1043,21 @@ private[spark] object TaskSetManager { val TASK_SIZE_TO_WARN_KIB = 1000 } -// Set of pending tasks for various levels of locality: executor, host, rack, -// noPrefs and anyPrefs. These collections are actually -// treated as stacks, in which new tasks are added to the end of the -// ArrayBuffer and removed from the end. This makes it faster to detect -// tasks that repeatedly fail because whenever a task failed, it is put -// back at the head of the stack. These collections may contain duplicates -// for two reasons: -// (1): Tasks are only removed lazily; when a task is launched, it remains -// in all the pending lists except the one that it was launched from. -// (2): Tasks may be re-added to these lists multiple times as a result -// of failures. -// Duplicates are handled in dequeueTaskFromList, which ensures that a -// task hasn't already started running before launching it. +/** + * Set of pending tasks for various levels of locality: executor, host, rack, + * noPrefs and anyPrefs. These collections are actually + * treated as stacks, in which new tasks are added to the end of the + * ArrayBuffer and removed from the end. This makes it faster to detect + * tasks that repeatedly fail because whenever a task failed, it is put + * back at the head of the stack. These collections may contain duplicates + * for two reasons: + * (1): Tasks are only removed lazily; when a task is launched, it remains + * in all the pending lists except the one that it was launched from. + * (2): Tasks may be re-added to these lists multiple times as a result + * of failures. + * Duplicates are handled in dequeueTaskFromList, which ensures that a + * task hasn't already started running before launching it. + */ private[scheduler] class PendingTasksByLocality { // Set of pending tasks for each executor. 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index f41ac69b7897..6059d774988e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1658,7 +1658,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(availableResources(GPU) sameElements Array("0", "1", "2", "3")) } - test("SPARK-26755 Ensure that a speculative task is submitted only once for execution") { + test("SPARK-26755 Ensure that a speculative task is submitted only once for execution and" + + " must also obey original locality preferences") { sc = new SparkContext("local", "test") sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) val taskSet = FakeTask.createTaskSet(4) @@ -1682,7 +1683,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(sched.startedTasks.toSet === Set(0, 1, 2, 3)) clock.advance(1) // Complete the first 2 tasks and leave the other 2 tasks in running - for (id <- Set(0, 1)) { + for (id <- Set(0, 2)) { manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id))) assert(sched.endedTasks(id) === Success) } @@ -1691,79 +1692,74 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // > 0ms, so advance the clock by 1ms here. clock.advance(1) assert(manager.checkSpeculatableTasks(0)) - assert(sched.speculativeTasks.toSet === Set(2, 3)) - assert(manager.copiesRunning(2) === 1) + assert(sched.speculativeTasks.toSet === Set(1, 3)) + assert(manager.copiesRunning(1) === 1) assert(manager.copiesRunning(3) === 1) // Offer resource to start the speculative attempt for the running task. 
We offer more // resources, and ensure that speculative tasks get scheduled appropriately -- only one extra // copy per speculatable task val taskOption2 = manager.resourceOffer("exec1", "host1", NO_PREF) - val taskOption3 = manager.resourceOffer("exec1", "host1", NO_PREF) + val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF) assert(taskOption2.isDefined) val task2 = taskOption2.get + // Ensure that task index 3 is launched on host1 and task index 4 on host2 assert(task2.index === 3) assert(task2.taskId === 4) assert(task2.executorId === "exec1") assert(task2.attemptNumber === 1) assert(taskOption3.isDefined) val task3 = taskOption3.get - assert(task3.index === 2) + assert(task3.index === 1) assert(task3.taskId === 5) - assert(task3.executorId === "exec1") + assert(task3.executorId === "exec2") assert(task3.attemptNumber === 1) clock.advance(1) // Running checkSpeculatableTasks again should return false assert(!manager.checkSpeculatableTasks(0)) - assert(manager.copiesRunning(2) === 2) + assert(manager.copiesRunning(1) === 2) assert(manager.copiesRunning(3) === 2) // Offering additional resources should not lead to any speculative tasks being respawned assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty) assert(manager.resourceOffer("exec2", "host2", ANY).isEmpty) assert(manager.resourceOffer("exec3", "host3", ANY).isEmpty) - } - test("SPARK-26755 Ensure that a speculative task obeys the original locality preferences") { - sc = new SparkContext("local", "test") + // Launch a new set of tasks with locality preferences sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"), ("exec4", "host4")) - // Create 3 tasks with locality preferences - val taskSet = FakeTask.createTaskSet(3, + val taskSet2 = FakeTask.createTaskSet(3, Seq(TaskLocation("host1"), TaskLocation("host3")), Seq(TaskLocation("host2")), Seq(TaskLocation("host3"))) - // Set the speculation multiplier to be 0 so speculative tasks are launched immediately - sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0) - sc.conf.set(config.SPECULATION_ENABLED, true) - sc.conf.set(config.SPECULATION_QUANTILE, 0.5) - val clock = new ManualClock() - val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) - val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => + val clock2 = new ManualClock() + val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES, clock = clock2) + val accumUpdatesByTask2: Array[Seq[AccumulatorV2[_, _]]] = taskSet2.tasks.map { task => task.metrics.internalAccums } + // Offer resources for 3 tasks to start Seq("exec1" -> "host1", "exec2" -> "host2", "exec3" -> "host3").foreach { case (exec, host) => - val taskOption = manager.resourceOffer(exec, host, NO_PREF) + val taskOption = manager2.resourceOffer(exec, host, NO_PREF) assert(taskOption.isDefined) assert(taskOption.get.executorId === exec) } assert(sched.startedTasks.toSet === Set(0, 1, 2)) - clock.advance(1) + clock2.advance(1) // Finish one task and mark the others as speculatable - manager.handleSuccessfulTask(2, createTaskResult(2, accumUpdatesByTask(2))) + manager2.handleSuccessfulTask(2, createTaskResult(2, accumUpdatesByTask2(2))) assert(sched.endedTasks(2) === Success) - clock.advance(1) - assert(manager.checkSpeculatableTasks(0)) + clock2.advance(1) + assert(manager2.checkSpeculatableTasks(0)) assert(sched.speculativeTasks.toSet === Set(0, 1)) // Ensure that the speculatable tasks obey the original locality preferences - 
assert(manager.resourceOffer("exec4", "host4", NODE_LOCAL).isEmpty) - assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL).isEmpty) - assert(manager.resourceOffer("exec3", "host3", NODE_LOCAL).isDefined) - assert(manager.resourceOffer("exec4", "host4", ANY).isDefined) + assert(manager2.resourceOffer("exec4", "host4", NODE_LOCAL).isEmpty) + assert(manager2.resourceOffer("exec2", "host2", NODE_LOCAL).isEmpty) + assert(manager2.resourceOffer("exec3", "host3", NODE_LOCAL).isDefined) + assert(manager2.resourceOffer("exec4", "host4", ANY).isDefined) // Since, all speculatable tasks have been launched, making another offer // should not schedule any more tasks - assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty) - assert(!manager.checkSpeculatableTasks(0)) - assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty) + assert(manager2.resourceOffer("exec1", "host1", ANY).isEmpty) + assert(!manager2.checkSpeculatableTasks(0)) + assert(manager2.resourceOffer("exec1", "host1", ANY).isEmpty) } } From 7b23b377b9f4c88f4bbb440f70f93b7fec776243 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Wed, 17 Jul 2019 14:16:15 -0500 Subject: [PATCH 10/13] [SPARK-26755] : Separating unit tests and adding comment --- .../spark/scheduler/TaskSetManagerSuite.scala | 45 +++++++++++-------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 6059d774988e..4bc8ee450e92 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1658,8 +1658,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(availableResources(GPU) sameElements Array("0", "1", "2", "3")) } - test("SPARK-26755 Ensure that a speculative task is submitted only once for execution and" + - " must also obey original locality preferences") { + test("SPARK-26755 Ensure that a speculative task is submitted only once for execution") { sc = new SparkContext("local", "test") sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) val taskSet = FakeTask.createTaskSet(4) @@ -1723,43 +1722,51 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty) assert(manager.resourceOffer("exec2", "host2", ANY).isEmpty) assert(manager.resourceOffer("exec3", "host3", ANY).isEmpty) + } + test("SPARK-26755 Ensure that a speculative task obeys original locality preferences") { + sc = new SparkContext("local", "test") + // Set the speculation multiplier to be 0 so speculative tasks are launched immediately + sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0) + sc.conf.set(config.SPECULATION_ENABLED, true) + sc.conf.set(config.SPECULATION_QUANTILE, 0.5) // Launch a new set of tasks with locality preferences sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"), ("exec4", "host4")) - val taskSet2 = FakeTask.createTaskSet(3, + val taskSet = FakeTask.createTaskSet(3, Seq(TaskLocation("host1"), TaskLocation("host3")), Seq(TaskLocation("host2")), Seq(TaskLocation("host3"))) - val clock2 = new ManualClock() - val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES, clock = clock2) - val accumUpdatesByTask2: Array[Seq[AccumulatorV2[_, _]]] = taskSet2.tasks.map { task => + val clock = new ManualClock() + val manager = 
new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) + val accumUpdatesByTask2: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => task.metrics.internalAccums } - // Offer resources for 3 tasks to start Seq("exec1" -> "host1", "exec2" -> "host2", "exec3" -> "host3").foreach { case (exec, host) => - val taskOption = manager2.resourceOffer(exec, host, NO_PREF) + val taskOption = manager.resourceOffer(exec, host, NO_PREF) assert(taskOption.isDefined) assert(taskOption.get.executorId === exec) } assert(sched.startedTasks.toSet === Set(0, 1, 2)) - clock2.advance(1) + clock.advance(1) // Finish one task and mark the others as speculatable - manager2.handleSuccessfulTask(2, createTaskResult(2, accumUpdatesByTask2(2))) + manager.handleSuccessfulTask(2, createTaskResult(2, accumUpdatesByTask2(2))) assert(sched.endedTasks(2) === Success) - clock2.advance(1) - assert(manager2.checkSpeculatableTasks(0)) + clock.advance(1) + assert(manager.checkSpeculatableTasks(0)) assert(sched.speculativeTasks.toSet === Set(0, 1)) // Ensure that the speculatable tasks obey the original locality preferences - assert(manager2.resourceOffer("exec4", "host4", NODE_LOCAL).isEmpty) - assert(manager2.resourceOffer("exec2", "host2", NODE_LOCAL).isEmpty) - assert(manager2.resourceOffer("exec3", "host3", NODE_LOCAL).isDefined) - assert(manager2.resourceOffer("exec4", "host4", ANY).isDefined) + assert(manager.resourceOffer("exec4", "host4", NODE_LOCAL).isEmpty) + // task 1 does have a node-local preference for host2 -- but we've already got a regular + // task running there, so we should not schedule a speculative there as well. + assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL).isEmpty) + assert(manager.resourceOffer("exec3", "host3", NODE_LOCAL).isDefined) + assert(manager.resourceOffer("exec4", "host4", ANY).isDefined) // Since, all speculatable tasks have been launched, making another offer // should not schedule any more tasks - assert(manager2.resourceOffer("exec1", "host1", ANY).isEmpty) - assert(!manager2.checkSpeculatableTasks(0)) - assert(manager2.resourceOffer("exec1", "host1", ANY).isEmpty) + assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty) + assert(!manager.checkSpeculatableTasks(0)) + assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty) } } From 32fa4d20587d2ca70422f3e4b50bcec68c372156 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Fri, 19 Jul 2019 13:55:07 -0500 Subject: [PATCH 11/13] [SPARK-26755] : Remove task index from speculatableTasks HashSet when a speculatable task is dequeued --- .../main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 75779ce42df4..c8ce14167eba 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -274,6 +274,7 @@ private[spark] class TaskSetManager( if (copiesRunning(index) == 0) { return Some(index) } else if (speculative && copiesRunning(index) == 1) { + speculatableTasks -= index return Some(index) } } From 685bbae298eb84de4282b690fd410d8404e6c424 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Mon, 22 Jul 2019 10:31:56 -0500 Subject: [PATCH 12/13] [SPARK-26755] : Addressing Reviews July 22, 2019 Removing task index from speculatableTasks after task is dequeued, adding comment --- .../org/apache/spark/scheduler/TaskSetManager.scala | 
9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index c8ce14167eba..4996bd54bbe3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -270,11 +270,12 @@ private[spark] class TaskSetManager( !(speculative && hasAttemptOnHost(index, host))) { // This should almost always be list.trimEnd(1) to remove tail list.remove(indexOffset) + // Speculatable task should only be launched when at most one copy of the + // original task is running if (!successful(index)) { if (copiesRunning(index) == 0) { return Some(index) } else if (speculative && copiesRunning(index) == 1) { - speculatableTasks -= index return Some(index) } } @@ -321,7 +322,11 @@ private[spark] class TaskSetManager( } val pendingTaskSetToUse = if (speculative) pendingSpeculatableTasks else pendingTasks def dequeue(list: ArrayBuffer[Int]): Option[Int] = { - dequeueTaskFromList(execId, host, list, speculative) + val task = dequeueTaskFromList(execId, host, list, speculative) + if (speculative && task.isDefined) { + speculatableTasks -= task.get + } + task } dequeue(pendingTaskSetToUse.forExecutor.getOrElse(execId, ArrayBuffer())).foreach { index => From 7a8c9926137ea527ec9d0bf1ad170dadf5e92c0d Mon Sep 17 00:00:00 2001 From: pgandhi Date: Mon, 29 Jul 2019 15:43:18 -0500 Subject: [PATCH 13/13] [SPARK-26755] : Addressing Reviews July 29, 2019 Fixing OutputCommitCoordinatorSuite Test to override dequeueTaskHelper method. --- .../org/apache/spark/scheduler/TaskSetManager.scala | 2 +- .../scheduler/OutputCommitCoordinatorSuite.scala | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 4996bd54bbe3..79a1afcad716 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -312,7 +312,7 @@ private[spark] class TaskSetManager( dequeueTaskHelper(execId, host, maxLocality, true)) } - private def dequeueTaskHelper( + protected def dequeueTaskHelper( execId: String, host: String, maxLocality: TaskLocality.Value, diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index 275e681257d6..d6964063c118 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -107,14 +107,18 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { val taskSet = invoke.getArguments()(0).asInstanceOf[TaskSet] new TaskSetManager(mockTaskScheduler, taskSet, 4) { private var hasDequeuedSpeculatedTask = false - def dequeueSpeculativeTask(execId: String, + override def dequeueTaskHelper( + execId: String, host: String, - locality: TaskLocality.Value): Option[(Int, TaskLocality.Value)] = { - if (hasDequeuedSpeculatedTask) { + locality: TaskLocality.Value, + speculative: Boolean): Option[(Int, TaskLocality.Value, Boolean)] = { + if (!speculative) { + super.dequeueTaskHelper(execId, host, locality, speculative) + } else if (hasDequeuedSpeculatedTask) { None } else { 
hasDequeuedSpeculatedTask = true - Some((0, TaskLocality.PROCESS_LOCAL)) + Some((0, TaskLocality.PROCESS_LOCAL, true)) } } }
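Taken together, the dequeue changes in this series reduce to a single eligibility rule, restated below as a standalone illustration; the LaunchEligibility object and the canLaunch helper are invented names, not part of TaskSetManager. A popped index may start a regular attempt only while no copy is running, may start a speculative attempt while at most one copy is running, and is always skipped once the task has succeeded.

object LaunchEligibility {
  // Mirrors the check inside dequeueTaskFromList at the end of this series.
  def canLaunch(speculative: Boolean, copiesRunning: Int, successful: Boolean): Boolean =
    !successful && (copiesRunning == 0 || (speculative && copiesRunning == 1))

  def main(args: Array[String]): Unit = {
    assert(canLaunch(speculative = false, copiesRunning = 0, successful = false))  // fresh regular attempt
    assert(!canLaunch(speculative = false, copiesRunning = 1, successful = false)) // original already running
    assert(canLaunch(speculative = true, copiesRunning = 0, successful = false))   // also allowed when no copy is currently running
    assert(canLaunch(speculative = true, copiesRunning = 1, successful = false))   // one extra copy allowed
    assert(!canLaunch(speculative = true, copiesRunning = 2, successful = false))  // never a second extra copy
    assert(!canLaunch(speculative = true, copiesRunning = 1, successful = true))   // finished tasks are skipped
  }
}

For comparison, the OutputCommitCoordinatorSuite stub above leaves regular offers to super.dequeueTaskHelper and hands out at most one speculative attempt, which is enough to exercise the commit-coordination path under speculation.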