@@ -150,6 +150,13 @@ private[spark] class ExecutorAllocationManager(
// Metric source for ExecutorAllocationManager to expose internal status to MetricsSystem.
val executorAllocationManagerSource = new ExecutorAllocationManagerSource

// Whether we are still waiting for the initial set of executors to be allocated.
// While this is true, we will not cancel outstanding executor requests. This is
// set to false when:
// (1) a stage is submitted, or
// (2) an executor idle timeout has elapsed.
@volatile private var initializing: Boolean = true

/**
* Verify that the settings specified through the config are valid.
* If not, throw an appropriate exception.
@@ -240,6 +247,7 @@ private[spark] class ExecutorAllocationManager(
removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
if (expired) {
initializing = false
removeExecutor(executorId)
}
!expired
@@ -261,7 +269,11 @@ private[spark] class ExecutorAllocationManager(
private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
val maxNeeded = maxNumExecutorsNeeded

if (maxNeeded < numExecutorsTarget) {
if (initializing) {
// Do not change our target while we are still initializing;
// otherwise the first job may have to ramp up unnecessarily
0
} else if (maxNeeded < numExecutorsTarget) {
// The target number exceeds the number we actually need, so stop adding new
// executors and inform the cluster manager to cancel the extra pending requests
val oldNumExecutorsTarget = numExecutorsTarget
Expand All @@ -271,7 +283,7 @@ private[spark] class ExecutorAllocationManager(
// If the new target has not changed, avoid sending a message to the cluster manager
if (numExecutorsTarget < oldNumExecutorsTarget) {
client.requestTotalExecutors(numExecutorsTarget)
logInfo(s"Lowering target number of executors to $numExecutorsTarget (previously " +
logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
}
numExecutorsTarget - oldNumExecutorsTarget
@@ -481,6 +493,7 @@ private[spark] class ExecutorAllocationManager(
private var numRunningTasks: Int = _

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
initializing = false
val stageId = stageSubmitted.stageInfo.stageId
val numTasks = stageSubmitted.stageInfo.numTasks
allocationManager.synchronized {
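Taken together, the hunks above add a single @volatile initializing flag: the manager keeps the initially requested executors until either a stage is submitted or an executor idle timeout fires, and only then lets updateAndSyncNumExecutorsTarget lower the target. A minimal, self-contained Scala sketch of that gating logic, condensed from the diff (the object name, the simplified target bookkeeping, and the callback parameter are illustrative only, not the real ExecutorAllocationManager API):

object InitialExecutorGatingSketch {
  // Mirrors the new flag: true until the first stage or the first idle timeout.
  @volatile private var initializing: Boolean = true

  // Simplified stand-in for the manager's target bookkeeping.
  private var numExecutorsTarget: Int = 3

  /** Condensed updateAndSyncNumExecutorsTarget: returns the delta applied to the target. */
  def updateTarget(maxNeeded: Int, requestTotalExecutors: Int => Boolean): Int = synchronized {
    if (initializing) {
      // Do not change the target while still waiting for the initial executors.
      0
    } else if (maxNeeded < numExecutorsTarget) {
      // Ramp down and tell the cluster manager to cancel the now-unneeded requests.
      val oldTarget = numExecutorsTarget
      numExecutorsTarget = maxNeeded
      if (numExecutorsTarget < oldTarget) {
        requestTotalExecutors(numExecutorsTarget)
      }
      numExecutorsTarget - oldTarget // negative: pending requests were cancelled
    } else {
      0 // the ramp-up branch is untouched by this patch and omitted from the sketch
    }
  }

  // The two events that end the initialization phase, per the diff.
  def onStageSubmitted(): Unit = { initializing = false }
  def onExecutorIdleTimeout(): Unit = { initializing = false }
}

The suite changes below follow the same story: createSparkContext gains an explicit initialExecutors argument, and the two new tests walk each exit path out of the initializing state.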
@@ -90,7 +90,7 @@ class ExecutorAllocationManagerSuite
}

test("add executors") {
sc = createSparkContext(1, 10)
sc = createSparkContext(1, 10, 1)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))

@@ -135,7 +135,7 @@ class ExecutorAllocationManagerSuite
}

test("add executors capped by num pending tasks") {
sc = createSparkContext(0, 10)
sc = createSparkContext(0, 10, 0)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 5)))

@@ -186,7 +186,7 @@ class ExecutorAllocationManagerSuite
}

test("cancel pending executors when no longer needed") {
sc = createSparkContext(0, 10)
sc = createSparkContext(0, 10, 0)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 5)))

@@ -213,7 +213,7 @@ class ExecutorAllocationManagerSuite
}

test("remove executors") {
sc = createSparkContext(5, 10)
sc = createSparkContext(5, 10, 5)
val manager = sc.executorAllocationManager.get
(1 to 10).map(_.toString).foreach { id => onExecutorAdded(manager, id) }

@@ -263,7 +263,7 @@ class ExecutorAllocationManagerSuite
}

test ("interleaving add and remove") {
sc = createSparkContext(5, 10)
sc = createSparkContext(5, 10, 5)
val manager = sc.executorAllocationManager.get
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000)))

@@ -331,7 +331,7 @@ class ExecutorAllocationManagerSuite
}

test("starting/canceling add timer") {
sc = createSparkContext(2, 10)
sc = createSparkContext(2, 10, 2)
val clock = new ManualClock(8888L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -363,7 +363,7 @@ class ExecutorAllocationManagerSuite
}

test("starting/canceling remove timers") {
sc = createSparkContext(2, 10)
sc = createSparkContext(2, 10, 2)
val clock = new ManualClock(14444L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -410,7 +410,7 @@ class ExecutorAllocationManagerSuite
}

test("mock polling loop with no events") {
sc = createSparkContext(0, 20)
sc = createSparkContext(0, 20, 0)
val manager = sc.executorAllocationManager.get
val clock = new ManualClock(2020L)
manager.setClock(clock)
@@ -436,7 +436,7 @@ class ExecutorAllocationManagerSuite
}

test("mock polling loop add behavior") {
sc = createSparkContext(0, 20)
sc = createSparkContext(0, 20, 0)
val clock = new ManualClock(2020L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -486,7 +486,7 @@ class ExecutorAllocationManagerSuite
}

test("mock polling loop remove behavior") {
sc = createSparkContext(1, 20)
sc = createSparkContext(1, 20, 1)
val clock = new ManualClock(2020L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -547,7 +547,7 @@ class ExecutorAllocationManagerSuite
}

test("listeners trigger add executors correctly") {
sc = createSparkContext(2, 10)
sc = createSparkContext(2, 10, 2)
val manager = sc.executorAllocationManager.get
assert(addTime(manager) === NOT_SET)

@@ -577,7 +577,7 @@ class ExecutorAllocationManagerSuite
}

test("listeners trigger remove executors correctly") {
sc = createSparkContext(2, 10)
sc = createSparkContext(2, 10, 2)
val manager = sc.executorAllocationManager.get
assert(removeTimes(manager).isEmpty)

@@ -608,7 +608,7 @@ class ExecutorAllocationManagerSuite
}

test("listeners trigger add and remove executor callbacks correctly") {
sc = createSparkContext(2, 10)
sc = createSparkContext(2, 10, 2)
val manager = sc.executorAllocationManager.get
assert(executorIds(manager).isEmpty)
assert(removeTimes(manager).isEmpty)
@@ -641,7 +641,7 @@ class ExecutorAllocationManagerSuite
}

test("SPARK-4951: call onTaskStart before onBlockManagerAdded") {
sc = createSparkContext(2, 10)
sc = createSparkContext(2, 10, 2)
val manager = sc.executorAllocationManager.get
assert(executorIds(manager).isEmpty)
assert(removeTimes(manager).isEmpty)
@@ -677,7 +677,7 @@ class ExecutorAllocationManagerSuite
}

test("avoid ramp up when target < running executors") {
sc = createSparkContext(0, 100000)
sc = createSparkContext(0, 100000, 0)
val manager = sc.executorAllocationManager.get
val stage1 = createStageInfo(0, 1000)
sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1))
@@ -701,13 +701,67 @@ class ExecutorAllocationManagerSuite
assert(numExecutorsTarget(manager) === 16)
}

private def createSparkContext(minExecutors: Int = 1, maxExecutors: Int = 5): SparkContext = {
test("avoid ramp down initial executors until first job is submitted") {
sc = createSparkContext(2, 5, 3)
val manager = sc.executorAllocationManager.get
val clock = new ManualClock(10000L)
manager.setClock(clock)

// Verify the initial number of executors
assert(numExecutorsTarget(manager) === 3)
schedule(manager)
// Verify that the initial number of executors is kept while there are no pending tasks
assert(numExecutorsTarget(manager) === 3)

sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2)))
clock.advance(100L)

assert(maxNumExecutorsNeeded(manager) === 2)
schedule(manager)

// Verify that the current number of executors is ramped down when the first job is submitted
assert(numExecutorsTarget(manager) === 2)
}

test("avoid ramp down initial executors until idle executor is timeout") {
sc = createSparkContext(2, 5, 3)
val manager = sc.executorAllocationManager.get
val clock = new ManualClock(10000L)
manager.setClock(clock)

// Verify the initial number of executors
assert(numExecutorsTarget(manager) === 3)
schedule(manager)
// Verify that the initial number of executors is kept when there are no pending tasks
assert(numExecutorsTarget(manager) === 3)
(0 until 3).foreach { i =>
onExecutorAdded(manager, s"executor-$i")
}

clock.advance(executorIdleTimeout * 1000)

assert(maxNumExecutorsNeeded(manager) === 0)
schedule(manager)
// Verify the executors have timed out but numExecutorsTarget has not yet been recalculated
assert(numExecutorsTarget(manager) === 3)

// Schedule again to recalculate numExecutorsTarget after the executors have timed out
schedule(manager)
// Verify that the current number of executors is ramped down once the idle executors time out
assert(numExecutorsTarget(manager) === 2)
}

private def createSparkContext(
minExecutors: Int = 1,
maxExecutors: Int = 5,
initialExecutors: Int = 1): SparkContext = {
val conf = new SparkConf()
.setMaster("local")
.setAppName("test-executor-allocation-manager")
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
.set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString)
.set("spark.dynamicAllocation.initialExecutors", initialExecutors.toString)
.set("spark.dynamicAllocation.schedulerBacklogTimeout",
s"${schedulerBacklogTimeout.toString}s")
.set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout",
@@ -791,6 +845,10 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
manager invokePrivate _schedule()
}

private def maxNumExecutorsNeeded(manager: ExecutorAllocationManager): Int = {
manager invokePrivate _maxNumExecutorsNeeded()
}

private def addExecutors(manager: ExecutorAllocationManager): Int = {
val maxNumExecutorsNeeded = manager invokePrivate _maxNumExecutorsNeeded()
manager invokePrivate _addExecutors(maxNumExecutorsNeeded)
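As a closing note on how the new knob is exercised outside the test suite: the configuration keys wired up by createSparkContext above are the same ones an application would set. A hedged sketch of such an application (the app name, executor counts, and toy job are placeholders; the master is expected to come from spark-submit, and on a real cluster manager dynamic allocation typically also needs the external shuffle service enabled):

import org.apache.spark.{SparkConf, SparkContext}

object DynamicAllocationExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("dynamic-allocation-example") // placeholder name
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "2")
      .set("spark.dynamicAllocation.maxExecutors", "10")
      // With this patch, these initial executors are kept until the first stage
      // is submitted or an executor idle timeout elapses.
      .set("spark.dynamicAllocation.initialExecutors", "5")

    val sc = new SparkContext(conf)
    try {
      // The first submitted stage flips initializing to false, after which
      // the target may ramp down to what is actually needed.
      sc.parallelize(1 to 100, numSlices = 4).count()
    } finally {
      sc.stop()
    }
  }
}

The new tests assert exactly this behaviour: the target stays at the configured initial value (3) while nothing has happened yet, and drops to 2 once a stage is submitted or the idle executors time out.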