From 525b335eabe1dda9c78f3f575230d4706a47cd65 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 21 Jul 2020 11:44:28 -0700 Subject: [PATCH 01/10] Shutdown executor once we are done decommissioning Because the mock always says there is an RDD we may replicate more than once, and now that there are independent threads --- .../apache/spark/deploy/DeployMessage.scala | 2 - .../apache/spark/deploy/worker/Worker.scala | 2 +- .../CoarseGrainedExecutorBackend.scala | 53 +++++++++++- .../cluster/CoarseGrainedClusterMessage.scala | 3 + .../CoarseGrainedSchedulerBackend.scala | 9 ++ .../apache/spark/storage/BlockManager.scala | 8 ++ .../storage/BlockManagerDecommissioner.scala | 85 ++++++++++++++++--- ...kManagerDecommissionIntegrationSuite.scala | 7 +- .../BlockManagerDecommissionUnitSuite.scala | 12 ++- 9 files changed, 153 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index c8c6e5a192a2..b7a64d75a8d4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -165,8 +165,6 @@ private[deploy] object DeployMessages { case object ReregisterWithMaster // used when a worker attempts to reconnect to a master - case object DecommissionSelf // Mark as decommissioned. May be Master to Worker in the future. - // AppClient to Master case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index aa8c46fc6831..862e685c2dce 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -668,7 +668,7 @@ private[deploy] class Worker( finishedApps += id maybeCleanupApplication(id) - case DecommissionSelf => + case WorkerDecommission(_, _) => decommissionSelf() } diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index def125bb6bfb..0072832bd1d4 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -64,7 +64,6 @@ private[spark] class CoarseGrainedExecutorBackend( private[this] val stopping = new AtomicBoolean(false) var executor: Executor = null - @volatile private var decommissioned = false @volatile var driver: Option[RpcEndpointRef] = None // If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need @@ -80,6 +79,9 @@ private[spark] class CoarseGrainedExecutorBackend( */ private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]] + // Track our decommissioning status internally. + @volatile private var decommissioned = false + override def onStart(): Unit = { logInfo("Registering PWR handler.") SignalUtils.register("PWR", "Failed to register SIGPWR handler - " + @@ -214,6 +216,10 @@ private[spark] class CoarseGrainedExecutorBackend( case UpdateDelegationTokens(tokenBytes) => logInfo(s"Received tokens of ${tokenBytes.length} bytes") SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf) + + case DecommissionSelf => + logInfo("Received decommission self") + decommissionSelf() } override def onDisconnected(remoteAddress: RpcAddress): Unit = { @@ -277,12 +283,53 @@ private[spark] class CoarseGrainedExecutorBackend( if (executor != null) { executor.decommission() } - logInfo("Done decommissioning self.") + // Shutdown the executor once all tasks are gone & any configured migrations completed. + // Detecting migrations completion doesn't need to be perfect and we want to minimize the + // overhead for executors that are not in decommissioning state as overall that will be + // more of the executors. For example, this will not catch a block which is already in + // the process of being put from a remote executor before migration starts. This trade-off + // is viewed as acceptable to minimize introduction of any new locking structures in critical + // code paths. + + val shutdownThread = new Thread() { + var lastTaskRunningTime = System.nanoTime() + val sleep_time = 1000 // 1s + + while (true) { + logInfo("Checking to see if we can shutdown.") + if (executor == null || executor.numRunningTasks == 0) { + if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) { + logInfo("No running tasks, checking migrations") + val allBlocksMigrated = env.blockManager.lastMigrationInfo() + // We can only trust allBlocksMigrated boolean value if there were no tasks running + // since the start of computing it. + if (allBlocksMigrated._2 && + (allBlocksMigrated._1 > lastTaskRunningTime)) { + logInfo("No running tasks, all blocks migrated, stopping.") + exitExecutor(0, "Finished decommissioning", notifyDriver = true) + } else { + logInfo("All blocks not yet migrated.") + } + } else { + logInfo("No running tasks, no block migration configured, stopping.") + exitExecutor(0, "Finished decommissioning", notifyDriver = true) + } + Thread.sleep(sleep_time) + } else { + logInfo("Blocked from shutdown by running task") + // If there is a running task it could store blocks, so make sure we wait for a + // migration loop to complete after the last task is done. + Thread.sleep(sleep_time) + lastTaskRunningTime = System.nanoTime() + } + } + } + logInfo("Will exit when finished decommissioning") // Return true since we are handling a signal true } catch { case e: Exception => - logError(s"Error ${e} during attempt to decommission self") + logError("Unexpected error while decommissioning self", e) false } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 91485f01bf00..49b859817fb3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -136,4 +136,7 @@ private[spark] object CoarseGrainedClusterMessages { // The message to check if `CoarseGrainedSchedulerBackend` thinks the executor is alive or not. case class IsExecutorAlive(executorId: String) extends CoarseGrainedClusterMessage + + // Used to ask an executor to decommission it's self. + case object DecommissionSelf extends CoarseGrainedClusterMessage } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 8fbefae58af1..e05dd823c88a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -442,6 +442,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp case e: Exception => logError(s"Unexpected error during decommissioning ${e.toString}", e) } + // Send decommission message to the executor (it could have originated on the executor + // but not necessarily. + executorDataMap.get(executorId) match { + case Some(executorInfo) => + executorInfo.executorEndpoint.send(DecommissionSelf) + case None => + // Ignoring the executor since it is not registered. + logWarning(s"Attempted to decommission unknown executor $executorId.") + } logInfo(s"Finished decommissioning executor $executorId.") if (conf.get(STORAGE_DECOMMISSION_ENABLED)) { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 47af854b6e8f..56e64ef8a765 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1822,6 +1822,14 @@ private[spark] class BlockManager( } } + /* + * Returns the last migration time and a boolean for if all blocks have been migrated. + * If there are any tasks running since that time the boolean may be incorrect. + */ + private[spark] def lastMigrationInfo(): (Long, Boolean) = { + decommissioner.map(_.lastMigrationInfo()).getOrElse((0, false)) + } + private[storage] def getMigratableRDDBlocks(): Seq[ReplicateBlock] = master.getReplicateInfoForRDDBlocks(blockManagerId) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala index 1cc7ef6a25f9..00cb22e56bf5 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala @@ -18,6 +18,7 @@ package org.apache.spark.storage import java.util.concurrent.ExecutorService +import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConverters._ import scala.collection.mutable @@ -41,6 +42,13 @@ private[storage] class BlockManagerDecommissioner( private val maxReplicationFailuresForDecommission = conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK) + // Used for tracking if our migrations are complete. + @volatile private var lastRDDMigrationTime: Long = 0 + @volatile private var lastShuffleMigrationTime: Long = 0 + @volatile private var rddBlocksLeft: Boolean = true + @volatile private var shuffleBlocksLeft: Boolean = true + + /** * This runnable consumes any shuffle blocks in the queue for migration. This part of a * producer/consumer where the main migration loop updates the queue of blocks to be migrated @@ -91,10 +99,12 @@ private[storage] class BlockManagerDecommissioner( null)// class tag, we don't need for shuffle logDebug(s"Migrated sub block ${blockId}") } - logInfo(s"Migrated ${shuffleBlockInfo} to ${peer}") + logDebug(s"Migrated ${shuffleBlockInfo} to ${peer}") } else { logError(s"Skipping block ${shuffleBlockInfo} because it has failed ${retryCount}") } + logInfo(s"Migrated ${shuffleBlockInfo}") + numMigratedShuffles.incrementAndGet() } } // This catch is intentionally outside of the while running block. @@ -115,12 +125,21 @@ private[storage] class BlockManagerDecommissioner( // Shuffles which are either in queue for migrations or migrated private val migratingShuffles = mutable.HashSet[ShuffleBlockInfo]() + // Shuffles which have migrated. This used to know when we are "done", being done can change + // if a new shuffle file is created by a running task. + private val numMigratedShuffles = new AtomicInteger(0) + + + // Shuffles which are queued for migration & number of retries so far. + // Visible in storage for testing. private[storage] val shufflesToMigrate = new java.util.concurrent.ConcurrentLinkedQueue[(ShuffleBlockInfo, Int)]() // Set if we encounter an error attempting to migrate and stop. @volatile private var stopped = false + @volatile private var stoppedRDD = false + @volatile private var stoppedShuffle = false private val migrationPeers = mutable.HashMap[BlockManagerId, ShuffleMigrationRunnable]() @@ -133,22 +152,24 @@ private[storage] class BlockManagerDecommissioner( override def run(): Unit = { assert(conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) - while (!stopped && !Thread.interrupted()) { + while (!stopped && !stoppedRDD && !Thread.interrupted()) { logInfo("Iterating on migrating from the block manager.") try { + val startTime = System.nanoTime() logDebug("Attempting to replicate all cached RDD blocks") - decommissionRddCacheBlocks() + rddBlocksLeft = decommissionRddCacheBlocks() + lastRDDMigrationTime = startTime logInfo("Attempt to replicate all cached blocks done") logInfo(s"Waiting for ${sleepInterval} before refreshing migrations.") Thread.sleep(sleepInterval) } catch { case e: InterruptedException => - logInfo("Interrupted during migration, will not refresh migrations.") - stopped = true + logInfo("Interrupted during RDD migration, stopping") + stoppedRDD = true case NonFatal(e) => - logError("Error occurred while trying to replicate for block manager decommissioning.", + logError("Error occurred replicating RDD for block manager decommissioning.", e) - stopped = true + stoppedRDD = true } } } @@ -162,20 +183,22 @@ private[storage] class BlockManagerDecommissioner( override def run() { assert(conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) - while (!stopped && !Thread.interrupted()) { + while (!stopped && !stoppedShuffle && !Thread.interrupted()) { try { logDebug("Attempting to replicate all shuffle blocks") - refreshOffloadingShuffleBlocks() + val startTime = System.nanoTime() + shuffleBlocksLeft = refreshOffloadingShuffleBlocks() + lastShuffleMigrationTime = startTime logInfo("Done starting workers to migrate shuffle blocks") Thread.sleep(sleepInterval) } catch { case e: InterruptedException => logInfo("Interrupted during migration, will not refresh migrations.") - stopped = true + stoppedShuffle = true case NonFatal(e) => logError("Error occurred while trying to replicate for block manager decommissioning.", e) - stopped = true + stoppedShuffle = true } } } @@ -191,8 +214,9 @@ private[storage] class BlockManagerDecommissioner( * but rather shadows them. * Requires an Indexed based shuffle resolver. * Note: if called in testing please call stopOffloadingShuffleBlocks to avoid thread leakage. + * Returns true if we are not done migrating shuffle blocks. */ - private[storage] def refreshOffloadingShuffleBlocks(): Unit = { + private[storage] def refreshOffloadingShuffleBlocks(): Boolean = { // Update the queue of shuffles to be migrated logInfo("Offloading shuffle blocks") val localShuffles = bm.migratableResolver.getStoredShuffles().toSet @@ -215,6 +239,8 @@ private[storage] class BlockManagerDecommissioner( deadPeers.foreach { peer => migrationPeers.get(peer).foreach(_.running = false) } + // If we found any new shuffles to migrate or otherwise have not migrated everything. + newShufflesToMigrate.nonEmpty || migratingShuffles.size < numMigratedShuffles.get() } /** @@ -231,8 +257,9 @@ private[storage] class BlockManagerDecommissioner( /** * Tries to offload all cached RDD blocks from this BlockManager to peer BlockManagers * Visible for testing + * Returns true if we have not migrated all of our RDD blocks. */ - private[storage] def decommissionRddCacheBlocks(): Unit = { + private[storage] def decommissionRddCacheBlocks(): Boolean = { val replicateBlocksInfo = bm.getMigratableRDDBlocks() if (replicateBlocksInfo.nonEmpty) { @@ -240,7 +267,7 @@ private[storage] class BlockManagerDecommissioner( "for block manager decommissioning") } else { logWarning(s"Asked to decommission RDD cache blocks, but no blocks to migrate") - return + return false } // TODO: We can sort these blocks based on some policy (LRU/blockSize etc) @@ -252,7 +279,9 @@ private[storage] class BlockManagerDecommissioner( if (blocksFailedReplication.nonEmpty) { logWarning("Blocks failed replication in cache decommissioning " + s"process: ${blocksFailedReplication.mkString(",")}") + return true } + return false } private def migrateBlock(blockToReplicate: ReplicateBlock): Boolean = { @@ -275,9 +304,13 @@ private[storage] class BlockManagerDecommissioner( logInfo("Starting block migration thread") if (conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) { rddBlockMigrationExecutor.submit(rddBlockMigrationRunnable) + } else { + stoppedRDD = true } if (conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) { shuffleBlockMigrationRefreshExecutor.submit(shuffleBlockMigrationRefreshRunnable) + } else { + stoppedShuffle = true } if (!conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED) && !conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) { @@ -327,4 +360,28 @@ private[storage] class BlockManagerDecommissioner( } logInfo("Stopped storage decommissioner") } + + /* + * Returns the last migration time and a boolean for if all blocks have been migrated. + * If there are any tasks running since that time the boolean may be incorrect. + */ + private[storage] def lastMigrationInfo(): (Long, Boolean) = { + if (stopped || (stoppedRDD && stoppedShuffle)) { + (System.nanoTime(), true) + } else { + + val lastMigrationTime = if ( + conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED) && + conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) { + Math.min(lastRDDMigrationTime, lastShuffleMigrationTime) + } else if (conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) { + lastShuffleMigrationTime + } else { + lastRDDMigrationTime + } + + val blocksMigrated = (!shuffleBlocksLeft || stoppedShuffle) && (!rddBlocksLeft || stoppedRDD) + (lastMigrationTime, blocksMigrated) + } + } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala index 57410103dd08..00e0a4372e35 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala @@ -59,7 +59,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, shuffle) // Just replicate blocks as fast as we can during testing, there isn't another // workload we need to worry about. - .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L) + .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L) sc = new SparkContext(master, "test", conf) @@ -213,10 +213,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === numParts) } - // Make the executor we decommissioned exit - sched.client.killExecutors(List(execToDecommission)) - - // Wait for the executor to be removed + // Wait for the executor to be removed automatically after migration. executorRemovedSem.acquire(1) // Since the RDD is cached or shuffled so further usage of same RDD should use the diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala index 5ff1ff05cc4e..627b49d1d5bc 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.storage import scala.concurrent.duration._ import org.mockito.{ArgumentMatchers => mc} -import org.mockito.Mockito.{mock, times, verify, when} +import org.mockito.Mockito.{atLeast => least, mock, times, verify, when} import org.scalatest._ import org.scalatest.concurrent.Eventually._ @@ -38,6 +38,9 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers { private val sparkConf = new SparkConf(false) .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true) .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, true) + // Just replicate blocks as fast as we can during testing, there isn't another + // workload we need to worry about. + .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L) private def registerShuffleBlocks( mockMigratableShuffleResolver: MigratableResolver, @@ -77,9 +80,10 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers { try { bmDecomManager.start() - eventually(timeout(5.second), interval(10.milliseconds)) { + // We don't check that all blocks are migrated because out mock is always returning an RDD. + eventually(timeout(10.second), interval(10.milliseconds)) { assert(bmDecomManager.shufflesToMigrate.isEmpty == true) - verify(bm, times(1)).replicateBlock( + verify(bm, least(1)).replicateBlock( mc.eq(storedBlockId1), mc.any(), mc.any(), mc.eq(Some(3))) verify(blockTransferService, times(2)) .uploadBlockSync(mc.eq("host2"), mc.eq(bmPort), mc.eq("exec2"), mc.any(), mc.any(), @@ -88,5 +92,7 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers { } finally { bmDecomManager.stop() } + + bmDecomManager.stop() } } From 63da30013a58a53078ecfca9878ac4668753343d Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 21 Jul 2020 11:48:42 -0700 Subject: [PATCH 02/10] Make Spark's dynamic allocation use decommissioning --- .../spark/ExecutorAllocationClient.scala | 11 ++ .../spark/ExecutorAllocationManager.scala | 18 ++- .../CoarseGrainedSchedulerBackend.scala | 118 +++++++++++------- .../scheduler/dynalloc/ExecutorMonitor.scala | 25 +++- .../ExecutorAllocationManagerSuite.scala | 26 +++- 5 files changed, 140 insertions(+), 58 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala index 00bd0063c9e3..154d37d19121 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala @@ -17,6 +17,7 @@ package org.apache.spark +import org.apache.spark.scheduler.ExecutorDecommissionInfo /** * A client that communicates with the cluster manager to request or kill executors. * This is currently supported only in YARN mode. @@ -81,6 +82,16 @@ private[spark] trait ExecutorAllocationClient { countFailures: Boolean, force: Boolean = false): Seq[String] + /** + * Request that the cluster manager decommission the specified executors. + * Default implementation delegates to kill, scheduler must override + * if it supports graceful decommissioning. + * + * @param executors identifiers of executors & decom info. + * @return the ids of the executors acknowledged by the cluster manager to be removed. + */ + def decommissionExecutors(executors: Seq[(String, ExecutorDecommissionInfo)]): Seq[String] + /** * Request that the cluster manager kill every executor on the specified host. * diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 85409d599cca..8cab9b16be07 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -28,6 +28,7 @@ import com.codahale.metrics.{Gauge, MetricRegistry} import org.apache.spark.internal.{config, Logging} import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL +import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED import org.apache.spark.metrics.source.Source import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID import org.apache.spark.resource.ResourceProfileManager @@ -204,7 +205,12 @@ private[spark] class ExecutorAllocationManager( s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 0!") } if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) { - if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) { + // If dynamic allocation shuffle tracking or worker decommissioning along with + // storage shuffle decommissioning is enabled we have *experimental* support for + // decommissioning without a shuffle service. + if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) || + (conf.get(WORKER_DECOMMISSION_ENABLED) && + conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) { logWarning("Dynamic allocation without a shuffle service is an experimental feature.") } else if (!testing) { throw new SparkException("Dynamic allocation of executors requires the external " + @@ -565,8 +571,14 @@ private[spark] class ExecutorAllocationManager( } else { // We don't want to change our target number of executors, because we already did that // when the task backlog decreased. - client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false, - countFailures = false, force = false) + if (conf.get(WORKER_DECOMMISSION_ENABLED)) { + val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map( + id => (id, ExecutorDecommissionInfo("spark scale down", false))) + client.decommissionExecutors(executorIdsWithoutHostLoss) + } else { + client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false, + countFailures = false, force = false) + } } // [SPARK-21834] killExecutors api reduces the target number of executors. diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index e05dd823c88a..f22225d3bce7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -420,55 +420,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } /** - * Mark a given executor as decommissioned and stop making resource offers for it. + * Mark given executors as decommissioned and stop making resource offers for it. */ - private def decommissionExecutor( - executorId: String, decommissionInfo: ExecutorDecommissionInfo): Boolean = { - val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized { - // Only bother decommissioning executors which are alive. - if (isExecutorActive(executorId)) { - executorsPendingDecommission += executorId - true - } else { - false - } - } - - if (shouldDisable) { - logInfo(s"Starting decommissioning executor $executorId.") - try { - scheduler.executorDecommission(executorId, decommissionInfo) - } catch { - case e: Exception => - logError(s"Unexpected error during decommissioning ${e.toString}", e) - } - // Send decommission message to the executor (it could have originated on the executor - // but not necessarily. - executorDataMap.get(executorId) match { - case Some(executorInfo) => - executorInfo.executorEndpoint.send(DecommissionSelf) - case None => - // Ignoring the executor since it is not registered. - logWarning(s"Attempted to decommission unknown executor $executorId.") - } - logInfo(s"Finished decommissioning executor $executorId.") - - if (conf.get(STORAGE_DECOMMISSION_ENABLED)) { - try { - logInfo("Starting decommissioning block manager corresponding to " + - s"executor $executorId.") - scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId)) - } catch { - case e: Exception => - logError("Unexpected error during block manager " + - s"decommissioning for executor $executorId: ${e.toString}", e) - } - logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.") - } - } else { - logInfo(s"Skipping decommissioning of executor $executorId.") - } - shouldDisable + private def decommissionExecutor(executorId: String, + decomInfo: ExecutorDecommissionInfo): Boolean = { + (! decommissionExecutors(List((executorId, decomInfo))).isEmpty) } /** @@ -502,6 +458,72 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp protected def minRegisteredRatio: Double = _minRegisteredRatio + /** + * Request that the cluster manager decommission the specified executors. + * + * @param executors Identifiers of executors & decommission info. + * @return the ids of the executors acknowledged by the cluster manager to be removed. + */ + override def decommissionExecutors( + executors: Seq[(String, ExecutorDecommissionInfo)]): Seq[String] = { + + val executorsToDecommission = executors.filter{case (executorId, _) => + CoarseGrainedSchedulerBackend.this.synchronized { + // Only bother decommissioning executors which are alive. + if (isExecutorActive(executorId)) { + executorsPendingDecommission += executorId + true + } else { + false + } + } + } + + executorsToDecommission.filter{case (executorId, decomInfo) => + doDecommission(executorId, decomInfo) + }.map(_._1) + } + + private def doDecommission(executorId: String, + decomInfo: ExecutorDecommissionInfo): Boolean = { + + logInfo(s"Starting decommissioning executor $executorId.") + try { + scheduler.executorDecommission(executorId, decomInfo) + } catch { + case e: Exception => + logError(s"Unexpected error during decommissioning ${e.toString}", e) + return false + } + // Send decommission message to the executor (it could have originated on the executor + // but not necessarily. + executorDataMap.get(executorId) match { + case Some(executorInfo) => + executorInfo.executorEndpoint.send(DecommissionSelf) + case None => + // Ignoring the executor since it is not registered. + logWarning(s"Attempted to decommission unknown executor $executorId.") + return false + } + logInfo(s"Finished decommissioning executor $executorId.") + + if (conf.get(STORAGE_DECOMMISSION_ENABLED)) { + try { + logInfo("Starting decommissioning block manager corresponding to " + + s"executor $executorId.") + scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId)) + } catch { + case e: Exception => + logError("Unexpected error during block manager " + + s"decommissioning for executor $executorId: ${e.toString}", e) + return false + } + logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.") + } + true + } + + override def start(): Unit = { if (UserGroupInformation.isSecurityEnabled()) { delegationTokenManager = createTokenManager() diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala index 4d7190726f06..9e8562a9cc4e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala @@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID import org.apache.spark.scheduler._ -import org.apache.spark.storage.RDDBlockId +import org.apache.spark.storage.{RDDBlockId, ShuffleDataBlockId} import org.apache.spark.util.Clock /** @@ -135,6 +135,7 @@ private[spark] class ExecutorMonitor( /** * Mark the given executors as pending to be removed. Should only be called in the EAM thread. + * This covers both kills and decommissions. */ def executorsKilled(ids: Seq[String]): Unit = { ids.foreach { id => @@ -298,6 +299,7 @@ private[spark] class ExecutorMonitor( // // This means that an executor may be marked as having shuffle data, and thus prevented // from being removed, even though the data may not be used. + // TODO: Only track used files (SPARK-31974) if (shuffleTrackingEnabled && event.reason == Success) { stageToShuffleID.get(event.stageId).foreach { shuffleId => exec.addShuffle(shuffleId) @@ -333,11 +335,26 @@ private[spark] class ExecutorMonitor( } override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = { - if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) { - return - } val exec = ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId, UNKNOWN_RESOURCE_PROFILE_ID) + + // Check if it is a shuffle file, or RDD to pick the correct codepath for update + if (event.blockUpdatedInfo.blockId.isInstanceOf[ShuffleDataBlockId] && shuffleTrackingEnabled) { + /** + * The executor monitor keeps track of locations of cache and shuffle blocks and this can be + * used to decide which executor(s) Spark should shutdown first. Since we move shuffle blocks + * around now this wires it up so that it keeps track of it. We only do this for data blocks + * as index and other blocks blocks do not necessarily mean the entire block has been + * committed. + */ + event.blockUpdatedInfo.blockId match { + case ShuffleDataBlockId(shuffleId, _, _) => exec.addShuffle(shuffleId) + case _ => // For now we only update on data blocks + } + return + } else if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) { + return + } val storageLevel = event.blockUpdatedInfo.storageLevel val blockId = event.blockUpdatedInfo.blockId.asInstanceOf[RDDBlockId] diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala index 65efa10bfcf9..2e7051ba3c56 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala @@ -28,6 +28,7 @@ import org.apache.spark.{ExecutorAllocationClient, SparkConf} import org.apache.spark.internal.config.{DYN_ALLOCATION_ENABLED, DYN_ALLOCATION_TESTING} import org.apache.spark.internal.config.Streaming._ import org.apache.spark.resource.ResourceProfile +import org.apache.spark.scheduler.ExecutorDecommissionInfo import org.apache.spark.streaming.{DummyInputDStream, Seconds, StreamingContext, TestSuiteBase} import org.apache.spark.util.{ManualClock, Utils} @@ -44,6 +45,14 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase } test("basic functionality") { + basicTest(decommissioning = false) + } + + test("basic decommissioning") { + basicTest(decommissioning = true) + } + + def basicTest(decommissioning: Boolean): Unit = { // Test that adding batch processing time info to allocation manager // causes executors to be requested and killed accordingly @@ -83,12 +92,23 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase Map.empty)} } - /** Verify that a particular executor was killed */ + /** Verify that particular executors was killed */ def verifyKilledExec(expectedKilledExec: Option[String]): Unit = { if (expectedKilledExec.nonEmpty) { - verify(allocationClient, times(1)).killExecutor(meq(expectedKilledExec.get)) + if (decommissioning) { + val expectedWithInfo = expectedKilledExec.map{ exec => + (exec, ExecutorDecommissionInfo("spark scale down", false)) + } + verify(allocationClient, times(1)).decommissionExecutors(meq(expectedWithInfo.toSeq)) + } else { + verify(allocationClient, times(1)).killExecutor(meq(expectedKilledExec.get)) + } } else { - verify(allocationClient, never).killExecutor(null) + if (decommissioning) { + verify(allocationClient, never).decommissionExecutors(null) + } else { + verify(allocationClient, never).killExecutor(null) + } } } From 9f7d752066a6e56ca3de6c940f9a8558204b367a Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 21 Jul 2020 11:51:09 -0700 Subject: [PATCH 03/10] Track the decommissioning executors in the core dynamic scheduler so we don't scale down too low, update the streaming ExecutorAllocationManager to also delegate to decommission Fix up executor add for resource profile --- .../spark/ExecutorAllocationClient.scala | 29 +++++++- .../spark/ExecutorAllocationManager.scala | 12 +++- .../CoarseGrainedSchedulerBackend.scala | 49 +++++++------ .../scheduler/dynalloc/ExecutorMonitor.scala | 34 ++++++++- .../ExecutorAllocationManagerSuite.scala | 71 ++++++++++++++++++- .../scheduler/ExecutorAllocationManager.scala | 9 ++- .../ExecutorAllocationManagerSuite.scala | 11 ++- 7 files changed, 183 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala index 154d37d19121..740831c2b49b 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala @@ -88,9 +88,36 @@ private[spark] trait ExecutorAllocationClient { * if it supports graceful decommissioning. * * @param executors identifiers of executors & decom info. + * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down + * after these executors have been decommissioned. * @return the ids of the executors acknowledged by the cluster manager to be removed. */ - def decommissionExecutors(executors: Seq[(String, ExecutorDecommissionInfo)]): Seq[String] + def decommissionExecutors( + executors: Seq[(String, ExecutorDecommissionInfo)], + adjustTargetNumExecutors: Boolean): Seq[String] = { + killExecutors(executors.map(_._1), + adjustTargetNumExecutors, + countFailures = false) + } + + + /** + * Request that the cluster manager decommission the specified executor. + * Default implementation delegates to kill, scheduler can override + * if it supports graceful decommissioning. + * + * @param executorId identifiers of executor to decommission + * @param decommissionInfo information about the decommission (reason, host loss) + * @return whether the request is acknowledged by the cluster manager. + */ + def decommissionExecutor(executorId: String, + decommissionInfo: ExecutorDecommissionInfo): Boolean = { + val decommissionedExecutors = decommissionExecutors( + Seq((executorId, decommissionInfo)), + adjustTargetNumExecutors = true) + decommissionedExecutors.nonEmpty && decommissionedExecutors(0).equals(executorId) + } + /** * Request that the cluster manager kill every executor on the specified host. diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 8cab9b16be07..13a9762a347a 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -545,7 +545,9 @@ private[spark] class ExecutorAllocationManager( // get the running total as we remove or initialize it to the count - pendingRemoval val newExecutorTotal = numExecutorsTotalPerRpId.getOrElseUpdate(rpId, (executorMonitor.executorCountWithResourceProfile(rpId) - - executorMonitor.pendingRemovalCountPerResourceProfileId(rpId))) + executorMonitor.pendingRemovalCountPerResourceProfileId(rpId) - + executorMonitor.pendingDecommissioningPerResourceProfileId(rpId) + )) if (newExecutorTotal - 1 < minNumExecutors) { logDebug(s"Not removing idle executor $executorIdToBeRemoved because there " + s"are only $newExecutorTotal executor(s) left (minimum number of executor limit " + @@ -574,7 +576,7 @@ private[spark] class ExecutorAllocationManager( if (conf.get(WORKER_DECOMMISSION_ENABLED)) { val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map( id => (id, ExecutorDecommissionInfo("spark scale down", false))) - client.decommissionExecutors(executorIdsWithoutHostLoss) + client.decommissionExecutors(executorIdsWithoutHostLoss, adjustTargetNumExecutors = false) } else { client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false, countFailures = false, force = false) @@ -590,7 +592,11 @@ private[spark] class ExecutorAllocationManager( // reset the newExecutorTotal to the existing number of executors if (testing || executorsRemoved.nonEmpty) { - executorMonitor.executorsKilled(executorsRemoved.toSeq) + if (conf.get(WORKER_DECOMMISSION_ENABLED)) { + executorMonitor.executorsDecommissioned(executorsRemoved) + } else { + executorMonitor.executorsKilled(executorsRemoved.toSeq) + } logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to idle timeout.") executorsRemoved.toSeq } else { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index f22225d3bce7..ce6d10517379 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -419,14 +419,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp scheduler.workerRemoved(workerId, host, message) } - /** - * Mark given executors as decommissioned and stop making resource offers for it. - */ - private def decommissionExecutor(executorId: String, - decomInfo: ExecutorDecommissionInfo): Boolean = { - (! decommissionExecutors(List((executorId, decomInfo))).isEmpty) - } - /** * Stop making resource offers for the given executor. The executor is marked as lost with * the loss reason still pending. @@ -462,10 +454,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * Request that the cluster manager decommission the specified executors. * * @param executors Identifiers of executors & decommission info. + * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down + * after these executors have been decommissioned. * @return the ids of the executors acknowledged by the cluster manager to be removed. */ override def decommissionExecutors( - executors: Seq[(String, ExecutorDecommissionInfo)]): Seq[String] = { + executors: Seq[(String, ExecutorDecommissionInfo)], + adjustTargetNumExecutors: Boolean): Seq[String] = { val executorsToDecommission = executors.filter{case (executorId, _) => CoarseGrainedSchedulerBackend.this.synchronized { @@ -479,17 +474,40 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } } - executorsToDecommission.filter{case (executorId, decomInfo) => + // If we don't want to replace the executors we are decommissioning + if (adjustTargetNumExecutors) { + executorsToDecommission.foreach { case (exec, _) => + val rpId = executorDataMap(exec).resourceProfileId + val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId) + if (requestedTotalExecutorsPerResourceProfile.isEmpty) { + // Assume that we are killing an executor that was started by default and + // not through the request api + requestedTotalExecutorsPerResourceProfile(rp) = 0 + } else { + val requestedTotalForRp = requestedTotalExecutorsPerResourceProfile(rp) + requestedTotalExecutorsPerResourceProfile(rp) = math.max(requestedTotalForRp - 1, 0) + } + } + doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap) + } + + val decommissioned = executorsToDecommission.filter{case (executorId, decomInfo) => doDecommission(executorId, decomInfo) }.map(_._1) + decommissioned } + private def doDecommission(executorId: String, decomInfo: ExecutorDecommissionInfo): Boolean = { logInfo(s"Starting decommissioning executor $executorId.") try { scheduler.executorDecommission(executorId, decomInfo) + if (driverEndpoint != null) { + logInfo("Propagating executor decommission to driver.") + driverEndpoint.send(DecommissionExecutor(executorId, decomInfo)) + } } catch { case e: Exception => logError(s"Unexpected error during decommissioning ${e.toString}", e) @@ -619,17 +637,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp driverEndpoint.send(RemoveWorker(workerId, host, message)) } - /** - * Called by subclasses when notified of a decommissioning executor. - */ - private[spark] def decommissionExecutor( - executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = { - if (driverEndpoint != null) { - logInfo("Propagating executor decommission to driver.") - driverEndpoint.send(DecommissionExecutor(executorId, decommissionInfo)) - } - } - def sufficientResourcesRegistered(): Boolean = true override def isReady(): Boolean = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala index 9e8562a9cc4e..3ed05a45b506 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala @@ -114,7 +114,8 @@ private[spark] class ExecutorMonitor( var newNextTimeout = Long.MaxValue timedOutExecs = executors.asScala - .filter { case (_, exec) => !exec.pendingRemoval && !exec.hasActiveShuffle } + .filter { case (_, exec) => + !exec.pendingRemoval && !exec.hasActiveShuffle && !exec.pendingDecommissioning} .filter { case (_, exec) => val deadline = exec.timeoutAt if (deadline > now) { @@ -150,6 +151,19 @@ private[spark] class ExecutorMonitor( nextTimeout.set(Long.MinValue) } + private[spark] def executorsDecommissioned(ids: Seq[String]): Unit = { + ids.foreach { id => + val tracker = executors.get(id) + if (tracker != null) { + tracker.pendingDecommissioning = true + } + } + + // Recompute timed out executors in the next EAM callback, since this call invalidates + // the current list. + nextTimeout.set(Long.MinValue) + } + def executorCount: Int = executors.size() def executorCountWithResourceProfile(id: Int): Int = { @@ -172,6 +186,16 @@ private[spark] class ExecutorMonitor( executors.asScala.filter { case (k, v) => v.resourceProfileId == id && v.pendingRemoval }.size } + def pendingDecommissioningCount: Int = executors.asScala.count { case (_, exec) => + exec.pendingDecommissioning + } + + def pendingDecommissioningPerResourceProfileId(id: Int): Int = { + executors.asScala.filter { case (k, v) => + v.resourceProfileId == id && v.pendingDecommissioning + }.size + } + override def onJobStart(event: SparkListenerJobStart): Unit = { if (!shuffleTrackingEnabled) { return @@ -328,7 +352,7 @@ private[spark] class ExecutorMonitor( val removed = executors.remove(event.executorId) if (removed != null) { decrementExecResourceProfileCount(removed.resourceProfileId) - if (!removed.pendingRemoval) { + if (!removed.pendingRemoval || !removed.pendingDecommissioning) { nextTimeout.set(Long.MinValue) } } @@ -431,6 +455,11 @@ private[spark] class ExecutorMonitor( executors.asScala.filter { case (_, exec) => exec.pendingRemoval }.keys.toSet } + // Visible for testing + def executorsDecommissioning(): Set[String] = { + executors.asScala.filter { case (_, exec) => exec.pendingDecommissioning }.keys.toSet + } + /** * This method should be used when updating executor state. It guards against a race condition in * which the `SparkListenerTaskStart` event is posted before the `SparkListenerBlockManagerAdded` @@ -483,6 +512,7 @@ private[spark] class ExecutorMonitor( @volatile var timedOut: Boolean = false var pendingRemoval: Boolean = false + var pendingDecommissioning: Boolean = false var hasActiveShuffle: Boolean = false private var idleStart: Long = -1 diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index ea6e010ef29a..c9c71cd57af2 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -28,6 +28,7 @@ import org.scalatest.PrivateMethodTester import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.internal.config import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL +import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED import org.apache.spark.metrics.MetricsSystem import org.apache.spark.resource._ import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID @@ -1270,6 +1271,68 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { assert(executorsPendingToRemove(manager).size === 6) // limit reached (1 executor remaining) } + test("mock polling loop remove with decommissioning") { + val clock = new ManualClock(2020L) + val manager = createManager(createConf(1, 20, 1, true), clock = clock) + + // Remove idle executors on timeout + onExecutorAddedDefaultProfile(manager, "executor-1") + onExecutorAddedDefaultProfile(manager, "executor-2") + onExecutorAddedDefaultProfile(manager, "executor-3") + assert(executorsDecommissioning(manager).isEmpty) + assert(executorsPendingToRemove(manager).isEmpty) + + // idle threshold not reached yet + clock.advance(executorIdleTimeout * 1000 / 2) + schedule(manager) + assert(manager.executorMonitor.timedOutExecutors().isEmpty) + assert(executorsPendingToRemove(manager).isEmpty) + assert(executorsDecommissioning(manager).isEmpty) + + // idle threshold exceeded + clock.advance(executorIdleTimeout * 1000) + assert(manager.executorMonitor.timedOutExecutors().size === 3) + schedule(manager) + assert(executorsPendingToRemove(manager).isEmpty) // limit reached (1 executor remaining) + assert(executorsDecommissioning(manager).size === 2) // limit reached (1 executor remaining) + + // Mark a subset as busy - only idle executors should be removed + onExecutorAddedDefaultProfile(manager, "executor-4") + onExecutorAddedDefaultProfile(manager, "executor-5") + onExecutorAddedDefaultProfile(manager, "executor-6") + onExecutorAddedDefaultProfile(manager, "executor-7") + assert(manager.executorMonitor.executorCount === 7) + assert(executorsPendingToRemove(manager).isEmpty) // no pending to be removed + assert(executorsDecommissioning(manager).size === 2) // 2 decommissioning + onExecutorBusy(manager, "executor-4") + onExecutorBusy(manager, "executor-5") + onExecutorBusy(manager, "executor-6") // 3 busy and 2 idle (of the 5 active ones) + + // after scheduling, the previously timed out executor should be removed, since + // there are new active ones. + schedule(manager) + assert(executorsDecommissioning(manager).size === 3) + + // advance the clock so that idle executors should time out and move to the pending list + clock.advance(executorIdleTimeout * 1000) + schedule(manager) + assert(executorsPendingToRemove(manager).size === 0) + assert(executorsDecommissioning(manager).size === 4) + assert(!executorsDecommissioning(manager).contains("executor-4")) + assert(!executorsDecommissioning(manager).contains("executor-5")) + assert(!executorsDecommissioning(manager).contains("executor-6")) + + // Busy executors are now idle and should be removed + onExecutorIdle(manager, "executor-4") + onExecutorIdle(manager, "executor-5") + onExecutorIdle(manager, "executor-6") + schedule(manager) + assert(executorsDecommissioning(manager).size === 4) + clock.advance(executorIdleTimeout * 1000) + schedule(manager) + assert(executorsDecommissioning(manager).size === 6) // limit reached (1 executor remaining) + } + test("listeners trigger add executors correctly") { val manager = createManager(createConf(1, 20, 1)) assert(addTime(manager) === NOT_SET) @@ -1588,7 +1651,8 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { private def createConf( minExecutors: Int = 1, maxExecutors: Int = 5, - initialExecutors: Int = 1): SparkConf = { + initialExecutors: Int = 1, + decommissioningEnabled: Boolean = false): SparkConf = { val sparkConf = new SparkConf() .set(config.DYN_ALLOCATION_ENABLED, true) .set(config.DYN_ALLOCATION_MIN_EXECUTORS, minExecutors) @@ -1604,6 +1668,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { // SPARK-22864: effectively disable the allocation schedule by setting the period to a // really long value. .set(TEST_SCHEDULE_INTERVAL, 10000L) + .set(WORKER_DECOMMISSION_ENABLED, decommissioningEnabled) sparkConf } @@ -1670,6 +1735,10 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { private def executorsPendingToRemove(manager: ExecutorAllocationManager): Set[String] = { manager.executorMonitor.executorsPendingToRemove() } + + private def executorsDecommissioning(manager: ExecutorAllocationManager): Set[String] = { + manager.executorMonitor.executorsDecommissioning() + } } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala index 58bd56c591d0..2d0cd178e914 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala @@ -23,7 +23,9 @@ import scala.util.Random import org.apache.spark.{ExecutorAllocationClient, SparkConf} import org.apache.spark.internal.Logging import org.apache.spark.internal.config.Streaming._ +import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED import org.apache.spark.resource.ResourceProfile +import org.apache.spark.scheduler.ExecutorDecommissionInfo import org.apache.spark.streaming.util.RecurringTimer import org.apache.spark.util.{Clock, Utils} @@ -133,7 +135,12 @@ private[streaming] class ExecutorAllocationManager( logDebug(s"Removable executors (${removableExecIds.size}): ${removableExecIds}") if (removableExecIds.nonEmpty) { val execIdToRemove = removableExecIds(Random.nextInt(removableExecIds.size)) - client.killExecutor(execIdToRemove) + if (conf.get(WORKER_DECOMMISSION_ENABLED)) { + client.decommissionExecutor(execIdToRemove, + ExecutorDecommissionInfo("spark scale down", false)) + } else { + client.killExecutor(execIdToRemove) + } logInfo(s"Requested to kill executor $execIdToRemove") } else { logInfo(s"No non-receiver executors to kill") diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala index 2e7051ba3c56..57fd97179bf4 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala @@ -27,6 +27,7 @@ import org.scalatestplus.mockito.MockitoSugar import org.apache.spark.{ExecutorAllocationClient, SparkConf} import org.apache.spark.internal.config.{DYN_ALLOCATION_ENABLED, DYN_ALLOCATION_TESTING} import org.apache.spark.internal.config.Streaming._ +import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED import org.apache.spark.resource.ResourceProfile import org.apache.spark.scheduler.ExecutorDecommissionInfo import org.apache.spark.streaming.{DummyInputDStream, Seconds, StreamingContext, TestSuiteBase} @@ -55,9 +56,12 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase def basicTest(decommissioning: Boolean): Unit = { // Test that adding batch processing time info to allocation manager // causes executors to be requested and killed accordingly + conf.set(WORKER_DECOMMISSION_ENABLED, decommissioning) // There is 1 receiver, and exec 1 has been allocated to it - withAllocationManager(numReceivers = 1) { case (receiverTracker, allocationManager) => + withAllocationManager(numReceivers = 1, conf = conf) { + case (receiverTracker, allocationManager) => + when(receiverTracker.allocatedExecutors).thenReturn(Map(1 -> Some("1"))) /** Add data point for batch processing time and verify executor allocation */ @@ -99,13 +103,14 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase val expectedWithInfo = expectedKilledExec.map{ exec => (exec, ExecutorDecommissionInfo("spark scale down", false)) } - verify(allocationClient, times(1)).decommissionExecutors(meq(expectedWithInfo.toSeq)) + verify(allocationClient, times(1)).decommissionExecutors( + meq(expectedWithInfo.toSeq), meq(false)) } else { verify(allocationClient, times(1)).killExecutor(meq(expectedKilledExec.get)) } } else { if (decommissioning) { - verify(allocationClient, never).decommissionExecutors(null) + verify(allocationClient, never).decommissionExecutors(null, false) } else { verify(allocationClient, never).killExecutor(null) } From efbe9a3a3c8326f79a1c0f94a3bfb33d3786c099 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 25 Jun 2020 15:48:24 -0700 Subject: [PATCH 04/10] Fix our exiting and cleanup thread for better debugging next time. Cleanup the locks we use in decommissioning and clarify some more bits. --- .../CoarseGrainedExecutorBackend.scala | 56 ++++++++++--------- .../CoarseGrainedSchedulerBackend.scala | 16 +++--- .../spark/storage/BlockManagerMaster.scala | 7 ++- 3 files changed, 44 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 0072832bd1d4..9965e40f43e5 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -291,39 +291,43 @@ private[spark] class CoarseGrainedExecutorBackend( // is viewed as acceptable to minimize introduction of any new locking structures in critical // code paths. - val shutdownThread = new Thread() { - var lastTaskRunningTime = System.nanoTime() - val sleep_time = 1000 // 1s - - while (true) { - logInfo("Checking to see if we can shutdown.") - if (executor == null || executor.numRunningTasks == 0) { - if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) { - logInfo("No running tasks, checking migrations") - val allBlocksMigrated = env.blockManager.lastMigrationInfo() - // We can only trust allBlocksMigrated boolean value if there were no tasks running - // since the start of computing it. - if (allBlocksMigrated._2 && - (allBlocksMigrated._1 > lastTaskRunningTime)) { - logInfo("No running tasks, all blocks migrated, stopping.") - exitExecutor(0, "Finished decommissioning", notifyDriver = true) + val shutdownExec = ThreadUtils.newDaemonSingleThreadExecutor("wait for decommissioning") + val shutdownRunnable = new Runnable() { + override def run(): Unit = { + var lastTaskRunningTime = System.nanoTime() + val sleep_time = 1000 // 1s + + while (true) { + logInfo("Checking to see if we can shutdown.") + if (executor == null || executor.numRunningTasks == 0) { + if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) { + logInfo("No running tasks, checking migrations") + val allBlocksMigrated = env.blockManager.lastMigrationInfo() + // We can only trust allBlocksMigrated boolean value if there were no tasks running + // since the start of computing it. + if (allBlocksMigrated._2 && + (allBlocksMigrated._1 > lastTaskRunningTime)) { + logInfo("No running tasks, all blocks migrated, stopping.") + exitExecutor(0, "Finished decommissioning", notifyDriver = true) + } else { + logInfo("All blocks not yet migrated.") + } } else { - logInfo("All blocks not yet migrated.") + logInfo("No running tasks, no block migration configured, stopping.") + exitExecutor(0, "Finished decommissioning", notifyDriver = true) } + Thread.sleep(sleep_time) } else { - logInfo("No running tasks, no block migration configured, stopping.") - exitExecutor(0, "Finished decommissioning", notifyDriver = true) + logInfo("Blocked from shutdown by running task") + // If there is a running task it could store blocks, so make sure we wait for a + // migration loop to complete after the last task is done. + Thread.sleep(sleep_time) + lastTaskRunningTime = System.nanoTime() } - Thread.sleep(sleep_time) - } else { - logInfo("Blocked from shutdown by running task") - // If there is a running task it could store blocks, so make sure we wait for a - // migration loop to complete after the last task is done. - Thread.sleep(sleep_time) - lastTaskRunningTime = System.nanoTime() } } } + shutdownExec.submit(shutdownRunnable) logInfo("Will exit when finished decommissioning") // Return true since we are handling a signal true diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index ce6d10517379..1deb182f78bd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -515,13 +515,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } // Send decommission message to the executor (it could have originated on the executor // but not necessarily. - executorDataMap.get(executorId) match { - case Some(executorInfo) => - executorInfo.executorEndpoint.send(DecommissionSelf) - case None => - // Ignoring the executor since it is not registered. - logWarning(s"Attempted to decommission unknown executor $executorId.") - return false + CoarseGrainedSchedulerBackend.this.synchronized { + executorDataMap.get(executorId) match { + case Some(executorInfo) => + executorInfo.executorEndpoint.send(DecommissionSelf) + case None => + // Ignoring the executor since it is not registered. + logWarning(s"Attempted to decommission unknown executor $executorId.") + return false + } } logInfo(s"Finished decommissioning executor $executorId.") diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 93492cc6d7db..35163bd402f1 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -43,9 +43,12 @@ class BlockManagerMaster( logInfo("Removed " + execId + " successfully in removeExecutor") } - /** Decommission block managers corresponding to given set of executors */ + /** Decommission block managers corresponding to given set of executors + * Non-blocking. + */ def decommissionBlockManagers(executorIds: Seq[String]): Unit = { - driverEndpoint.ask[Unit](DecommissionBlockManagers(executorIds)) + driverEndpoint.ask[Boolean](DecommissionBlockManagers(executorIds)) + logInfo(s"Decommissioning block managers on ${executorIds}") } /** Get Replication Info for all the RDD blocks stored in given blockManagerId */ From ccbef6b4816fb85a67745f01a7a6519af79cef6e Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 21 Jul 2020 14:54:24 -0700 Subject: [PATCH 05/10] Verify executors decommissioned, then killed by external external cluster manager are re-launched --- .../src/main/dockerfiles/spark/decom.sh | 2 +- .../k8s/integrationtest/KubernetesSuite.scala | 25 ++++++++----------- .../tests/decommissioning.py | 5 ---- 3 files changed, 11 insertions(+), 21 deletions(-) diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh index 8a5208d49a70..cd973df257f0 100755 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh @@ -32,4 +32,4 @@ timeout 60 tail --pid=${WORKER_PID} -f /dev/null date echo "Done" date -sleep 30 +sleep 1 diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index ebf71e8cb83e..b230fbc3384c 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -278,6 +278,7 @@ class KubernetesSuite extends SparkFunSuite appArgs = appArgs) val execPods = scala.collection.mutable.Map[String, Pod]() + val podsDeleted = scala.collection.mutable.HashSet[String]() val (patienceInterval, patienceTimeout) = { executorPatience match { case Some(patience) => (patience._1.getOrElse(INTERVAL), patience._2.getOrElse(TIMEOUT)) @@ -338,27 +339,21 @@ class KubernetesSuite extends SparkFunSuite } // Delete the pod to simulate cluster scale down/migration. // This will allow the pod to remain up for the grace period - val pod = kubernetesTestComponents.kubernetesClient.pods() - .withName(name) - pod.delete() + kubernetesTestComponents.kubernetesClient.pods() + .withName(name).delete() logDebug(s"Triggered pod decom/delete: $name deleted") - // Look for the string that indicates we should force kill the first - // Executor. This simulates the pod being fully lost. - logDebug("Waiting for second collect...") + // Make sure this pod is deleted Eventually.eventually(TIMEOUT, INTERVAL) { - assert(kubernetesTestComponents.kubernetesClient - .pods() - .withName(driverPodName) - .getLog - .contains("Waiting some more, please kill exec 1."), - "Decommission test did not complete second collect.") + assert(podsDeleted.contains(name)) + } + // Then make sure this pod is replaced + Eventually.eventually(TIMEOUT, INTERVAL) { + assert(execPods.size == 3) } - logDebug("Force deleting") - val podNoGrace = pod.withGracePeriod(0) - podNoGrace.delete() } case Action.DELETED | Action.ERROR => execPods.remove(name) + podsDeleted += name } } }) diff --git a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py index d34e61611461..5fcad083b007 100644 --- a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py +++ b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py @@ -47,11 +47,6 @@ def addToAcc(x): print("...") time.sleep(30) rdd.count() - print("Waiting some more, please kill exec 1.") - print("...") - time.sleep(30) - print("Executor node should be deleted now") - rdd.count() rdd.collect() print("Final accumulator value is: " + str(acc.value)) print("Finished waiting, stopping Spark.") From b69784db716f83c4aa1b6593a7eb773dfbde82f6 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 21 Jul 2020 14:55:28 -0700 Subject: [PATCH 06/10] Verify some additional calls are not occuring in the executor allocation manager suite. --- .../storage/BlockManagerDecommissionIntegrationSuite.scala | 1 - .../streaming/scheduler/ExecutorAllocationManagerSuite.scala | 3 +++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala index 00e0a4372e35..2b697f9f5116 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala @@ -221,6 +221,5 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS // should have same value like before assert(testRdd.count() === numParts) assert(accum.value === numParts) - } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala index 57fd97179bf4..9cb1dba882b6 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala @@ -105,12 +105,15 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase } verify(allocationClient, times(1)).decommissionExecutors( meq(expectedWithInfo.toSeq), meq(false)) + verify(allocationClient, never).killExecutor(meq(expectedKilledExec.get)) } else { verify(allocationClient, times(1)).killExecutor(meq(expectedKilledExec.get)) + verify(allocationClient, never).decommissionExecutor(meq(expectedKilledExec.get)) } } else { if (decommissioning) { verify(allocationClient, never).decommissionExecutors(null, false) + verify(allocationClient, never).decommissionExecutor(null) } else { verify(allocationClient, never).killExecutor(null) } From d4961d9e6767b2be85977badb5f2bdeed53191e6 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 21 Jul 2020 15:20:37 -0700 Subject: [PATCH 07/10] Dont' close the watcher until the end of the test --- .../spark/deploy/k8s/integrationtest/KubernetesSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index b230fbc3384c..56943a385097 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -382,7 +382,6 @@ class KubernetesSuite extends SparkFunSuite Eventually.eventually(TIMEOUT, patienceInterval) { execPods.values.nonEmpty should be (true) } - execWatcher.close() execPods.values.foreach(executorPodChecker(_)) Eventually.eventually(patienceTimeout, patienceInterval) { expectedLogOnCompletion.foreach { e => @@ -394,6 +393,7 @@ class KubernetesSuite extends SparkFunSuite s"The application did not complete, did not find str ${e}") } } + execWatcher.close() } protected def doBasicDriverPodCheck(driverPod: Pod): Unit = { From d5c5ef1666091769206cca15e266eed79b0e75cc Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 21 Jul 2020 15:23:39 -0700 Subject: [PATCH 08/10] Use decommissionExecutors and set adjustTargetNumExecutors to false so that we can match the pattern for killExecutor/killExecutors --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 6 ++++-- .../scheduler/cluster/StandaloneSchedulerBackend.scala | 5 +++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 1deb182f78bd..3cc1fd90d3bc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -193,7 +193,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp case DecommissionExecutor(executorId, decommissionInfo) => logError(s"Received decommission executor message ${executorId}: $decommissionInfo") - decommissionExecutor(executorId, decommissionInfo) + decommissionExecutors(Seq(executorId, decommissionInfo), + adjustTargetNumExecutors = false) case RemoveWorker(workerId, host, message) => removeWorker(workerId, host, message) @@ -274,7 +275,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp case DecommissionExecutor(executorId, decommissionInfo) => logError(s"Received decommission executor message ${executorId}: ${decommissionInfo}.") - decommissionExecutor(executorId, decommissionInfo) + decommissionExecutors(Seq(executorId, decommissionInfo), + adjustTargetNumExecutors = false)) context.reply(true) case RetrieveSparkAppConfig(resourceProfileId) => diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index d921af602b25..091085369285 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -176,8 +176,9 @@ private[spark] class StandaloneSchedulerBackend( override def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo) { logInfo("Asked to decommission executor") - decommissionExecutor(fullId.split("/")(1), decommissionInfo) - logInfo("Executor %s decommissioned: %s".format(fullId, decommissionInfo)) + val execId = fullId.split("/")(1) + decommissionExecutors(Seq((execId, decommissionInfo), adjustTargetNumExecutors = false) + logInfo("Executor %s decommissioned: %s".format(fullId, message)) } override def workerRemoved(workerId: String, host: String, message: String): Unit = { From 683c83c9f99e8ba8a7d2eeff381042cc1910e433 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 21 Jul 2020 19:18:42 -0700 Subject: [PATCH 09/10] bump numparts up to 6 --- .../storage/BlockManagerDecommissionIntegrationSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala index 2b697f9f5116..cfbd8d8c78d5 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala @@ -34,7 +34,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS with ResetSystemProperties with Eventually { val numExecs = 3 - val numParts = 3 + val numParts = 6 test(s"verify that an already running task which is going to cache data succeeds " + s"on a decommissioned executor") { From f921ddd3913d8b749abd2d8de645e5d098d73823 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 23 Jul 2020 11:21:19 -0700 Subject: [PATCH 10/10] Revert "bump numparts up to 6" This reverts commit daf96ddae8d1cc2440b8b5452a1cd7c3e499f278. --- .../storage/BlockManagerDecommissionIntegrationSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala index cfbd8d8c78d5..2b697f9f5116 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala @@ -34,7 +34,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS with ResetSystemProperties with Eventually { val numExecs = 3 - val numParts = 6 + val numParts = 3 test(s"verify that an already running task which is going to cache data succeeds " + s"on a decommissioned executor") {