@@ -28,7 +28,9 @@ import org.apache.spark.scheduler._
* the scheduler queue is not drained in N seconds, then new executors are added. If the queue
* persists for another M seconds, then more executors are added and so on. The number added
* in each round increases exponentially from the previous round until an upper bound on the
* number of executors has been reached.
* number of executors has been reached. The upper bound is based both on a configured property
* and on the number of tasks pending: the policy will never increase the number of executor
* requests past the number needed to handle all pending tasks.
*
* The rationale for the exponential increase is twofold: (1) Executors should be added slowly
* in the beginning in case the number of extra executors needed turns out to be small. Otherwise,
@@ -82,6 +84,12 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
// During testing, the methods to actually kill and add executors are mocked out
private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)

// TODO: The default value of 1 for spark.executor.cores works right now because dynamic
// allocation is only supported for YARN and the default number of cores per executor in YARN is
// 1, but it might need to be obtained differently for different cluster managers
private val tasksPerExecutor =
conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)
Contributor:
This could become 0 if spark.task.cpus > spark.executor.cores, and you're dividing by this later.

Contributor Author:
This would be a misconfiguration because executors wouldn't be able to fit a single task. Will add an exception to make it fail earlier.

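To make the reviewer's concern concrete, here is a small illustration (the config values are made up, not taken from the patch):

// With spark.executor.cores = 1 and spark.task.cpus = 2, integer division yields 0:
val executorCores = 1 // conf.getInt("spark.executor.cores", 1)
val taskCpus = 2      // conf.getInt("spark.task.cpus", 1)
val tasksPerExecutor = executorCores / taskCpus // == 0
// Any later ceiling division by tasksPerExecutor would then throw ArithmeticException,
// which is why validateSettings() below now rejects this configuration up front.
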
validateSettings()

// Number of executors to add in the next round
@@ -110,6 +118,9 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
// Clock used to schedule when executors should be added and removed
private var clock: Clock = new RealClock

// Listener for Spark events that impact the allocation policy
private val listener = new ExecutorAllocationListener(this)

/**
* Verify that the settings specified through the config are valid.
* If not, throw an appropriate exception.
@@ -141,6 +152,9 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
throw new SparkException("Dynamic allocation of executors requires the external " +
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
}
if (tasksPerExecutor == 0) {
throw new SparkException("spark.executor.cores must not be less than spark.task.cpus.cores")
}
}

/**
@@ -154,7 +168,6 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
* Register for scheduler callbacks to decide when to add and remove executors.
*/
def start(): Unit = {
val listener = new ExecutorAllocationListener(this)
sc.addSparkListener(listener)
startPolling()
}
@@ -217,14 +230,27 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
numExecutorsToAdd = 1
return 0
}
// The number of executors needed to satisfy all pending tasks is the number of tasks pending
// divided by the number of tasks each executor can fit, rounded up.
val maxNumExecutorsPending =
(listener.totalPendingTasks() + tasksPerExecutor - 1) / tasksPerExecutor
if (numExecutorsPending >= maxNumExecutorsPending) {
logDebug(s"Not adding executors because there are already $numExecutorsPending " +
s"pending and pending tasks could only fill $maxNumExecutorsPending")
numExecutorsToAdd = 1
return 0
}

// It's never useful to request more executors than could satisfy all the pending tasks, so
// cap request at that amount.
// Also cap request with respect to the configured upper bound.
val maxNumExecutorsToAdd = math.min(
maxNumExecutorsPending - numExecutorsPending,
maxNumExecutors - numExistingExecutors)
assert(maxNumExecutorsToAdd > 0)

val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd)

// Request executors with respect to the upper bound
val actualNumExecutorsToAdd =
if (numExistingExecutors + numExecutorsToAdd <= maxNumExecutors) {
numExecutorsToAdd
} else {
maxNumExecutors - numExistingExecutors
}
val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
val addRequestAcknowledged = testing || sc.requestExecutors(actualNumExecutorsToAdd)
if (addRequestAcknowledged) {
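
A worked example of the two caps in the hunk above. The variable names mirror the diff; the numbers are made up for illustration:

val totalPendingTasks = 1000  // what listener.totalPendingTasks() would return
val tasksPerExecutor = 1
val numExecutorsPending = 3
val numExistingExecutors = 8
val maxNumExecutors = 10
val numExecutorsToAdd = 4

val maxNumExecutorsPending =
  (totalPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor              // 1000
val maxNumExecutorsToAdd = math.min(
  maxNumExecutorsPending - numExecutorsPending,                              // 997
  maxNumExecutors - numExistingExecutors)                                    // 2
val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd) // 2

Here the configured upper bound, not the pending-task count, is what limits the request.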
@@ -445,6 +471,16 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = {
allocationManager.onExecutorRemoved(blockManagerRemoved.blockManagerId.executorId)
}

/**
* An estimate of the total number of pending tasks remaining for currently running stages. Does
* not account for tasks which may have failed and been resubmitted.
*/
def totalPendingTasks(): Int = {
stageIdToNumTasks.map { case (stageId, numTasks) =>
numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
}.sum
}
}

}
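
Before moving to the test changes, a self-contained sketch of the pending-task estimate above, using plain Maps in place of the listener's internal state (the stage IDs and counts are illustrative):

val stageIdToNumTasks = Map(0 -> 5, 1 -> 3)  // total tasks per running stage
val stageIdToTaskIndices = Map(1 -> Set(0))  // task indices that have started, per stage
val totalPendingTasks = stageIdToNumTasks.map { case (stageId, numTasks) =>
  numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
}.sum                                        // 5 + (3 - 1) == 7

Because started tasks are tracked as a Set of task indices, a resubmitted task index is only counted once, so it cannot lower the estimate twice.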
@@ -76,6 +76,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
test("add executors") {
sc = createSparkContext(1, 10)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))

// Keep adding until the limit is reached
assert(numExecutorsPending(manager) === 0)
@@ -117,6 +118,51 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(numExecutorsToAdd(manager) === 1)
}

test("add executors capped by num pending tasks") {
sc = createSparkContext(1, 10)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 5)))

// Verify that we're capped at number of tasks in the stage
assert(numExecutorsPending(manager) === 0)
assert(numExecutorsToAdd(manager) === 1)
assert(addExecutors(manager) === 1)
assert(numExecutorsPending(manager) === 1)
assert(numExecutorsToAdd(manager) === 2)
assert(addExecutors(manager) === 2)
assert(numExecutorsPending(manager) === 3)
assert(numExecutorsToAdd(manager) === 4)
assert(addExecutors(manager) === 2)
assert(numExecutorsPending(manager) === 5)
assert(numExecutorsToAdd(manager) === 1)

// Verify that running a task reduces the cap
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3)))
sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
assert(addExecutors(manager) === 1)
assert(numExecutorsPending(manager) === 6)
assert(numExecutorsToAdd(manager) === 2)
assert(addExecutors(manager) === 1)
assert(numExecutorsPending(manager) === 7)
assert(numExecutorsToAdd(manager) === 1)

// Verify that re-running a task doesn't reduce the cap further
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 3)))
sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 0, "executor-1")))
sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(1, 0, "executor-1")))
assert(addExecutors(manager) === 1)
assert(numExecutorsPending(manager) === 8)
assert(numExecutorsToAdd(manager) === 2)
assert(addExecutors(manager) === 1)
assert(numExecutorsPending(manager) === 9)
assert(numExecutorsToAdd(manager) === 1)

// Verify that running a task once we're at our limit doesn't blow things up
sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 1, "executor-1")))
assert(addExecutors(manager) === 0)
assert(numExecutorsPending(manager) === 9)
}
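
For readers following the assertions: assuming one task per executor in this test configuration, the cap starts at 5 for stage 0, so the exponential rounds add 1, 2, then only 2; stage 1 contributes 3 tasks minus the one already started, raising the cap to 7; stage 2 contributes another 2 because its re-run task index is only counted once, raising it to 9; once a further task start drops the estimate below the 9 executors already pending, addExecutors returns 0.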

test("remove executors") {
sc = createSparkContext(5, 10)
val manager = sc.executorAllocationManager.get
@@ -170,6 +216,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
test ("interleaving add and remove") {
sc = createSparkContext(5, 10)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))

// Add a few executors
assert(addExecutors(manager) === 1)
@@ -343,6 +390,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
val clock = new TestClock(2020L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))

// Scheduler queue backlogged
onSchedulerBacklogged(manager)