Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,20 @@ private[spark] class ExecutorAllocationManager(
// All known executors
private val executorIds = new mutable.HashSet[String]

// Keys: executor ids for live executors that have been blacklisted for the whole application
// either directly or indirectly through node blacklisting
// Values: whether the executor has been replaced by another executor requested asked to the
// cluster manager, or this has not yet happend. Replacement happens once and eventually, when
// SparkListenerExecutorAdded is received that is assumed as a replacement for an arbitrary
// non already replaced executor in isBlacklistedExecutorReplaced
private val isBlacklistedExecutorReplaced = new mutable.HashMap[String, Boolean]
// Keys: info for tasks that were not able to find a slot in any executor at some point in
// time due to blacklisting, either due application wide blacklisting or TaskSet level
// blacklisting
// Values: epoch for the first time this task could not be scheduled due to blacklisting
private val unscheduleableTasks = new mutable.HashMap[UnscheduleableTaskInfo, Long]
private val unscheduleableTasksAgeThresholdMillis = 5 * 60 * 1000 // FIXME configurable

// A timestamp of when an addition should be triggered, or NOT_SET if it is not set
// This is set when pending tasks are added but not scheduled yet
private var addTime: Long = NOT_SET
Expand Down Expand Up @@ -205,6 +219,10 @@ private[spark] class ExecutorAllocationManager(
}
}

def registerUnscheduleableTask(taskId: UnscheduleableTaskInfo): Unit = synchronized {
unscheduleableTasks.getOrElseUpdate(taskId, clock.getTimeMillis)
}

/**
* Use a different clock for this allocation manager. This is mainly used for testing.
*/
Expand Down Expand Up @@ -263,8 +281,16 @@ private[spark] class ExecutorAllocationManager(
* and pending tasks, rounded up.
*/
private def maxNumExecutorsNeeded(): Int = {
val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
(numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
val executorsForRunningOrPendingTasks = {
val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
(numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
}
val executorsForUnscheduleableTasks = {
val numUnscheduleableTasks = unscheduleableTasks.size
if (numUnscheduleableTasks > 0) { math.max(1, numUnscheduleableTasks / tasksPerExecutor) }
else { 0 }
}
executorsForRunningOrPendingTasks + executorsForUnscheduleableTasks
}

/**
Expand Down Expand Up @@ -293,6 +319,17 @@ private[spark] class ExecutorAllocationManager(
if (executorIdsToBeRemoved.nonEmpty) {
removeExecutors(executorIdsToBeRemoved)
}

unscheduleableTasks.filter { case (unscheduleableTaskInfo, time) =>
now >= (time + unscheduleableTasksAgeThresholdMillis)
}.foreach { case (unscheduleableTaskInfo, time) =>
val abortMessage = s"Aborting TaskSet ${unscheduleableTaskInfo.stageId}." +
s"${unscheduleableTaskInfo.stageAttemptId} because task " +
s"${unscheduleableTaskInfo.index} (partition ${unscheduleableTaskInfo.partitionId}) " +
s"cannot run anywhere due to node and executor blacklist. Blacklisting behavior " +
s"can be configured via spark.blacklist.*."
// TODO actually abort
}
}

/**
Expand Down Expand Up @@ -700,6 +737,10 @@ private[spark] class ExecutorAllocationManager(
// Mark the executor on which this task is scheduled as busy
executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId
allocationManager.onExecutorBusy(executorId)

allocationManager.unscheduleableTasks
.remove(UnscheduleableTaskInfo(taskStart.stageId, taskStart.stageAttemptId,
taskStart.taskInfo.index))
}
}

Expand Down Expand Up @@ -732,6 +773,10 @@ private[spark] class ExecutorAllocationManager(
stageIdToTaskIndices.get(stageId).foreach {_.remove(taskIndex)}
}
}

allocationManager.unscheduleableTasks
.remove(UnscheduleableTaskInfo(taskEnd.stageId, taskEnd.stageAttemptId,
taskEnd.taskInfo.index))
}
}

Expand Down Expand Up @@ -849,4 +894,22 @@ private[spark] class ExecutorAllocationManager(

private object ExecutorAllocationManager {
val NOT_SET = Long.MaxValue

object UnscheduleableTaskInfo {
// Buld a UnscheduleableTaskInfo for comparisons, as partitionId is ignored in
// equals and hashCode
def apply(stageId: Int, stageAttemptId: Int, index: Int): UnscheduleableTaskInfo =
new UnscheduleableTaskInfo(stageId, stageAttemptId, index, 0)
}
// (TaskSet.stageId, TaskSet.stageAttemptId, TaskInfo.index, Task.partitionId)
case class UnscheduleableTaskInfo(stageId: Int, stageAttemptId: Int,
index: Int, partitionId: Int) {
override def equals(that: Any): Boolean = that match {
case that: UnscheduleableTaskInfo =>
that.stageId == stageId && that.stageAttemptId == stageAttemptId &&
that.index == this.index
case _ => false
}
override def hashCode(): Int = (stageId, stageAttemptId, index).hashCode()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import scala.math.max
import scala.util.control.NonFatal

import org.apache.spark._
import org.apache.spark.ExecutorAllocationManager.UnscheduleableTaskInfo
import org.apache.spark.TaskState.TaskState
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.scheduler.SchedulingMode._
Expand Down Expand Up @@ -652,6 +653,7 @@ private[spark] class TaskSetManager(
}
}

val executorAllocationManagerOpt = sched.sc.executorAllocationManager
pendingTask.foreach { indexInTaskSet =>
// try to find some executor this task can run on. Its possible that some *other*
// task isn't schedulable anywhere, but we will discover that in some later call,
Expand All @@ -675,14 +677,18 @@ private[spark] class TaskSetManager(
}
if (blacklistedEverywhere) {
val partition = tasks(indexInTaskSet).partitionId
abort(s"""
|Aborting $taskSet because task $indexInTaskSet (partition $partition)
|cannot run anywhere due to node and executor blacklist.
|Most recent failure:
|${taskSetBlacklist.getLatestFailureReason}
|
|Blacklisting behavior can be configured via spark.blacklist.*.
|""".stripMargin)
executorAllocationManagerOpt.fold(
abort(s"""
|Aborting $taskSet because task $indexInTaskSet (partition $partition)
|cannot run anywhere due to node and executor blacklist.
|Most recent failure:
|${taskSetBlacklist.getLatestFailureReason}
|
|Blacklisting behavior can be configured via spark.blacklist.*.
|""".stripMargin)
) { _.registerUnscheduleableTask(
UnscheduleableTaskInfo(taskSet.stageId, taskSet.stageAttemptId,
indexInTaskSet, partition)) }
}
}
}
Expand Down