diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 119b426a9af3..461c0b6444f8 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -132,6 +132,20 @@ private[spark] class ExecutorAllocationManager(
   // All known executors
   private val executorIds = new mutable.HashSet[String]
 
+  // Keys: executor ids for live executors that have been blacklisted for the whole application,
+  // either directly or indirectly through node blacklisting.
+  // Values: whether the executor has already been replaced by another executor requested from
+  // the cluster manager. Replacement happens once and eventually: when a
+  // SparkListenerExecutorAdded event is received, it is taken as the replacement for an
+  // arbitrary executor in this map that has not been replaced yet.
+  private val isBlacklistedExecutorReplaced = new mutable.HashMap[String, Boolean]
+  // Keys: info for tasks that at some point were not able to find a slot on any executor
+  // because of blacklisting, either application-wide blacklisting or TaskSet-level
+  // blacklisting.
+  // Values: timestamp of the first time the task could not be scheduled due to blacklisting
+  private val unscheduleableTasks = new mutable.HashMap[UnscheduleableTaskInfo, Long]
+  private val unscheduleableTasksAgeThresholdMillis = 5 * 60 * 1000 // FIXME: make configurable
+
   // A timestamp of when an addition should be triggered, or NOT_SET if it is not set
   // This is set when pending tasks are added but not scheduled yet
   private var addTime: Long = NOT_SET
@@ -205,6 +219,10 @@ private[spark] class ExecutorAllocationManager(
     }
   }
 
+  def registerUnscheduleableTask(taskId: UnscheduleableTaskInfo): Unit = synchronized {
+    unscheduleableTasks.getOrElseUpdate(taskId, clock.getTimeMillis)
+  }
+
   /**
    * Use a different clock for this allocation manager. This is mainly used for testing.
    */
@@ -263,8 +281,16 @@
    * and pending tasks, rounded up.
    */
   private def maxNumExecutorsNeeded(): Int = {
-    val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
-    (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
+    val executorsForRunningOrPendingTasks = {
+      val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
+      (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
+    }
+    val executorsForUnscheduleableTasks = {
+      val numUnscheduleableTasks = unscheduleableTasks.size
+      if (numUnscheduleableTasks > 0) { math.max(1, numUnscheduleableTasks / tasksPerExecutor) }
+      else { 0 }
+    }
+    executorsForRunningOrPendingTasks + executorsForUnscheduleableTasks
   }
 
   /**
@@ -293,6 +319,17 @@ private[spark] class ExecutorAllocationManager(
     if (executorIdsToBeRemoved.nonEmpty) {
       removeExecutors(executorIdsToBeRemoved)
     }
+
+    unscheduleableTasks.filter { case (unscheduleableTaskInfo, time) =>
+      now >= (time + unscheduleableTasksAgeThresholdMillis)
+    }.foreach { case (unscheduleableTaskInfo, time) =>
+      val abortMessage = s"Aborting TaskSet ${unscheduleableTaskInfo.stageId}." +
+        s"${unscheduleableTaskInfo.stageAttemptId} because task " +
+        s"${unscheduleableTaskInfo.index} (partition ${unscheduleableTaskInfo.partitionId}) " +
+        s"cannot run anywhere due to node and executor blacklist. Blacklisting behavior " +
+        s"can be configured via spark.blacklist.*."
+      // TODO actually abort
+    }
   }
 
   /**
@@ -700,6 +737,10 @@ private[spark] class ExecutorAllocationManager(
         // Mark the executor on which this task is scheduled as busy
         executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId
         allocationManager.onExecutorBusy(executorId)
+
+        allocationManager.unscheduleableTasks
+          .remove(UnscheduleableTaskInfo(taskStart.stageId, taskStart.stageAttemptId,
+            taskStart.taskInfo.index))
       }
     }
 
@@ -732,6 +773,10 @@ private[spark] class ExecutorAllocationManager(
             stageIdToTaskIndices.get(stageId).foreach {_.remove(taskIndex)}
           }
         }
+
+        allocationManager.unscheduleableTasks
+          .remove(UnscheduleableTaskInfo(taskEnd.stageId, taskEnd.stageAttemptId,
+            taskEnd.taskInfo.index))
       }
     }
 
@@ -849,4 +894,22 @@ private[spark] class ExecutorAllocationManager(
 
 private object ExecutorAllocationManager {
   val NOT_SET = Long.MaxValue
+
+  object UnscheduleableTaskInfo {
+    // Build an UnscheduleableTaskInfo for comparisons, as partitionId is ignored in
+    // equals and hashCode
+    def apply(stageId: Int, stageAttemptId: Int, index: Int): UnscheduleableTaskInfo =
+      new UnscheduleableTaskInfo(stageId, stageAttemptId, index, 0)
+  }
+  // (TaskSet.stageId, TaskSet.stageAttemptId, TaskInfo.index, Task.partitionId)
+  case class UnscheduleableTaskInfo(stageId: Int, stageAttemptId: Int,
+    index: Int, partitionId: Int) {
+    override def equals(that: Any): Boolean = that match {
+      case that: UnscheduleableTaskInfo =>
+        that.stageId == stageId && that.stageAttemptId == stageAttemptId &&
+          that.index == this.index
+      case _ => false
+    }
+    override def hashCode(): Int = (stageId, stageAttemptId, index).hashCode()
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index de4711f461df..53570c912d23 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -26,6 +26,7 @@ import scala.math.max
 import scala.util.control.NonFatal
 
 import org.apache.spark._
+import org.apache.spark.ExecutorAllocationManager.UnscheduleableTaskInfo
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.scheduler.SchedulingMode._
@@ -652,6 +653,7 @@ private[spark] class TaskSetManager(
           }
         }
 
+        val executorAllocationManagerOpt = sched.sc.executorAllocationManager
         pendingTask.foreach { indexInTaskSet =>
           // try to find some executor this task can run on. Its possible that some *other*
           // task isn't schedulable anywhere, but we will discover that in some later call,
@@ -675,14 +677,18 @@ private[spark] class TaskSetManager(
           }
           if (blacklistedEverywhere) {
            val partition = tasks(indexInTaskSet).partitionId
-            abort(s"""
-              |Aborting $taskSet because task $indexInTaskSet (partition $partition)
-              |cannot run anywhere due to node and executor blacklist.
-              |Most recent failure:
-              |${taskSetBlacklist.getLatestFailureReason}
-              |
-              |Blacklisting behavior can be configured via spark.blacklist.*.
-              |""".stripMargin)
+            executorAllocationManagerOpt.fold(
+              abort(s"""
+                |Aborting $taskSet because task $indexInTaskSet (partition $partition)
+                |cannot run anywhere due to node and executor blacklist.
+                |Most recent failure:
+                |${taskSetBlacklist.getLatestFailureReason}
+                |
+                |Blacklisting behavior can be configured via spark.blacklist.*.
+                |""".stripMargin)
+            ) { _.registerUnscheduleableTask(
+              UnscheduleableTaskInfo(taskSet.stageId, taskSet.stageAttemptId,
+                indexInTaskSet, partition)) }
           }
         }
       }