From 531e2b657fa0ee702b17fe62dc21f0584a17a970 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Mon, 10 Nov 2014 23:32:47 -0800 Subject: [PATCH 1/5] SPARK-4214. With dynamic allocation, avoid outstanding requests for more executors than pending tasks need. --- .../spark/ExecutorAllocationManager.scala | 36 +++++++++++++++---- .../spark/scheduler/TaskScheduler.scala | 2 ++ .../spark/scheduler/TaskSchedulerImpl.scala | 4 +++ .../spark/scheduler/DAGSchedulerSuite.scala | 2 ++ 4 files changed, 37 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index ef93009a074e7..07ffd79a55252 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -110,6 +110,12 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging // Clock used to schedule when executors should be added and removed private var clock: Clock = new RealClock + // TODO: The default value of 1 for spark.executor.cores works right now because dynamic + // allocation is only supported for YARN and the default number of cores per executor in YARN is + // 1, but it might need to be attained differently for different cluster managers + private val tasksPerExecutor = + conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1) + /** * Verify that the settings specified through the config are valid. * If not, throw an appropriate exception. @@ -217,14 +223,24 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging numExecutorsToAdd = 1 return 0 } + val maxExecutorsPending = maxPendingExecutors(sc.taskScheduler.numPendingTasks) + if (numExecutorsPending >= maxExecutorsPending) { + logDebug(s"Not adding executors because there are already $numExecutorsPending " + + s"pending and pending tasks could only fill $maxExecutorsPending") + numExecutorsToAdd = 1 + return 0 + } + + // It's never useful to request more executors than could satisfy all the pending tasks, so + // cap request at that amount. + // Also cap request with respect to the configured upper bound. + val maxExecutorsToAdd = math.min( + maxExecutorsPending - numExecutorsPending, + maxNumExecutors - numExistingExecutors) + assert(maxExecutorsToAdd > 0) + + val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxExecutorsToAdd) - // Request executors with respect to the upper bound - val actualNumExecutorsToAdd = - if (numExistingExecutors + numExecutorsToAdd <= maxNumExecutors) { - numExecutorsToAdd - } else { - maxNumExecutors - numExistingExecutors - } val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd val addRequestAcknowledged = testing || sc.requestExecutors(actualNumExecutorsToAdd) if (addRequestAcknowledged) { @@ -360,6 +376,10 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging removeTimes.remove(executorId) } + private def maxPendingExecutors(numPendingTasks: Int): Int = { + (numPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor + } + /** * A listener that notifies the given allocation manager of when to add and remove executors. 
* @@ -445,6 +465,8 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = { allocationManager.onExecutorRemoved(blockManagerRemoved.blockManagerId.executorId) } + + def totalPendingTasks(): Int = stageIdToNumTasks.values.sum } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index f095915352b17..f1c3641ab5d09 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -74,4 +74,6 @@ private[spark] trait TaskScheduler { */ def applicationId(): String = appId + def numPendingTasks(): Int + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index cd3c015321e85..6330bec66a374 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -491,6 +491,10 @@ private[spark] class TaskSchedulerImpl( override def applicationId(): String = backend.applicationId() + def numPendingTasks(): Int = synchronized { + activeTaskSets.values.map(_.allPendingTasks.size).sum + } + } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 819f95634bcdc..0ad1eb3e3609a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -93,6 +93,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F } override def setDAGScheduler(dagScheduler: DAGScheduler) = {} override def defaultParallelism() = 2 + override def numPendingTasks() = -1 } /** Length of time to wait while draining listener events. */ @@ -373,6 +374,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F override def defaultParallelism() = 2 override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)], blockManagerId: BlockManagerId): Boolean = true + override def numPendingTasks() = -1 } val noKillScheduler = new DAGScheduler( sc, From 6ae080c6246ee3dfba1762ddcc7ccc94e5dd2042 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Thu, 13 Nov 2014 17:42:44 -0800 Subject: [PATCH 2/5] Add tests and get num pending tasks from ExecutorAllocationListener --- .../spark/ExecutorAllocationManager.scala | 34 +++++++++---- .../spark/scheduler/TaskScheduler.scala | 2 - .../spark/scheduler/TaskSchedulerImpl.scala | 4 -- .../ExecutorAllocationManagerSuite.scala | 49 +++++++++++++++++++ .../spark/scheduler/DAGSchedulerSuite.scala | 2 - 5 files changed, 73 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 07ffd79a55252..fe1c21a57926f 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -28,7 +28,8 @@ import org.apache.spark.scheduler._ * the scheduler queue is not drained in N seconds, then new executors are added. If the queue * persists for another M seconds, then more executors are added and so on. 
The number added * in each round increases exponentially from the previous round until an upper bound on the - * number of executors has been reached. + * number of executors has been reached. The upper bound is based both on a configured property + * and on the number of tasks pending. * * The rationale for the exponential increase is twofold: (1) Executors should be added slowly * in the beginning in case the number of extra executors needed turns out to be small. Otherwise, @@ -82,6 +83,12 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging // During testing, the methods to actually kill and add executors are mocked out private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false) + // TODO: The default value of 1 for spark.executor.cores works right now because dynamic + // allocation is only supported for YARN and the default number of cores per executor in YARN is + // 1, but it might need to be attained differently for different cluster managers + private val tasksPerExecutor = + conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1) + validateSettings() // Number of executors to add in the next round @@ -110,11 +117,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging // Clock used to schedule when executors should be added and removed private var clock: Clock = new RealClock - // TODO: The default value of 1 for spark.executor.cores works right now because dynamic - // allocation is only supported for YARN and the default number of cores per executor in YARN is - // 1, but it might need to be attained differently for different cluster managers - private val tasksPerExecutor = - conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1) + private var listener: ExecutorAllocationListener = _ /** * Verify that the settings specified through the config are valid. @@ -147,6 +150,9 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging throw new SparkException("Dynamic allocation of executors requires the external " + "shuffle service. You may enable this through spark.shuffle.service.enabled.") } + if (tasksPerExecutor == 0) { + throw new SparkException("spark.executor.cores must not be less than spark.task.cpus.cores") + } } /** @@ -160,7 +166,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging * Register for scheduler callbacks to decide when to add and remove executors. 
*/ def start(): Unit = { - val listener = new ExecutorAllocationListener(this) + listener = new ExecutorAllocationListener(this) sc.addSparkListener(listener) startPolling() } @@ -223,7 +229,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging numExecutorsToAdd = 1 return 0 } - val maxExecutorsPending = maxPendingExecutors(sc.taskScheduler.numPendingTasks) + val maxExecutorsPending = maxExecutorsNeededForTasks(listener.totalPendingTasks()) if (numExecutorsPending >= maxExecutorsPending) { logDebug(s"Not adding executors because there are already $numExecutorsPending " + s"pending and pending tasks could only fill $maxExecutorsPending") @@ -376,7 +382,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging removeTimes.remove(executorId) } - private def maxPendingExecutors(numPendingTasks: Int): Int = { + private def maxExecutorsNeededForTasks(numPendingTasks: Int): Int = { (numPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor } @@ -466,7 +472,15 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging allocationManager.onExecutorRemoved(blockManagerRemoved.blockManagerId.executorId) } - def totalPendingTasks(): Int = stageIdToNumTasks.values.sum + /** + * An estimate of the total number of pending tasks remaining for currently running stages. Does + * not account for tasks which may have failed and been resubmitted. + */ + def totalPendingTasks(): Int = { + stageIdToNumTasks.map{ case (stageId, numTasks) => + numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0) + }.sum + } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index f1c3641ab5d09..f095915352b17 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -74,6 +74,4 @@ private[spark] trait TaskScheduler { */ def applicationId(): String = appId - def numPendingTasks(): Int - } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 6330bec66a374..cd3c015321e85 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -491,10 +491,6 @@ private[spark] class TaskSchedulerImpl( override def applicationId(): String = backend.applicationId() - def numPendingTasks(): Int = synchronized { - activeTaskSets.values.map(_.allPendingTasks.size).sum - } - } diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 66cf60d25f6d1..07e0aa441d473 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -76,6 +76,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext { test("add executors") { sc = createSparkContext(1, 10) val manager = sc.executorAllocationManager.get + sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000))) // Keep adding until the limit is reached assert(numExecutorsPending(manager) === 0) @@ -117,6 +118,51 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext { assert(numExecutorsToAdd(manager) === 1) } + test("add 
executors capped by num pending tasks") { + sc = createSparkContext(1, 10) + val manager = sc.executorAllocationManager.get + sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 5))) + + // Verify that we're capped at number of tasks in the stage + assert(numExecutorsPending(manager) === 0) + assert(numExecutorsToAdd(manager) === 1) + assert(addExecutors(manager) === 1) + assert(numExecutorsPending(manager) === 1) + assert(numExecutorsToAdd(manager) === 2) + assert(addExecutors(manager) === 2) + assert(numExecutorsPending(manager) === 3) + assert(numExecutorsToAdd(manager) === 4) + assert(addExecutors(manager) === 2) + assert(numExecutorsPending(manager) === 5) + assert(numExecutorsToAdd(manager) === 1) + + // Verify that running a task reduces the cap + sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3))) + sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1"))) + assert(addExecutors(manager) === 1) + assert(numExecutorsPending(manager) === 6) + assert(numExecutorsToAdd(manager) === 2) + assert(addExecutors(manager) === 1) + assert(numExecutorsPending(manager) === 7) + assert(numExecutorsToAdd(manager) === 1) + + // Verify that re-running a task doesn't reduce the cap further + sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 3))) + sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 0, "executor-1"))) + sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(1, 0, "executor-1"))) + assert(addExecutors(manager) === 1) + assert(numExecutorsPending(manager) === 8) + assert(numExecutorsToAdd(manager) === 2) + assert(addExecutors(manager) === 1) + assert(numExecutorsPending(manager) === 9) + assert(numExecutorsToAdd(manager) === 1) + + // Verify that running a task once we're at our limit doesn't blow things up + sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 1, "executor-1"))) + assert(addExecutors(manager) === 0) + assert(numExecutorsPending(manager) === 9) + } + test("remove executors") { sc = createSparkContext(5, 10) val manager = sc.executorAllocationManager.get @@ -170,6 +216,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext { test ("interleaving add and remove") { sc = createSparkContext(5, 10) val manager = sc.executorAllocationManager.get + sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000))) // Add a few executors assert(addExecutors(manager) === 1) @@ -343,6 +390,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext { val clock = new TestClock(2020L) val manager = sc.executorAllocationManager.get manager.setClock(clock) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000))) // Scheduler queue backlogged onSchedulerBacklogged(manager) @@ -659,4 +707,5 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester { private def onExecutorBusy(manager: ExecutorAllocationManager, id: String): Unit = { manager invokePrivate _onExecutorBusy(id) } + } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 0ad1eb3e3609a..819f95634bcdc 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -93,7 +93,6 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F } override 
def setDAGScheduler(dagScheduler: DAGScheduler) = {} override def defaultParallelism() = 2 - override def numPendingTasks() = -1 } /** Length of time to wait while draining listener events. */ @@ -374,7 +373,6 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F override def defaultParallelism() = 2 override def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)], blockManagerId: BlockManagerId): Boolean = true - override def numPendingTasks() = -1 } val noKillScheduler = new DAGScheduler( sc, From 067465f4ac2e05e563795f62657b0ac093b09073 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Thu, 13 Nov 2014 19:19:23 -0800 Subject: [PATCH 3/5] Whitespace fix --- .../scala/org/apache/spark/ExecutorAllocationManagerSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 07e0aa441d473..4b27477790212 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -707,5 +707,4 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester { private def onExecutorBusy(manager: ExecutorAllocationManager, id: String): Unit = { manager invokePrivate _onExecutorBusy(id) } - } From 13b53dfb417e90354833a00d5a98f872ffbd2694 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Fri, 14 Nov 2014 14:12:33 -0800 Subject: [PATCH 4/5] Review feedback --- .../spark/ExecutorAllocationManager.scala | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index fe1c21a57926f..1284422e5d681 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -29,7 +29,8 @@ import org.apache.spark.scheduler._ * persists for another M seconds, then more executors are added and so on. The number added * in each round increases exponentially from the previous round until an upper bound on the * number of executors has been reached. The upper bound is based both on a configured property - * and on the number of tasks pending. + * and on the number of tasks pending: the policy will never increase the number of executor + * requests past the number needed to handle all pending tasks. * * The rationale for the exponential increase is twofold: (1) Executors should be added slowly * in the beginning in case the number of extra executors needed turns out to be small. Otherwise, @@ -117,7 +118,8 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging // Clock used to schedule when executors should be added and removed private var clock: Clock = new RealClock - private var listener: ExecutorAllocationListener = _ + // Listener for Spark events that impact the allocation policy + private val listener = new ExecutorAllocationListener(this) /** * Verify that the settings specified through the config are valid. @@ -166,7 +168,6 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging * Register for scheduler callbacks to decide when to add and remove executors. 
*/ def start(): Unit = { - listener = new ExecutorAllocationListener(this) sc.addSparkListener(listener) startPolling() } @@ -229,10 +230,11 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging numExecutorsToAdd = 1 return 0 } - val maxExecutorsPending = maxExecutorsNeededForTasks(listener.totalPendingTasks()) - if (numExecutorsPending >= maxExecutorsPending) { + val maxNumExecutorsPending = + (listener.totalPendingTasks() + tasksPerExecutor - 1) / tasksPerExecutor + if (numExecutorsPending >= maxNumExecutorsPending) { logDebug(s"Not adding executors because there are already $numExecutorsPending " + - s"pending and pending tasks could only fill $maxExecutorsPending") + s"pending and pending tasks could only fill $maxNumExecutorsPending") numExecutorsToAdd = 1 return 0 } @@ -240,12 +242,12 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging // It's never useful to request more executors than could satisfy all the pending tasks, so // cap request at that amount. // Also cap request with respect to the configured upper bound. - val maxExecutorsToAdd = math.min( - maxExecutorsPending - numExecutorsPending, + val maxNumExecutorsToAdd = math.min( + maxNumExecutorsPending - numExecutorsPending, maxNumExecutors - numExistingExecutors) - assert(maxExecutorsToAdd > 0) + assert(maxNumExecutorsToAdd > 0) - val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxExecutorsToAdd) + val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd) val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd val addRequestAcknowledged = testing || sc.requestExecutors(actualNumExecutorsToAdd) @@ -382,10 +384,6 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging removeTimes.remove(executorId) } - private def maxExecutorsNeededForTasks(numPendingTasks: Int): Int = { - (numPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor - } - /** * A listener that notifies the given allocation manager of when to add and remove executors. * @@ -477,7 +475,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging * not account for tasks which may have failed and been resubmitted. */ def totalPendingTasks(): Int = { - stageIdToNumTasks.map{ case (stageId, numTasks) => + stageIdToNumTasks.map { case (stageId, numTasks) => numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0) }.sum } From 35cf0e0ec3293a4f5e7caf68589f13a2046a023b Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Fri, 14 Nov 2014 14:20:10 -0800 Subject: [PATCH 5/5] Add comment --- .../main/scala/org/apache/spark/ExecutorAllocationManager.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 1284422e5d681..7153ac3ba8732 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -230,6 +230,8 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging numExecutorsToAdd = 1 return 0 } + // The number of executors needed to satisfy all pending tasks is the number of tasks pending + // divided by the number of tasks each executor can fit, rounded up. val maxNumExecutorsPending = (listener.totalPendingTasks() + tasksPerExecutor - 1) / tasksPerExecutor if (numExecutorsPending >= maxNumExecutorsPending) {
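
For readers tracing the policy across the hunks above, the following is a minimal, self-contained Scala sketch of the capping arithmetic the patch describes, not the patch code itself: the number of executors that pending tasks could fill is the ceiling of pending tasks over tasks per executor, and each round's exponentially growing request is clamped both by that figure and by the configured upper bound. The object and method names (AllocationPolicySketch, maxExecutorsNeeded, executorsToRequest) are hypothetical; only the arithmetic mirrors the diff.

    object AllocationPolicySketch {

      // Ceiling division: executors needed to run all pending tasks,
      // assuming tasksPerExecutor >= 1 (the manager rejects 0 in validateSettings).
      def maxExecutorsNeeded(pendingTasks: Int, tasksPerExecutor: Int): Int =
        (pendingTasks + tasksPerExecutor - 1) / tasksPerExecutor

      // Executors to request in one round: the exponentially growing numExecutorsToAdd,
      // capped by what pending tasks could fill and by the configured maximum.
      def executorsToRequest(
          numExecutorsToAdd: Int,
          numExecutorsPending: Int,
          numExistingExecutors: Int,
          maxNumExecutors: Int,
          pendingTasks: Int,
          tasksPerExecutor: Int): Int = {
        val cap = maxExecutorsNeeded(pendingTasks, tasksPerExecutor)
        if (numExecutorsPending >= cap) {
          0
        } else {
          math.min(numExecutorsToAdd,
            math.min(cap - numExecutorsPending, maxNumExecutors - numExistingExecutors))
        }
      }

      def main(args: Array[String]): Unit = {
        // 5 pending tasks, 1 task per executor, upper bound of 10 executors.
        var toAdd = 1
        var pendingExecutors = 0
        for (_ <- 1 to 4) {
          val granted = executorsToRequest(toAdd, pendingExecutors, 0, 10, 5, 1)
          pendingExecutors += granted
          // Double the next request only if this round was granted in full, else reset to 1.
          toAdd = if (granted == toAdd) toAdd * 2 else 1
          println(s"granted $granted, $pendingExecutors pending, next round tries $toAdd")
        }
      }
    }

Run as written, the rounds grant 1, 2, 2 and then 0 executors, leaving 5 pending, which is the sequence the "add executors capped by num pending tasks" test above asserts.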
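
The listener-side estimate that feeds that cap can be sketched in isolation as well. The map names stageIdToNumTasks and stageIdToTaskIndices and the body of totalPendingTasks come from the diff; the object name and the two event methods are simplified stand-ins for the SparkListener callbacks, assuming a single allocation manager per SparkContext.

    import scala.collection.mutable

    object PendingTasksSketch {
      // Total tasks per submitted stage, and the indices of tasks that have started.
      private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
      private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]

      def onStageSubmitted(stageId: Int, numTasks: Int): Unit =
        stageIdToNumTasks(stageId) = numTasks

      def onTaskStart(stageId: Int, taskIndex: Int): Unit =
        stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex

      // Pending tasks = submitted tasks minus distinct task indices already started.
      def totalPendingTasks(): Int =
        stageIdToNumTasks.map { case (stageId, numTasks) =>
          numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
        }.sum

      def main(args: Array[String]): Unit = {
        onStageSubmitted(0, 3)
        println(totalPendingTasks())  // 3
        onTaskStart(0, 0)
        println(totalPendingTasks())  // 2
        onTaskStart(0, 0)             // another attempt of the same task index
        println(totalPendingTasks())  // still 2
      }
    }

Because started tasks are tracked by index in a set, a re-launched attempt of the same index does not shrink the estimate, which is why the test above expects the cap to stay put when a task is re-run, and why the doc comment notes that failed and resubmitted tasks are not accounted for.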