diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java index 579e7ff320f5..c0e72b57d48b 100644 --- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java +++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java @@ -162,6 +162,16 @@ public void onSpeculativeTaskSubmitted(SparkListenerSpeculativeTaskSubmitted spe onEvent(speculativeTask); } + public void onUnschedulableTaskSetAdded( + SparkListenerUnschedulableTaskSetAdded unschedulableTaskSetAdded) { + onEvent(unschedulableTaskSetAdded); + } + + public void onUnschedulableTaskSetRemoved( + SparkListenerUnschedulableTaskSetRemoved unschedulableTaskSetRemoved) { + onEvent(unschedulableTaskSetRemoved); + } + @Override public void onResourceProfileAdded(SparkListenerResourceProfileAdded event) { onEvent(event); diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 620a6fe2f9d7..85409d599cca 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -281,6 +281,7 @@ private[spark] class ExecutorAllocationManager( private def maxNumExecutorsNeededPerResourceProfile(rpId: Int): Int = { val pending = listener.totalPendingTasksPerResourceProfile(rpId) val pendingSpeculative = listener.pendingSpeculativeTasksPerResourceProfile(rpId) + val unschedulableTaskSets = listener.pendingUnschedulableTaskSetsPerResourceProfile(rpId) val running = listener.totalRunningTasksPerResourceProfile(rpId) val numRunningOrPendingTasks = pending + running val rp = resourceProfileManager.resourceProfileFromId(rpId) @@ -289,13 +290,27 @@ private[spark] class ExecutorAllocationManager( s" tasksperexecutor: $tasksPerExecutor") val maxNeeded = math.ceil(numRunningOrPendingTasks * executorAllocationRatio / tasksPerExecutor).toInt - if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) { + + val maxNeededWithSpeculationLocalityOffset = + if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) { // If we have pending speculative tasks and only need a single executor, allocate one more // to satisfy the locality requirements of speculation maxNeeded + 1 } else { maxNeeded } + + if (unschedulableTaskSets > 0) { + // Request additional executors to account for task sets having tasks that are unschedulable + // due to blacklisting when the active executor count has already reached the max needed + // which we would normally get. + val maxNeededForUnschedulables = math.ceil(unschedulableTaskSets * executorAllocationRatio / + tasksPerExecutor).toInt + math.max(maxNeededWithSpeculationLocalityOffset, + executorMonitor.executorCountWithResourceProfile(rpId) + maxNeededForUnschedulables) + } else { + maxNeededWithSpeculationLocalityOffset + } } private def totalRunningTasksPerResourceProfile(id: Int): Int = synchronized { @@ -622,6 +637,12 @@ private[spark] class ExecutorAllocationManager( private val resourceProfileIdToStageAttempt = new mutable.HashMap[Int, mutable.Set[StageAttempt]] + // Keep track of unschedulable task sets due to blacklisting. This is a Set of StageAttempt's + // because we'll only take the last unschedulable task in a taskset although there can be more. + // This is done in order to avoid costly loops in the scheduling. + // Check TaskSetManager#getCompletelyBlacklistedTaskIfAny for more details. 
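For intuition, the sizing logic in `maxNumExecutorsNeededPerResourceProfile` above can be restated as a small standalone sketch. The object and method names here are illustrative only (they are not part of this patch or of the Spark API); only the arithmetic mirrors the new branch.

```scala
// Standalone sketch of the executor-sizing heuristic for unschedulable task sets.
// All names are hypothetical; only the arithmetic mirrors the hunk above.
object UnschedulableSizingSketch {
  def targetExecutors(
      maxNeededWithSpeculationLocalityOffset: Int,
      currentExecutorsForProfile: Int,
      unschedulableTaskSets: Int,
      executorAllocationRatio: Double,
      tasksPerExecutor: Int): Int = {
    if (unschedulableTaskSets > 0) {
      // Ask for roughly one extra executor "share" per unschedulable task set on top of
      // whatever the profile is already running, so blacklisted tasks get a fresh executor.
      val extra = math.ceil(
        unschedulableTaskSets * executorAllocationRatio / tasksPerExecutor).toInt
      math.max(maxNeededWithSpeculationLocalityOffset, currentExecutorsForProfile + extra)
    } else {
      maxNeededWithSpeculationLocalityOffset
    }
  }

  def main(args: Array[String]): Unit = {
    // One running executor, one unschedulable task set, ratio 1.0, 2 tasks per executor:
    // ceil(1 * 1.0 / 2) = 1 extra, so the target rises from max(1, 1 + 1) to 2.
    println(targetExecutors(1, 1, 1, 1.0, 2)) // 2
  }
}
```

With these example numbers the request grows from one executor to two, which is the behaviour the "SPARK-31418: one stage being unschedulable" test below asserts.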
+ private val unschedulableTaskSets = new mutable.HashSet[StageAttempt] + // stageAttempt to tuple (the number of task with locality preferences, a map where each pair // is a node and the number of tasks that would like to be scheduled on that node, and // the resource profile id) map, @@ -789,6 +810,28 @@ private[spark] class ExecutorAllocationManager( } } + override def onUnschedulableTaskSetAdded( + unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit = { + val stageId = unschedulableTaskSetAdded.stageId + val stageAttemptId = unschedulableTaskSetAdded.stageAttemptId + val stageAttempt = StageAttempt(stageId, stageAttemptId) + allocationManager.synchronized { + unschedulableTaskSets.add(stageAttempt) + allocationManager.onSchedulerBacklogged() + } + } + + override def onUnschedulableTaskSetRemoved( + unschedulableTaskSetRemoved: SparkListenerUnschedulableTaskSetRemoved): Unit = { + val stageId = unschedulableTaskSetRemoved.stageId + val stageAttemptId = unschedulableTaskSetRemoved.stageAttemptId + val stageAttempt = StageAttempt(stageId, stageAttemptId) + allocationManager.synchronized { + // Clear unschedulableTaskSets since at least one task has become schedulable now + unschedulableTaskSets.remove(stageAttempt) + } + } + /** * An estimate of the total number of pending tasks remaining for currently running stages. Does * not account for tasks which may have failed and been resubmitted. @@ -829,6 +872,16 @@ private[spark] class ExecutorAllocationManager( numTotalTasks - numRunning } + /** + * Currently we only know when a task set has an unschedulable task; we don't know + * the exact number. Since the allocation manager isn't tied closely to the scheduler, + * we use the number of task sets that are unschedulable as a heuristic to add more executors. + */ + def pendingUnschedulableTaskSetsPerResourceProfile(rp: Int): Int = { + val attempts = resourceProfileIdToStageAttempt.getOrElse(rp, Set.empty).toSeq + attempts.filter(attempt => unschedulableTaskSets.contains(attempt)).size + } + def hasPendingTasks: Boolean = { hasPendingSpeculativeTasks || hasPendingRegularTasks } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index cb024d0852d0..e9fb0759c274 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -311,6 +311,26 @@ private[spark] class DAGScheduler( eventProcessLoop.post(SpeculativeTaskSubmitted(task)) } + /** + * Called by the TaskSetManager when a taskset becomes unschedulable due to blacklisting and + * dynamic allocation is enabled. + */ + def unschedulableTaskSetAdded( + stageId: Int, + stageAttemptId: Int): Unit = { + eventProcessLoop.post(UnschedulableTaskSetAdded(stageId, stageAttemptId)) + } + + /** + * Called by the TaskSetManager when an unschedulable taskset becomes schedulable and dynamic + * allocation is enabled.
+ */ + def unschedulableTaskSetRemoved( + stageId: Int, + stageAttemptId: Int): Unit = { + eventProcessLoop.post(UnschedulableTaskSetRemoved(stageId, stageAttemptId)) + } + private[scheduler] def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized { // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times @@ -1014,6 +1034,18 @@ private[spark] class DAGScheduler( listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId)) } + private[scheduler] def handleUnschedulableTaskSetAdded( + stageId: Int, + stageAttemptId: Int): Unit = { + listenerBus.post(SparkListenerUnschedulableTaskSetAdded(stageId, stageAttemptId)) + } + + private[scheduler] def handleUnschedulableTaskSetRemoved( + stageId: Int, + stageAttemptId: Int): Unit = { + listenerBus.post(SparkListenerUnschedulableTaskSetRemoved(stageId, stageAttemptId)) + } + private[scheduler] def handleTaskSetFailed( taskSet: TaskSet, reason: String, @@ -2287,6 +2319,12 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler case SpeculativeTaskSubmitted(task) => dagScheduler.handleSpeculativeTaskSubmitted(task) + case UnschedulableTaskSetAdded(stageId, stageAttemptId) => + dagScheduler.handleUnschedulableTaskSetAdded(stageId, stageAttemptId) + + case UnschedulableTaskSetRemoved(stageId, stageAttemptId) => + dagScheduler.handleUnschedulableTaskSetRemoved(stageId, stageAttemptId) + case GettingResultEvent(taskInfo) => dagScheduler.handleGetTaskResult(taskInfo) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index 78d458338e8f..d226fe88614d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -97,3 +97,11 @@ private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent private[scheduler] case class SpeculativeTaskSubmitted(task: Task[_]) extends DAGSchedulerEvent +private[scheduler] +case class UnschedulableTaskSetAdded(stageId: Int, stageAttemptId: Int) + extends DAGSchedulerEvent + +private[scheduler] +case class UnschedulableTaskSetRemoved(stageId: Int, stageAttemptId: Int) + extends DAGSchedulerEvent + diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 62d54f3b74a4..8119215b8b74 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -157,6 +157,16 @@ case class SparkListenerNodeBlacklisted( case class SparkListenerNodeUnblacklisted(time: Long, hostId: String) extends SparkListenerEvent +@DeveloperApi +case class SparkListenerUnschedulableTaskSetAdded( + stageId: Int, + stageAttemptId: Int) extends SparkListenerEvent + +@DeveloperApi +case class SparkListenerUnschedulableTaskSetRemoved( + stageId: Int, + stageAttemptId: Int) extends SparkListenerEvent + @DeveloperApi case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends SparkListenerEvent @@ -339,6 +349,20 @@ private[spark] trait SparkListenerInterface { */ def onNodeUnblacklisted(nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit + /** + * Called when a taskset becomes unschedulable due to blacklisting and dynamic allocation + * is enabled. 
+ */ + def onUnschedulableTaskSetAdded( + unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit + + /** + * Called when an unschedulable taskset becomes schedulable and dynamic allocation + * is enabled. + */ + def onUnschedulableTaskSetRemoved( + unschedulableTaskSetRemoved: SparkListenerUnschedulableTaskSetRemoved): Unit + /** * Called when the driver receives a block update info. */ @@ -425,6 +449,12 @@ abstract class SparkListener extends SparkListenerInterface { override def onNodeUnblacklisted( nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit = { } + override def onUnschedulableTaskSetAdded( + unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit = { } + + override def onUnschedulableTaskSetRemoved( + unschedulableTaskSetRemoved: SparkListenerUnschedulableTaskSetRemoved): Unit = { } + override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { } override def onSpeculativeTaskSubmitted( diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index 3d316c948db7..13e65f4291fd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -79,6 +79,10 @@ private[spark] trait SparkListenerBus listener.onBlockUpdated(blockUpdated) case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted => listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted) + case unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded => + listener.onUnschedulableTaskSetAdded(unschedulableTaskSetAdded) + case unschedulableTaskSetRemoved: SparkListenerUnschedulableTaskSetRemoved => + listener.onUnschedulableTaskSetRemoved(unschedulableTaskSetRemoved) case resourceProfileAdded: SparkListenerResourceProfileAdded => listener.onResourceProfileAdded(resourceProfileAdded) case _ => listener.onOtherEvent(event) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 12bd93286d73..83c00dd4f248 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -637,10 +637,9 @@ private[spark] class TaskSchedulerImpl( if (!launchedAnyTask) { taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex => // If the taskSet is unschedulable we try to find an existing idle blacklisted - // executor. If we cannot find one, we abort immediately. Else we kill the idle - // executor and kick off an abortTimer which if it doesn't schedule a task within the - // the timeout will abort the taskSet if we were unable to schedule any task from the - // taskSet. + // executor. If one is found, we kill that idle executor and kick off an abortTimer which + // will abort the taskSet if we are still unable to schedule any task from it within + // the timeout. // Note 1: We keep track of schedulability on a per taskSet basis rather than on a per // task basis. // Note 2: The taskSet can still be aborted when there are more than one idle // blacklisted executors and dynamic allocation is on. This can happen when a killed // idle executor isn't replaced in time by ExecutorAllocationManager as it relies on // pending tasks and doesn't kill executors on idle timeouts, resulting in the abort // timer to expire and abort the taskSet.
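The branching that the surrounding comments describe can be summarized as a small decision sketch. This is only an illustration of the control flow; the type and method names are made up and the real logic lives in TaskSchedulerImpl.resourceOffers.

```scala
// Illustrative summary of how resourceOffers reacts to a completely blacklisted task set.
// Names are hypothetical; only the branching mirrors the patch.
sealed trait UnschedulableAction
case object KillIdleExecutorAndStartAbortTimer extends UnschedulableAction
case object NotifyAllocationManagerAndStartAbortTimer extends UnschedulableAction
case object AbortImmediately extends UnschedulableAction

object UnschedulableDecisionSketch {
  def decide(hasIdleExecutor: Boolean, dynamicAllocationEnabled: Boolean): UnschedulableAction =
    if (hasIdleExecutor) {
      // An idle blacklisted executor can be killed so that its replacement may run the task.
      KillIdleExecutorAndStartAbortTimer
    } else if (dynamicAllocationEnabled) {
      // No idle executor to kill: ask ExecutorAllocationManager for more capacity and give it
      // the unschedulable-taskset timeout to deliver before aborting.
      NotifyAllocationManagerAndStartAbortTimer
    } else {
      // Static allocation cannot grow the cluster, so the task set is aborted right away.
      AbortImmediately
    }

  def main(args: Array[String]): Unit = {
    println(decide(hasIdleExecutor = false, dynamicAllocationEnabled = true))
    // NotifyAllocationManagerAndStartAbortTimer
  }
}
```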
+ // + // If there are no idle executors and dynamic allocation is enabled, we notify + // ExecutorAllocationManager to allocate more executors to schedule the unschedulable + // tasks; otherwise we abort immediately. executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match { case Some ((executorId, _)) => if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) { blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId)) - - val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000 - unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout - logInfo(s"Waiting for $timeout ms for completely " - + s"blacklisted task to be schedulable again before aborting $taskSet.") - abortTimer.schedule( - createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout) + updateUnschedulableTaskSetTimeoutAndStartAbortTimer(taskSet, taskIndex) + } + case None => + // Notify ExecutorAllocationManager about the unschedulable task set, + // in order to provision more executors to make them schedulable + if (Utils.isDynamicAllocationEnabled(conf)) { + if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) { + logInfo(s"Notifying ExecutorAllocationManager to allocate more executors to" + + s" schedule the unschedulable task before aborting $taskSet.") + dagScheduler.unschedulableTaskSetAdded(taskSet.taskSet.stageId, + taskSet.taskSet.stageAttemptId) + updateUnschedulableTaskSetTimeoutAndStartAbortTimer(taskSet, taskIndex) + } + } else { + // Abort immediately + logInfo("Cannot schedule any task because of complete blacklisting. No idle" + + s" executors can be found to kill. Aborting $taskSet.") + taskSet.abortSinceCompletelyBlacklisted(taskIndex) } - case None => // Abort Immediately - logInfo("Cannot schedule any task because of complete blacklisting. No idle" + - s" executors can be found to kill. Aborting $taskSet."
) - taskSet.abortSinceCompletelyBlacklisted(taskIndex) } } } else { @@ -676,6 +686,10 @@ private[spark] class TaskSchedulerImpl( if (unschedulableTaskSetToExpiryTime.nonEmpty) { logInfo("Clearing the expiry times for all unschedulable taskSets as a task was " + "recently scheduled.") + // Notify ExecutorAllocationManager as well as other subscribers that a task has + // recently become schedulable + dagScheduler.unschedulableTaskSetRemoved(taskSet.taskSet.stageId, + taskSet.taskSet.stageAttemptId) unschedulableTaskSetToExpiryTime.clear() } } @@ -722,6 +736,17 @@ private[spark] class TaskSchedulerImpl( return tasks.map(_.toSeq) } + private def updateUnschedulableTaskSetTimeoutAndStartAbortTimer( + taskSet: TaskSetManager, + taskIndex: Int): Unit = { + val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000 + unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout + logInfo(s"Waiting for $timeout ms for completely " + + s"blacklisted task to be schedulable again before aborting $taskSet.") + abortTimer.schedule( + createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout) + } + private def createUnschedulableTaskSetAbortTimer( taskSet: TaskSetManager, taskIndex: Int): TimerTask = { diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 8037f4a9447d..ea6e010ef29a 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -21,15 +21,15 @@ import java.util.concurrent.TimeUnit import scala.collection.mutable -import org.mockito.ArgumentMatchers.{any, eq => meq} -import org.mockito.Mockito.{mock, never, times, verify, when} +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito._ import org.scalatest.PrivateMethodTester import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.internal.config import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL import org.apache.spark.metrics.MetricsSystem -import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, ResourceProfileBuilder, ResourceProfileManager, TaskResourceRequests} +import org.apache.spark.resource._ import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.ExecutorInfo @@ -501,6 +501,175 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { assert(numExecutorsToAddForDefaultProfile(manager) === 1) } + test("SPARK-31418: one stage being unschedulable") { + val clock = new ManualClock() + val conf = createConf(0, 5, 0).set(config.EXECUTOR_CORES, 2) + val manager = createManager(conf, clock = clock) + val updatesNeeded = + new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates] + + post(SparkListenerStageSubmitted(createStageInfo(0, 2))) + + assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1) + doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis()) + assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0) + + onExecutorAddedDefaultProfile(manager, "0") + val t1 = createTaskInfo(0, 0, executorId = s"0") + val t2 = createTaskInfo(1, 1, executorId = s"0") + post(SparkListenerTaskStart(0, 0, t1)) + post(SparkListenerTaskStart(0, 0, t2)) + + assert(numExecutorsTarget(manager, defaultProfile.id) === 1) +
assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 1) + + // Stage 0 becomes unschedulable due to blacklisting + post(SparkListenerUnschedulableTaskSetAdded(0, 0)) + clock.advance(1000) + manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime()) + // Assert that we are getting additional executor to schedule unschedulable tasks + assert(numExecutorsTarget(manager, defaultProfile.id) === 2) + assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 2) + + // Add a new executor + onExecutorAddedDefaultProfile(manager, "1") + // Now once the task becomes schedulable, clear the unschedulableTaskSets + post(SparkListenerUnschedulableTaskSetRemoved(0, 0)) + clock.advance(1000) + manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime()) + assert(numExecutorsTarget(manager, defaultProfile.id) === 1) + assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 1) + } + + test("SPARK-31418: multiple stages being unschedulable") { + val clock = new ManualClock() + val conf = createConf(0, 10, 0).set(config.EXECUTOR_CORES, 2) + val manager = createManager(conf, clock = clock) + val updatesNeeded = + new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates] + + post(SparkListenerStageSubmitted(createStageInfo(0, 2))) + post(SparkListenerStageSubmitted(createStageInfo(1, 2))) + post(SparkListenerStageSubmitted(createStageInfo(2, 2))) + + assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1) + doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis()) + assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2) + doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis()) + assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0) + + // Add necessary executors + (0 to 2).foreach(execId => onExecutorAddedDefaultProfile(manager, execId.toString)) + + // Start all the tasks + (0 to 2).foreach { + i => + val t1Info = createTaskInfo(0, (i * 2) + 1, executorId = s"${i / 2}") + val t2Info = createTaskInfo(1, (i * 2) + 2, executorId = s"${i / 2}") + post(SparkListenerTaskStart(i, 0, t1Info)) + post(SparkListenerTaskStart(i, 0, t2Info)) + } + assert(numExecutorsTarget(manager, defaultProfile.id) === 3) + assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 3) + + // Complete the stage 0 tasks. 
+ val t1Info = createTaskInfo(0, 0, executorId = s"0") + val t2Info = createTaskInfo(1, 1, executorId = s"0") + post(SparkListenerTaskEnd(0, 0, null, Success, t1Info, new ExecutorMetrics, null)) + post(SparkListenerTaskEnd(0, 0, null, Success, t2Info, new ExecutorMetrics, null)) + post(SparkListenerStageCompleted(createStageInfo(0, 2))) + + // Stage 1 and 2 becomes unschedulable now due to blacklisting + post(SparkListenerUnschedulableTaskSetAdded(1, 0)) + post(SparkListenerUnschedulableTaskSetAdded(2, 0)) + + clock.advance(1000) + manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime()) + // Assert that we are getting additional executor to schedule unschedulable tasks + assert(numExecutorsTarget(manager, defaultProfile.id) === 4) + assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 4) + + // Add a new executor + onExecutorAddedDefaultProfile(manager, "3") + + // Now once the task becomes schedulable, clear the unschedulableTaskSets + post(SparkListenerUnschedulableTaskSetRemoved(1, 0)) + clock.advance(1000) + manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime()) + assert(numExecutorsTarget(manager, defaultProfile.id) === 4) + assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 5) + } + + test("SPARK-31418: remove executors after unschedulable tasks end") { + val clock = new ManualClock() + val stage = createStageInfo(0, 10) + val conf = createConf(0, 6, 0).set(config.EXECUTOR_CORES, 2) + val manager = createManager(conf, clock = clock) + val updatesNeeded = + new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates] + + post(SparkListenerStageSubmitted(stage)) + assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1) + doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis()) + assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2) + doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis()) + assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2) + doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis()) + assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0) + doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis()) + + (0 to 4).foreach(execId => onExecutorAddedDefaultProfile(manager, execId.toString)) + (0 to 9).map { i => createTaskInfo(i, i, executorId = s"${i / 2}") }.foreach { + info => post(SparkListenerTaskStart(0, 0, info)) + } + assert(numExecutorsTarget(manager, defaultProfile.id) === 5) + assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 5) + + // 8 tasks (0 - 7) finished + (0 to 7).map { i => createTaskInfo(i, i, executorId = s"${i / 2}") }.foreach { + info => post(SparkListenerTaskEnd(0, 0, null, Success, info, new ExecutorMetrics, null)) + } + clock.advance(1000) + manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime()) + assert(numExecutorsTarget(manager, defaultProfile.id) === 1) + assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 1) + (0 to 3).foreach { i => assert(removeExecutorDefaultProfile(manager, i.toString)) } + (0 to 3).foreach { i => onExecutorRemoved(manager, i.toString) } + + // Now due to blacklisting, the task becomes unschedulable + post(SparkListenerUnschedulableTaskSetAdded(0, 0)) + clock.advance(1000) + manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime()) + assert(numExecutorsTarget(manager, defaultProfile.id) === 2) + 
assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 2) + + // New executor got added + onExecutorAddedDefaultProfile(manager, "5") + + // Now once the task becomes schedulable, clear the unschedulableTaskSets + post(SparkListenerUnschedulableTaskSetRemoved(0, 0)) + clock.advance(1000) + manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime()) + assert(numExecutorsTarget(manager, defaultProfile.id) === 1) + assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 1) + post(SparkListenerTaskEnd(0, 0, null, Success, + createTaskInfo(9, 9, "4"), new ExecutorMetrics, null)) + // Unschedulable task successfully ran on the new executor provisioned + post(SparkListenerTaskEnd(0, 0, null, Success, + createTaskInfo(8, 8, "5"), new ExecutorMetrics, null)) + clock.advance(1000) + manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime()) + post(SparkListenerStageCompleted(stage)) + clock.advance(1000) + manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime()) + assert(numExecutorsTarget(manager, defaultProfile.id) === 0) + assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 0) + assert(removeExecutorDefaultProfile(manager, "4")) + onExecutorRemoved(manager, "4") + assert(removeExecutorDefaultProfile(manager, "5")) + onExecutorRemoved(manager, "5") + } + test("SPARK-30511 remove executors when speculative tasks end") { val clock = new ManualClock() val stage = createStageInfo(0, 40) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index e43be60e956b..9ca3ce9d43ca 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -1000,6 +1000,42 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(!tsm.isZombie) } + test("SPARK-31418 abort timer should kick in when task is completely blacklisted &" + + "allocation manager could not acquire a new executor before the timeout") { + // set the abort timer to fail immediately + taskScheduler = setupSchedulerWithMockTaskSetBlacklist( + config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0", + config.DYN_ALLOCATION_ENABLED.key -> "true") + + // We have 2 tasks remaining with 1 executor + val taskSet = FakeTask.createTaskSet(numTasks = 2) + taskScheduler.submitTasks(taskSet) + val tsm = stageToMockTaskSetManager(0) + + // submit an offer with one executor + taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("executor0", "host0", 2))).flatten + + // Fail the running task + failTask(0, TaskState.FAILED, UnknownReason, tsm) + when(tsm.taskSetBlacklistHelperOpt.get.isExecutorBlacklistedForTask( + "executor0", 0)).thenReturn(true) + + // If the executor is busy, then dynamic allocation should kick in and try + // to acquire additional executors to schedule the blacklisted task + assert(taskScheduler.isExecutorBusy("executor0")) + + // make an offer on the blacklisted executor. We won't schedule anything, and set the abort + // timer to kick in immediately + assert(taskScheduler.resourceOffers(IndexedSeq( + WorkerOffer("executor0", "host0", 1) + )).flatten.size === 0) + // Wait for the abort timer to kick in. Even though we configure the timeout to be 0, there is a + // slight delay as the abort timer is launched in a separate thread. 
+ eventually(timeout(500.milliseconds)) { + assert(tsm.isZombie) + } + } + /** * Helper for performance tests. Takes the explicitly blacklisted nodes and executors; verifies * that the blacklists are used efficiently to ensure scheduling is not O(numPendingTasks).
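Taken together, the new code path only engages when blacklisting and dynamic allocation are both enabled. As a rough usage sketch (the listener class below is hypothetical, and the timeout key is believed to be the setting behind config.UNSCHEDULABLE_TASKSET_TIMEOUT), an application could observe the two new events like this:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener,
  SparkListenerUnschedulableTaskSetAdded, SparkListenerUnschedulableTaskSetRemoved}

// Hypothetical listener that just logs the two events introduced by this patch.
class UnschedulableTaskSetLogger extends SparkListener {
  override def onUnschedulableTaskSetAdded(
      event: SparkListenerUnschedulableTaskSetAdded): Unit = {
    println(s"Task set ${event.stageId}.${event.stageAttemptId} is unschedulable; " +
      "waiting for extra executors")
  }

  override def onUnschedulableTaskSetRemoved(
      event: SparkListenerUnschedulableTaskSetRemoved): Unit = {
    println(s"Task set ${event.stageId}.${event.stageAttemptId} is schedulable again")
  }
}

object UnschedulableEventsExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("unschedulable-taskset-demo")
      .set("spark.blacklist.enabled", "true")
      .set("spark.dynamicAllocation.enabled", "true")
      // Assumed key for config.UNSCHEDULABLE_TASKSET_TIMEOUT, in seconds.
      .set("spark.scheduler.blacklist.unschedulableTaskSetTimeout", "120")
    val sc = new SparkContext(conf)
    sc.addSparkListener(new UnschedulableTaskSetLogger)
    // ... run jobs; the events fire only when blacklisting makes a task set unschedulable.
    sc.stop()
  }
}
```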