10 changes: 10 additions & 0 deletions core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -162,6 +162,16 @@ public void onSpeculativeTaskSubmitted(SparkListenerSpeculativeTaskSubmitted spe
onEvent(speculativeTask);
}

@Override
public void onUnschedulableTaskSetAdded(
    SparkListenerUnschedulableTaskSetAdded unschedulableTaskSetAdded) {
  onEvent(unschedulableTaskSetAdded);
}

@Override
public void onUnschedulableTaskSetRemoved(
    SparkListenerUnschedulableTaskSetRemoved unschedulableTaskSetRemoved) {
  onEvent(unschedulableTaskSetRemoved);
}

@Override
public void onResourceProfileAdded(SparkListenerResourceProfileAdded event) {
onEvent(event);
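
SparkFirehoseListener funnels every SparkListenerInterface callback into the single onEvent method, so firehose-style subclasses pick up the two new events automatically. As a rough illustration (the class name is hypothetical, not part of this change), a subclass could match on the new event types to keep a running count of blocked task sets:

import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.SparkFirehoseListener
import org.apache.spark.scheduler.{SparkListenerEvent,
  SparkListenerUnschedulableTaskSetAdded, SparkListenerUnschedulableTaskSetRemoved}

// Hypothetical subclass: tracks how many task sets are currently blocked.
class BlockedTaskSetCounter extends SparkFirehoseListener {
  private val blocked = new AtomicInteger(0)

  override def onEvent(event: SparkListenerEvent): Unit = event match {
    case _: SparkListenerUnschedulableTaskSetAdded => blocked.incrementAndGet()
    case _: SparkListenerUnschedulableTaskSetRemoved => blocked.decrementAndGet()
    case _ => // not interested in other events
  }

  def currentlyBlocked: Int = blocked.get()
}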
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -281,6 +281,7 @@ private[spark] class ExecutorAllocationManager(
private def maxNumExecutorsNeededPerResourceProfile(rpId: Int): Int = {
val pending = listener.totalPendingTasksPerResourceProfile(rpId)
val pendingSpeculative = listener.pendingSpeculativeTasksPerResourceProfile(rpId)
val unschedulableTaskSets = listener.pendingUnschedulableTaskSetsPerResourceProfile(rpId)
val running = listener.totalRunningTasksPerResourceProfile(rpId)
val numRunningOrPendingTasks = pending + running
val rp = resourceProfileManager.resourceProfileFromId(rpId)
@@ -289,13 +290,27 @@
s" tasksperexecutor: $tasksPerExecutor")
val maxNeeded = math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
tasksPerExecutor).toInt
val maxNeededWithSpeculationLocalityOffset =
  if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
    // If we have pending speculative tasks and only need a single executor, allocate one more
    // to satisfy the locality requirements of speculation
    maxNeeded + 1
  } else {
    maxNeeded
  }

if (unschedulableTaskSets > 0) {
  // Request additional executors to account for task sets with tasks that are unschedulable
  // due to blacklisting, covering the case where the active executor count has already
  // reached the maximum we would normally request.
  val maxNeededForUnschedulables = math.ceil(unschedulableTaskSets * executorAllocationRatio /
    tasksPerExecutor).toInt
  math.max(maxNeededWithSpeculationLocalityOffset,
    executorMonitor.executorCountWithResourceProfile(rpId) + maxNeededForUnschedulables)
} else {
  maxNeededWithSpeculationLocalityOffset
}
}
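
To make the sizing math above concrete, here is a small standalone sketch (not Spark API; names are illustrative, and the speculation locality offset is omitted for brevity). With 40 running-or-pending tasks, a ratio of 1.0 and 4 tasks per executor, the base target is ceil(40 / 4) = 10; two unschedulable task sets then raise the target to current-executors + ceil(2 / 4) = 11, so at least one fresh executor is requested even when the normal target is already met:

// Standalone model of maxNumExecutorsNeededPerResourceProfile, minus the
// speculation offset (illustrative only).
def targetExecutors(tasks: Int, ratio: Double, tasksPerExecutor: Int,
    unschedulableTaskSets: Int, currentExecutors: Int): Int = {
  val base = math.ceil(tasks * ratio / tasksPerExecutor).toInt
  if (unschedulableTaskSets > 0) {
    // Bump past the current executor count so new executors actually arrive.
    val extra = math.ceil(unschedulableTaskSets * ratio / tasksPerExecutor).toInt
    math.max(base, currentExecutors + extra)
  } else {
    base
  }
}

assert(targetExecutors(40, 1.0, 4, unschedulableTaskSets = 0, currentExecutors = 10) == 10)
assert(targetExecutors(40, 1.0, 4, unschedulableTaskSets = 2, currentExecutors = 10) == 11)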

private def totalRunningTasksPerResourceProfile(id: Int): Int = synchronized {
@@ -622,6 +637,12 @@
private val resourceProfileIdToStageAttempt =
new mutable.HashMap[Int, mutable.Set[StageAttempt]]

// Keep track of unschedulable task sets due to blacklisting. This is a Set of StageAttempts
// because we only track the last unschedulable task in a task set even though there can be
// more. This is done to avoid costly loops in the scheduling.
// See TaskSetManager#getCompletelyBlacklistedTaskIfAny for more details.
private val unschedulableTaskSets = new mutable.HashSet[StageAttempt]

// stageAttempt to tuple (the number of tasks with locality preferences, a map where each pair
// is a node and the number of tasks that would like to be scheduled on that node, and
// the resource profile id) map,
@@ -789,6 +810,28 @@
}
}

override def onUnschedulableTaskSetAdded(
unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit = {
val stageId = unschedulableTaskSetAdded.stageId
val stageAttemptId = unschedulableTaskSetAdded.stageAttemptId
val stageAttempt = StageAttempt(stageId, stageAttemptId)
allocationManager.synchronized {
unschedulableTaskSets.add(stageAttempt)
allocationManager.onSchedulerBacklogged()
}
}

override def onUnschedulableTaskSetRemoved(
unschedulableTaskSetRemoved: SparkListenerUnschedulableTaskSetRemoved): Unit = {
val stageId = unschedulableTaskSetRemoved.stageId
val stageAttemptId = unschedulableTaskSetRemoved.stageAttemptId
val stageAttempt = StageAttempt(stageId, stageAttemptId)
allocationManager.synchronized {
// Clear unschedulableTaskSets since at least one task has become schedulable now
unschedulableTaskSets.remove(stageAttempt)
}
}
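
Note the asymmetry between the two callbacks, which a condensed model makes visible (illustrative names, not Spark API): an add both records the stage attempt and pokes the allocation manager via onSchedulerBacklogged so new executors get requested, while a remove only clears the flag and lets the normal target calculation settle back down.

import scala.collection.mutable

final class UnschedulableTracker(onBacklogged: () => Unit) {
  private val blocked = mutable.HashSet[(Int, Int)]()  // (stageId, stageAttemptId)

  def added(stageId: Int, stageAttemptId: Int): Unit = synchronized {
    blocked += ((stageId, stageAttemptId))
    onBacklogged()  // trigger an executor-target re-evaluation
  }

  def removed(stageId: Int, stageAttemptId: Int): Unit = synchronized {
    blocked -= ((stageId, stageAttemptId))  // no trigger needed on the way down
  }
}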

/**
* An estimate of the total number of pending tasks remaining for currently running stages. Does
* not account for tasks which may have failed and been resubmitted.
@@ -829,6 +872,16 @@
numTotalTasks - numRunning
}

/**
 * Currently we only know when a task set has an unschedulable task; we don't know
 * the exact number. Since the allocation manager isn't tied closely to the scheduler,
 * we use the number of unschedulable task sets as a heuristic for adding more executors.
 */
def pendingUnschedulableTaskSetsPerResourceProfile(rp: Int): Int = {
  val attempts = resourceProfileIdToStageAttempt.getOrElse(rp, Set.empty).toSeq
  attempts.count(attempt => unschedulableTaskSets.contains(attempt))
}
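
A tiny standalone model of this bookkeeping (types and values are illustrative): only stage attempts that both belong to the resource profile and are flagged unschedulable contribute to the count that drives upscaling.

import scala.collection.mutable

case class StageAttempt(stageId: Int, stageAttemptId: Int)

val rpToAttempts = mutable.HashMap(
  0 -> mutable.Set(StageAttempt(1, 0), StageAttempt(2, 0)))
val unschedulable = mutable.HashSet(StageAttempt(2, 0))

// Mirrors pendingUnschedulableTaskSetsPerResourceProfile for profile 0.
val pending = rpToAttempts.getOrElse(0, mutable.Set.empty[StageAttempt])
  .count(unschedulable.contains)
assert(pending == 1)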

def hasPendingTasks: Boolean = {
hasPendingSpeculativeTasks || hasPendingRegularTasks
}
38 changes: 38 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -311,6 +311,26 @@ private[spark] class DAGScheduler(
eventProcessLoop.post(SpeculativeTaskSubmitted(task))
}

/**
* Called by the TaskSetManager when a taskset becomes unschedulable due to blacklisting and
* dynamic allocation is enabled.
*/
def unschedulableTaskSetAdded(
stageId: Int,
stageAttemptId: Int): Unit = {
eventProcessLoop.post(UnschedulableTaskSetAdded(stageId, stageAttemptId))
}

/**
* Called by the TaskSetManager when an unschedulable taskset becomes schedulable and dynamic
* allocation is enabled.
*/
def unschedulableTaskSetRemoved(
stageId: Int,
stageAttemptId: Int): Unit = {
eventProcessLoop.post(UnschedulableTaskSetRemoved(stageId, stageAttemptId))
}
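
Both entry points follow the DAGScheduler's usual pattern: the calling thread only enqueues an event, and a single event-loop thread later dispatches it to the matching handle* method. A stripped-down model of that pattern (illustrative only; Spark's EventLoop adds error handling and lifecycle management on top):

import java.util.concurrent.LinkedBlockingQueue

sealed trait Event
case class UnschedulableAdded(stageId: Int, stageAttemptId: Int) extends Event
case class UnschedulableRemoved(stageId: Int, stageAttemptId: Int) extends Event

class MiniEventLoop(handle: Event => Unit) {
  private val queue = new LinkedBlockingQueue[Event]()
  private val worker = new Thread(() => while (true) handle(queue.take()))
  worker.setDaemon(true)
  worker.start()

  // Callers (e.g. the task scheduler) never block on the handler itself.
  def post(event: Event): Unit = queue.put(event)
}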

private[scheduler]
def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized {
// Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
@@ -1014,6 +1034,18 @@ private[spark] class DAGScheduler(
listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId, task.stageAttemptId))
}

private[scheduler] def handleUnschedulableTaskSetAdded(
stageId: Int,
stageAttemptId: Int): Unit = {
listenerBus.post(SparkListenerUnschedulableTaskSetAdded(stageId, stageAttemptId))
}

private[scheduler] def handleUnschedulableTaskSetRemoved(
stageId: Int,
stageAttemptId: Int): Unit = {
listenerBus.post(SparkListenerUnschedulableTaskSetRemoved(stageId, stageAttemptId))
}

private[scheduler] def handleTaskSetFailed(
taskSet: TaskSet,
reason: String,
@@ -2287,6 +2319,12 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
case SpeculativeTaskSubmitted(task) =>
dagScheduler.handleSpeculativeTaskSubmitted(task)

case UnschedulableTaskSetAdded(stageId, stageAttemptId) =>
dagScheduler.handleUnschedulableTaskSetAdded(stageId, stageAttemptId)

case UnschedulableTaskSetRemoved(stageId, stageAttemptId) =>
dagScheduler.handleUnschedulableTaskSetRemoved(stageId, stageAttemptId)

case GettingResultEvent(taskInfo) =>
dagScheduler.handleGetTaskResult(taskInfo)

core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -97,3 +97,11 @@ private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent
private[scheduler]
case class SpeculativeTaskSubmitted(task: Task[_]) extends DAGSchedulerEvent

private[scheduler]
case class UnschedulableTaskSetAdded(stageId: Int, stageAttemptId: Int)
extends DAGSchedulerEvent

private[scheduler]
case class UnschedulableTaskSetRemoved(stageId: Int, stageAttemptId: Int)
extends DAGSchedulerEvent

30 changes: 30 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -157,6 +157,16 @@ case class SparkListenerNodeBlacklisted(
case class SparkListenerNodeUnblacklisted(time: Long, hostId: String)
extends SparkListenerEvent

@DeveloperApi
case class SparkListenerUnschedulableTaskSetAdded(
stageId: Int,
stageAttemptId: Int) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerUnschedulableTaskSetRemoved(
stageId: Int,
stageAttemptId: Int) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends SparkListenerEvent

@@ -339,6 +349,20 @@ private[spark] trait SparkListenerInterface {
*/
def onNodeUnblacklisted(nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit

/**
* Called when a taskset becomes unschedulable due to blacklisting and dynamic allocation
* is enabled.
*/
def onUnschedulableTaskSetAdded(
unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit

/**
* Called when an unschedulable taskset becomes schedulable and dynamic allocation
* is enabled.
*/
def onUnschedulableTaskSetRemoved(
unschedulableTaskSetRemoved: SparkListenerUnschedulableTaskSetRemoved): Unit

/**
* Called when the driver receives a block update info.
*/
@@ -425,6 +449,12 @@ abstract class SparkListener extends SparkListenerInterface {
override def onNodeUnblacklisted(
nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit = { }

override def onUnschedulableTaskSetAdded(
unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded): Unit = { }

override def onUnschedulableTaskSetRemoved(
unschedulableTaskSetRemoved: SparkListenerUnschedulableTaskSetRemoved): Unit = { }
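
A sketch of consuming the new callbacks from application code, assuming a locally created SparkContext (addSparkListener is a DeveloperApi):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener,
  SparkListenerUnschedulableTaskSetAdded, SparkListenerUnschedulableTaskSetRemoved}

val sc = new SparkContext(
  new SparkConf().setMaster("local[2]").setAppName("unschedulable-demo"))

sc.addSparkListener(new SparkListener {
  override def onUnschedulableTaskSetAdded(
      e: SparkListenerUnschedulableTaskSetAdded): Unit =
    println(s"Stage ${e.stageId} (attempt ${e.stageAttemptId}) blocked by blacklisting")
  override def onUnschedulableTaskSetRemoved(
      e: SparkListenerUnschedulableTaskSetRemoved): Unit =
    println(s"Stage ${e.stageId} (attempt ${e.stageAttemptId}) schedulable again")
})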

override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }

override def onSpeculativeTaskSubmitted(
core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -79,6 +79,10 @@ private[spark] trait SparkListenerBus
listener.onBlockUpdated(blockUpdated)
case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
case unschedulableTaskSetAdded: SparkListenerUnschedulableTaskSetAdded =>
listener.onUnschedulableTaskSetAdded(unschedulableTaskSetAdded)
case unschedulableTaskSetRemoved: SparkListenerUnschedulableTaskSetRemoved =>
listener.onUnschedulableTaskSetRemoved(unschedulableTaskSetRemoved)
case resourceProfileAdded: SparkListenerResourceProfileAdded =>
listener.onResourceProfileAdded(resourceProfileAdded)
case _ => listener.onOtherEvent(event)
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -637,33 +637,43 @@ private[spark] class TaskSchedulerImpl(
if (!launchedAnyTask) {
taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =>
// If the taskSet is unschedulable we try to find an existing idle blacklisted
// executor and kill the idle executor and kick off an abortTimer which, if it doesn't
// schedule a task within the timeout, will abort the taskSet if we were unable to
// schedule any task from the taskSet.
// Note 1: We keep track of schedulability on a per taskSet basis rather than on a per
// task basis.
// Note 2: The taskSet can still be aborted when there is more than one idle
// blacklisted executor and dynamic allocation is on. This can happen when a killed
// idle executor isn't replaced in time by ExecutorAllocationManager as it relies on
// pending tasks and doesn't kill executors on idle timeouts, causing the abort
// timer to expire and abort the taskSet.
//
// If there are no idle executors and dynamic allocation is enabled, we notify
// ExecutorAllocationManager to allocate more executors to schedule the
// unschedulable tasks; otherwise we abort immediately.
executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
  case Some((executorId, _)) =>
    if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
      blacklistTrackerOpt.foreach(blt => blt.killBlacklistedIdleExecutor(executorId))
      updateUnschedulableTaskSetTimeoutAndStartAbortTimer(taskSet, taskIndex)
    }
  case None =>
    // Notify ExecutorAllocationManager about the unschedulable task set,
    // in order to provision more executors to make them schedulable
    if (Utils.isDynamicAllocationEnabled(conf)) {
      if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
        logInfo("Notifying ExecutorAllocationManager to allocate more executors to" +
          s" schedule the unschedulable task before aborting $taskSet.")
        dagScheduler.unschedulableTaskSetAdded(taskSet.taskSet.stageId,
          taskSet.taskSet.stageAttemptId)
        updateUnschedulableTaskSetTimeoutAndStartAbortTimer(taskSet, taskIndex)
      }
    } else {
      // Abort immediately
      logInfo("Cannot schedule any task because of complete blacklisting. No idle" +
        s" executors can be found to kill. Aborting $taskSet.")
      taskSet.abortSinceCompletelyBlacklisted(taskIndex)
    }
}
}
} else {
@@ -676,6 +686,10 @@
if (unschedulableTaskSetToExpiryTime.nonEmpty) {
logInfo("Clearing the expiry times for all unschedulable taskSets as a task was " +
"recently scheduled.")
// Notify ExecutorAllocationManager as well as other subscribers that a task
// has recently become schedulable again
dagScheduler.unschedulableTaskSetRemoved(taskSet.taskSet.stageId,
taskSet.taskSet.stageAttemptId)
unschedulableTaskSetToExpiryTime.clear()
}
}
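
The interplay between the expiry map and the timer described above boils down to: arm a timer with a deadline, and let any later successful launch clear the deadlines so a stale timer fires as a no-op. A minimal standalone model (illustrative, not the Spark implementation):

import java.util.{Timer, TimerTask}
import scala.collection.mutable

val abortTimer = new Timer("abort-timer-demo", true)  // daemon timer thread
val expiryByTaskSet = mutable.Map[String, Long]()

def armAbort(taskSet: String, timeoutMs: Long)(abort: String => Unit): Unit = {
  expiryByTaskSet(taskSet) = System.currentTimeMillis() + timeoutMs
  abortTimer.schedule(new TimerTask {
    override def run(): Unit =
      // A launch in the meantime clears the map, turning this into a no-op.
      if (expiryByTaskSet.get(taskSet).exists(_ <= System.currentTimeMillis()))
        abort(taskSet)
  }, timeoutMs)
}

def onTaskLaunched(): Unit = expiryByTaskSet.clear()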
@@ -722,6 +736,17 @@
return tasks.map(_.toSeq)
}

private def updateUnschedulableTaskSetTimeoutAndStartAbortTimer(
taskSet: TaskSetManager,
taskIndex: Int): Unit = {
val timeout = conf.get(config.UNSCHEDULABLE_TASKSET_TIMEOUT) * 1000
unschedulableTaskSetToExpiryTime(taskSet) = clock.getTimeMillis() + timeout
logInfo(s"Waiting for $timeout ms for completely " +
s"blacklisted task to be schedulable again before aborting $taskSet.")
abortTimer.schedule(
createUnschedulableTaskSetAbortTimer(taskSet, taskIndex), timeout)
}
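
The timeout read here is spark.scheduler.blacklist.unschedulableTaskSetTimeout (the key behind config.UNSCHEDULABLE_TASKSET_TIMEOUT), expressed in seconds, hence the * 1000. A cluster that relies on dynamic allocation to replace blacklisted capacity might raise it so provisioning has time to finish; the values below are just an example:

import org.apache.spark.SparkConf

// Example settings (values illustrative): give dynamic allocation up to five
// minutes to provision replacement executors before the task set is aborted.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.scheduler.blacklist.unschedulableTaskSetTimeout", "300s")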

private def createUnschedulableTaskSetAbortTimer(
taskSet: TaskSetManager,
taskIndex: Int): TimerTask = {