diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 61ab63584269..04463b1a269d 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -501,11 +501,16 @@ private[spark] class ExecutorAllocationManager(
    */
   private def addExecutors(maxNumExecutorsNeeded: Int, rpId: Int): Int = {
     val oldNumExecutorsTarget = numExecutorsTargetPerResourceProfileId(rpId)
+    // Increase the maxNumExecutors by adding the excluded executors so that the manager can
+    // launch new executors to replace the excluded executors.
+    val exclude = executorMonitor.excludedExecutorCount
+    val maxOverheadExecutors = maxNumExecutors + exclude
     // Do not request more executors if it would put our target over the upper bound
     // this is doing a max check per ResourceProfile
-    if (oldNumExecutorsTarget >= maxNumExecutors) {
+    if (oldNumExecutorsTarget >= maxOverheadExecutors) {
       logDebug("Not adding executors because our current target total " +
-        s"is already ${oldNumExecutorsTarget} (limit $maxNumExecutors)")
+        s"is already ${oldNumExecutorsTarget} (limit $maxOverheadExecutors" +
+        s"${if (exclude > 0) s" ($exclude excluded)" else ""})")
       numExecutorsToAddPerResourceProfileId(rpId) = 1
       return 0
     }
@@ -518,7 +523,8 @@
     // Ensure that our target doesn't exceed what we need at the present moment:
     numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)
     // Ensure that our target fits within configured bounds:
-    numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)
+    numExecutorsTarget = math.max(
+      math.min(numExecutorsTarget, maxOverheadExecutors), minNumExecutors)

     val delta = numExecutorsTarget - oldNumExecutorsTarget
     numExecutorsTargetPerResourceProfileId(rpId) = numExecutorsTarget
diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
index 8dbdc84905e3..4dd05cf6d07e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
@@ -52,6 +52,11 @@ private[spark] class ExecutorMonitor(
   private val shuffleTrackingEnabled = !conf.get(SHUFFLE_SERVICE_ENABLED) &&
     conf.get(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)

+  // We only track the `excluded` status of an executor's tracker when the exclusion feature is
+  // enabled but killing the excluded executors is disabled.
+  private val exclusionButNotKillEnabled = conf.get(EXCLUDE_ON_FAILURE_ENABLED).getOrElse(false) &&
+    !conf.get(EXCLUDE_ON_FAILURE_KILL_ENABLED)
+
   private val executors = new ConcurrentHashMap[String, Tracker]()

   private val execResourceProfileCount = new ConcurrentHashMap[Int, Int]()
@@ -166,6 +171,7 @@ private[spark] class ExecutorMonitor(

   def executorCount: Int = executors.size()

+  // Executors that are available for running tasks; excluded executors are not counted here.
   def executorCountWithResourceProfile(id: Int): Int = {
     execResourceProfileCount.getOrDefault(id, 0)
   }
@@ -180,6 +186,14 @@
     }
   }

+  def excludedExecutorCount: Int = {
+    if (exclusionButNotKillEnabled) {
+      executors.values().asScala.count(_.excluded)
+    } else {
+      0
+    }
+  }
+
   def pendingRemovalCount: Int = executors.asScala.count { case (_, exec) => exec.pendingRemoval }

   def pendingRemovalCountPerResourceProfileId(id: Int): Int = {
@@ -307,7 +321,8 @@
     val executorId = event.taskInfo.executorId
     // Guard against a late arriving task start event (SPARK-26927).
     if (client.isExecutorActive(executorId)) {
-      val exec = ensureExecutorIsTracked(executorId, UNKNOWN_RESOURCE_PROFILE_ID)
+      val exec = ensureExecutorIsTracked(
+        executorId, event.taskInfo.host, UNKNOWN_RESOURCE_PROFILE_ID)
       exec.updateRunningTasks(1)
     }
   }
@@ -337,7 +352,8 @@
   }

   override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
-    val exec = ensureExecutorIsTracked(event.executorId, event.executorInfo.resourceProfileId)
+    val exec = ensureExecutorIsTracked(
+      event.executorId, event.executorInfo.executorHost, event.executorInfo.resourceProfileId)
     exec.updateRunningTasks(0)
     logInfo(s"New executor ${event.executorId} has registered (new total is ${executors.size()})")
   }
@@ -348,6 +364,11 @@
     execResourceProfileCount.remove(rpId, 0)
   }

+  private def incrementExecResourceProfileCount(rpId: Int): Unit = {
+    val count = execResourceProfileCount.computeIfAbsent(rpId, _ => 0)
+    execResourceProfileCount.replace(rpId, count, count + 1)
+  }
+
   override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
     val removed = executors.remove(event.executorId)
     if (removed != null) {
@@ -359,7 +380,9 @@
   }

   override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
-    val exec = ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId,
+    val exec = ensureExecutorIsTracked(
+      event.blockUpdatedInfo.blockManagerId.executorId,
+      event.blockUpdatedInfo.blockManagerId.host,
       UNKNOWN_RESOURCE_PROFILE_ID)

     // Check if it is a shuffle file, or RDD to pick the correct codepath for update
@@ -418,6 +441,42 @@
     }
   }

+  override def onExecutorExcluded(executorExcluded: SparkListenerExecutorExcluded): Unit = {
+    if (exclusionButNotKillEnabled) {
+      Option(executors.get(executorExcluded.executorId)).foreach { exec =>
+        exec.excluded = true
+        decrementExecResourceProfileCount(exec.resourceProfileId)
+      }
+    }
+  }
+
+  override def onExecutorUnexcluded(executorUnexcluded: SparkListenerExecutorUnexcluded): Unit = {
+    if (exclusionButNotKillEnabled) {
+      Option(executors.get(executorUnexcluded.executorId)).foreach { exec =>
+        exec.excluded = false
+        incrementExecResourceProfileCount(exec.resourceProfileId)
+      }
+    }
+  }
+
+  override def onNodeExcluded(nodeExcluded: SparkListenerNodeExcluded): Unit = {
+    if (exclusionButNotKillEnabled) {
+      executors.values().asScala.filter(_.host == nodeExcluded.hostId).foreach { exec =>
+        exec.excluded = true
+        decrementExecResourceProfileCount(exec.resourceProfileId)
+      }
+    }
+  }
+
+  override def onNodeUnexcluded(nodeUnexcluded: SparkListenerNodeUnexcluded): Unit = {
+    if (exclusionButNotKillEnabled) {
+      executors.values().asScala.filter(_.host == nodeUnexcluded.hostId).foreach { exec =>
+        exec.excluded = false
+        incrementExecResourceProfileCount(exec.resourceProfileId)
+      }
+    }
+  }
+
   override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
     case ShuffleCleanedEvent(id) => cleanupShuffle(id)
     case _ =>
@@ -467,15 +526,13 @@
    * which the `SparkListenerTaskStart` event is posted before the `SparkListenerBlockManagerAdded`
    * event, which is possible because these events are posted in different threads. (see SPARK-4951)
    */
-  private def ensureExecutorIsTracked(id: String, resourceProfileId: Int): Tracker = {
-    val numExecsWithRpId = execResourceProfileCount.computeIfAbsent(resourceProfileId, _ => 0)
+  private def ensureExecutorIsTracked(id: String, host: String, resourceProfileId: Int): Tracker = {
     val execTracker = executors.computeIfAbsent(id, _ => {
-      val newcount = numExecsWithRpId + 1
-      execResourceProfileCount.put(resourceProfileId, newcount)
-      logDebug(s"Executor added with ResourceProfile id: $resourceProfileId " +
-        s"count is now $newcount")
-      new Tracker(resourceProfileId)
-      })
+      incrementExecResourceProfileCount(resourceProfileId)
+      logDebug(s"Executor added with ResourceProfile id: $resourceProfileId " +
+        s"count is now ${execResourceProfileCount.get(resourceProfileId)}")
+      new Tracker(resourceProfileId, host)
+    })
     // if we had added executor before without knowing the resource profile id, fix it up
     if (execTracker.resourceProfileId == UNKNOWN_RESOURCE_PROFILE_ID &&
         resourceProfileId != UNKNOWN_RESOURCE_PROFILE_ID) {
@@ -483,7 +540,7 @@
         s"it to $resourceProfileId")
       execTracker.resourceProfileId = resourceProfileId
       // fix up the counts for each resource profile id
-      execResourceProfileCount.put(resourceProfileId, numExecsWithRpId + 1)
+      incrementExecResourceProfileCount(resourceProfileId)
       decrementExecResourceProfileCount(UNKNOWN_RESOURCE_PROFILE_ID)
     }
     execTracker
@@ -506,7 +563,7 @@
     }
   }

-  private class Tracker(var resourceProfileId: Int) {
+  private class Tracker(var resourceProfileId: Int, val host: String) {
     @volatile var timeoutAt: Long = Long.MaxValue

     // Tracks whether this executor is thought to be timed out. It's used to detect when the list
@@ -516,6 +573,8 @@
     var pendingRemoval: Boolean = false
     var decommissioning: Boolean = false
     var hasActiveShuffle: Boolean = false
+    // whether the executor is temporarily excluded by the `HealthTracker`
+    var excluded: Boolean = false

     private var idleStart: Long = -1
     private var runningTasks: Int = 0
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index c1269a9c9104..9dcdcf259237 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -1648,11 +1648,45 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     assert(numExecutorsTargetForDefaultProfileId(manager) === 1)
   }

+  test("SPARK-33799: excluded executors should not be considered as available executors") {
+    val manager = createManager(createConf(1, 4, 1,
+      exclusionEnabled = true))
+    post(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
+    onExecutorAdded(manager, "executor-0", defaultProfile)
+
+    val updatesNeeded =
+      new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates]
+
+    // Keep adding until the limit is reached
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 1)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 1)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    onExecutorAdded(manager, "executor-1", defaultProfile)
+
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 2)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 2)
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
+    doUpdateRequest(manager, updatesNeeded.toMap, clock.getTimeMillis())
+    onExecutorAdded(manager, "executor-2", defaultProfile)
+    onExecutorAdded(manager, "executor-3", defaultProfile)
+
+    assert(numExecutorsTargetForDefaultProfileId(manager) === 4)
+    assert(numExecutorsToAddForDefaultProfile(manager) === 4)
+    // reached maxExecutors, so no more executors will be added
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 0)
+
+    // mark executor-0 as excluded, so the manager will launch a new executor to replace it
+    post(SparkListenerExecutorExcluded(clock.getTimeMillis(), "executor-0", 1))
+    assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+  }
+
   private def createConf(
       minExecutors: Int = 1,
       maxExecutors: Int = 5,
       initialExecutors: Int = 1,
-      decommissioningEnabled: Boolean = false): SparkConf = {
+      decommissioningEnabled: Boolean = false,
+      exclusionEnabled: Boolean = false): SparkConf = {
     val sparkConf = new SparkConf()
       .set(config.DYN_ALLOCATION_ENABLED, true)
       .set(config.DYN_ALLOCATION_MIN_EXECUTORS, minExecutors)
@@ -1670,6 +1704,7 @@
       // and thread "pool-1-thread-1-ScalaTest-running".
       .set(TEST_DYNAMIC_ALLOCATION_SCHEDULE_ENABLED, false)
       .set(DECOMMISSION_ENABLED, decommissioningEnabled)
+      .set(config.EXCLUDE_ON_FAILURE_ENABLED, exclusionEnabled)
     sparkConf
   }

diff --git a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
index 6d494796d5a2..7901394fc4f3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
@@ -433,6 +433,58 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
   }

+  test("SPARK-33799: ExecutorMonitor should handle " +
+    "SparkListenerExecutorExcluded/SparkListenerExecutorUnexcluded") {
+    conf.set(EXCLUDE_ON_FAILURE_ENABLED, true)
+      .set(EXCLUDE_ON_FAILURE_KILL_ENABLED, false)
+    monitor = new ExecutorMonitor(conf, client, null, clock)
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
+    assert(monitor.executorCountWithResourceProfile(execInfo.resourceProfileId) === 1)
+
+    // the excluded executor should not be counted
+    monitor.onExecutorExcluded(SparkListenerExecutorExcluded(clock.getTimeMillis(), "1", 1))
+    assert(monitor.executorCountWithResourceProfile(execInfo.resourceProfileId) === 0)
+
+    // the unexcluded executor can be counted again
+    monitor.onExecutorUnexcluded(SparkListenerExecutorUnexcluded(clock.getTimeMillis(), "1"))
+    assert(monitor.executorCountWithResourceProfile(execInfo.resourceProfileId) === 1)
+  }
+
+  test("SPARK-33799: ExecutorMonitor should handle " +
+    "SparkListenerNodeExcluded/SparkListenerNodeUnexcluded") {
+    conf.set(EXCLUDE_ON_FAILURE_ENABLED, true)
+      .set(EXCLUDE_ON_FAILURE_KILL_ENABLED, false)
+    monitor = new ExecutorMonitor(conf, client, null, clock)
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", execInfo))
+    assert(monitor.executorCountWithResourceProfile(execInfo.resourceProfileId) === 2)
+
+    // executors on the excluded node should not be counted
+    monitor.onNodeExcluded(
+      SparkListenerNodeExcluded(clock.getTimeMillis(), execInfo.executorHost, 1))
+    assert(monitor.executorCountWithResourceProfile(execInfo.resourceProfileId) === 0)
+
+    // executors on the unexcluded node can be counted again
+    monitor.onNodeUnexcluded(
+      SparkListenerNodeUnexcluded(clock.getTimeMillis(), execInfo.executorHost))
+    assert(monitor.executorCountWithResourceProfile(execInfo.resourceProfileId) === 2)
+    assert(monitor.executorCount === 2)
+  }
+
+  test("SPARK-33799: Excluded executor can still be timed out") {
+    conf.set(EXCLUDE_ON_FAILURE_ENABLED, true)
+      .set(EXCLUDE_ON_FAILURE_KILL_ENABLED, false)
+    monitor = new ExecutorMonitor(conf, client, null, clock)
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", execInfo))
+    assert(monitor.executorCount === 1)
+    // mark the executor as excluded
+    monitor.onExecutorExcluded(SparkListenerExecutorExcluded(clock.getTimeMillis(), "1", 1))
+    assert(monitor.isExecutorIdle("1"))
+    assert(monitor.timedOutExecutors(idleDeadline) === Seq("1"))
+    assert(monitor.executorCountWithResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) === 0)
+    assert(monitor.getResourceProfileId("1") === DEFAULT_RESOURCE_PROFILE_ID)
+  }
+
   private def idleDeadline: Long = clock.nanoTime() + idleTimeoutNs + 1
   private def storageDeadline: Long = clock.nanoTime() + storageTimeoutNs + 1
   private def shuffleDeadline: Long = clock.nanoTime() + shuffleTimeoutNs + 1
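
Note (not part of the patch): a minimal, hypothetical sketch of the driver-side configuration this change targets, assuming the Spark 3.1+ property names spark.excludeOnFailure.enabled and spark.excludeOnFailure.killExcludedExecutors. With exclusion enabled, killing of excluded executors disabled, and dynamic allocation on, the ExecutorMonitor tracks the excluded executors and the allocation manager may request replacements above maxExecutors. The app name and values below are illustrative only.

import org.apache.spark.{SparkConf, SparkContext}

// Dynamic allocation plus exclusion-without-kill: the combination under which
// excludedExecutorCount can be non-zero and replacement executors may be requested.
val conf = new SparkConf()
  .setAppName("exclusion-aware-dynamic-allocation")  // illustrative name
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  .set("spark.dynamicAllocation.maxExecutors", "4")
  .set("spark.excludeOnFailure.enabled", "true")
  .set("spark.excludeOnFailure.killExcludedExecutors", "false")

val sc = new SparkContext(conf)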