core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -91,15 +91,15 @@ private[spark] class ExecutorAllocationManager(

// How long there must be backlogged tasks for before an addition is triggered (seconds)
private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds(
"spark.dynamicAllocation.schedulerBacklogTimeout", "5s")
"spark.dynamicAllocation.schedulerBacklogTimeout", "1s")

// Same as above, but used only after `schedulerBacklogTimeoutS` is exceeded
private val sustainedSchedulerBacklogTimeoutS = conf.getTimeAsSeconds(
"spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", s"${schedulerBacklogTimeoutS}s")

// How long an executor must be idle for before it is removed (seconds)
private val executorIdleTimeoutS = conf.getTimeAsSeconds(
"spark.dynamicAllocation.executorIdleTimeout", "600s")
"spark.dynamicAllocation.executorIdleTimeout", "60s")

// During testing, the methods to actually kill and add executors are mocked out
private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
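With these lower defaults, the allocation manager reacts to a task backlog after roughly one second instead of five, and reclaims idle executors after one minute instead of ten. The toy sketch below is not part of this patch; it only illustrates the request schedule implied by a 1s backlog timeout combined with the exponential growth applied later in `addExecutors`, and its variable names are made up for the example.

```scala
// Toy illustration of the backlog-driven request schedule under the new defaults.
// schedulerBacklogTimeout = 1s triggers the first round; sustainedSchedulerBacklogTimeout
// (which defaults to the same value) paces every later round while the backlog persists.
val firstTimeoutS = 1L
val sustainedTimeoutS = 1L

Iterator
  .iterate((firstTimeoutS, 1)) { case (t, step) => (t + sustainedTimeoutS, step * 2) }
  .take(5)
  .foreach { case (t, step) =>
    println(s"t=${t}s: grow the executor target by up to $step")
  }
// Prints steps of 1, 2, 4, 8, 16 at t = 1s, 2s, 3s, 4s, 5s (ignoring the caps
// and reset cases handled in the real code).
```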
@@ -268,6 +268,8 @@ private[spark] class ExecutorAllocationManager(
numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
client.requestTotalExecutors(numExecutorsTarget)
numExecutorsToAdd = 1
logInfo(s"Lowering target number of executors to $numExecutorsTarget because " +
s"not all requests are actually needed (previously $oldNumExecutorsTarget)")
numExecutorsTarget - oldNumExecutorsTarget
} else if (addTime != NOT_SET && now >= addTime) {
val delta = addExecutors(maxNeeded)
@@ -292,9 +294,8 @@
private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
// Do not request more executors if it would put our target over the upper bound
if (numExecutorsTarget >= maxNumExecutors) {
- val numExecutorsPending = numExecutorsTarget - executorIds.size
- logDebug(s"Not adding executors because there are already ${executorIds.size} registered " +
-   s"and ${numExecutorsPending} pending executor(s) (limit $maxNumExecutors)")
+ logDebug(s"Not adding executors because our current target total " +
+   s"is already $numExecutorsTarget (limit $maxNumExecutors)")
numExecutorsToAdd = 1
return 0
}
@@ -310,10 +311,19 @@
// Ensure that our target fits within configured bounds:
numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)

+ val delta = numExecutorsTarget - oldNumExecutorsTarget
+
+ // If our target has not changed, do not send a message
+ // to the cluster manager and reset our exponential growth
+ if (delta == 0) {
+   numExecutorsToAdd = 1
+   return 0
+ }

val addRequestAcknowledged = testing || client.requestTotalExecutors(numExecutorsTarget)
if (addRequestAcknowledged) {
- val delta = numExecutorsTarget - oldNumExecutorsTarget
- logInfo(s"Requesting $delta new executor(s) because tasks are backlogged" +
+ val executorsString = "executor" + { if (delta > 1) "s" else "" }
+ logInfo(s"Requesting $delta new $executorsString because tasks are backlogged" +
s" (new desired total will be $numExecutorsTarget)")
numExecutorsToAdd = if (delta == numExecutorsToAdd) {
numExecutorsToAdd * 2
@@ -420,7 +430,7 @@ private[spark] class ExecutorAllocationManager(
* This resets all variables used for adding executors.
*/
private def onSchedulerQueueEmpty(): Unit = synchronized {
logDebug(s"Clearing timer to add executors because there are no more pending tasks")
logDebug("Clearing timer to add executors because there are no more pending tasks")
addTime = NOT_SET
numExecutorsToAdd = 1
}
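For reference, the pieces of `addExecutors` touched above boil down to: clamp the target to what is needed and to the configured bounds, skip the cluster-manager call entirely when the target does not move (resetting the exponential step), and otherwise keep doubling the step only when the full delta was applied. The standalone sketch below is a simplified restatement of that flow, not the actual implementation; the names and the omitted request call are placeholders.

```scala
// Simplified sketch of the post-change ramp-up decision (illustrative only).
// Returns the new target and the next exponential step.
def rampUp(
    oldTarget: Int,
    step: Int,
    maxNeeded: Int,
    minExecutors: Int,
    maxExecutors: Int): (Int, Int) = {
  // Grow the target, but never past what is actually needed or the configured bounds.
  var target = math.min(oldTarget + step, maxNeeded)
  target = math.max(math.min(target, maxExecutors), minExecutors)

  val delta = target - oldTarget
  if (delta == 0) {
    // Nothing to request: leave the cluster manager alone and reset growth.
    (target, 1)
  } else {
    // A real implementation would request `target` total executors here.
    // Keep doubling only if the full step was applied; otherwise reset to 1.
    (target, if (delta == step) step * 2 else 1)
  }
}
```

Starting from a step of 1, repeated calls while the backlog persists move the target by 1, 2, 4, ... until it reaches the actual need or the configured maximum, which is the behavior the log messages above describe.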
docs/configuration.md (4 changes: 2 additions & 2 deletions)
@@ -1194,7 +1194,7 @@ Apart from these, the following properties are also available, and may be useful
</tr>
<tr>
<td><code>spark.dynamicAllocation.executorIdleTimeout</code></td>
- <td>600s</td>
+ <td>60s</td>
<td>
If dynamic allocation is enabled and an executor has been idle for more than this duration,
the executor will be removed. For more detail, see this
@@ -1224,7 +1224,7 @@ Apart from these, the following properties are also available, and may be useful
</tr>
<tr>
<td><code>spark.dynamicAllocation.schedulerBacklogTimeout</code></td>
- <td>5s</td>
+ <td>1s</td>
<td>
If dynamic allocation is enabled and there have been pending tasks backlogged for more than
this duration, new executors will be requested. For more detail, see this
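Both properties documented above can still be tuned per application; the new values are only defaults. A minimal sketch, assuming a plain SparkConf-based setup, of pinning the previous defaults for a job that prefers the old, more conservative behavior (the application name is a placeholder):

```scala
import org.apache.spark.SparkConf

// Hypothetical override restoring the pre-change defaults; the property keys
// are the ones documented in configuration.md above.
val conf = new SparkConf()
  .setAppName("example-app")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.schedulerBacklogTimeout", "5s") // new default: 1s
  .set("spark.dynamicAllocation.executorIdleTimeout", "600s")   // new default: 60s
```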