diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala index 00bd0063c9e3..740831c2b49b 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala @@ -17,6 +17,7 @@ package org.apache.spark +import org.apache.spark.scheduler.ExecutorDecommissionInfo /** * A client that communicates with the cluster manager to request or kill executors. * This is currently supported only in YARN mode. @@ -81,6 +82,43 @@ private[spark] trait ExecutorAllocationClient { countFailures: Boolean, force: Boolean = false): Seq[String] + /** + * Request that the cluster manager decommission the specified executors. + * The default implementation delegates to kill; a scheduler must override + * this if it supports graceful decommissioning. + * + * @param executors identifiers of executors & decommission info. + * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down + * after these executors have been decommissioned. + * @return the ids of the executors acknowledged by the cluster manager to be removed. + */ + def decommissionExecutors( + executors: Seq[(String, ExecutorDecommissionInfo)], + adjustTargetNumExecutors: Boolean): Seq[String] = { + killExecutors(executors.map(_._1), + adjustTargetNumExecutors, + countFailures = false) + } + + + /** + * Request that the cluster manager decommission the specified executor. + * The default implementation delegates to kill; a scheduler can override + * this if it supports graceful decommissioning. + * + * @param executorId identifier of the executor to decommission + * @param decommissionInfo information about the decommission (reason, host loss) + * @return whether the request is acknowledged by the cluster manager. + */ + def decommissionExecutor(executorId: String, + decommissionInfo: ExecutorDecommissionInfo): Boolean = { + val decommissionedExecutors = decommissionExecutors( + Seq((executorId, decommissionInfo)), + adjustTargetNumExecutors = true) + decommissionedExecutors.nonEmpty && decommissionedExecutors(0).equals(executorId) + } + + /** + * Request that the cluster manager kill every executor on the specified host.
* diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 85409d599cca..13a9762a347a 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -28,6 +28,7 @@ import com.codahale.metrics.{Gauge, MetricRegistry} import org.apache.spark.internal.{config, Logging} import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL +import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED import org.apache.spark.metrics.source.Source import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID import org.apache.spark.resource.ResourceProfileManager @@ -204,7 +205,12 @@ private[spark] class ExecutorAllocationManager( s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 0!") } if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) { - if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) { + // If dynamic allocation shuffle tracking or worker decommissioning along with + // storage shuffle decommissioning is enabled we have *experimental* support for + // decommissioning without a shuffle service. + if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) || + (conf.get(WORKER_DECOMMISSION_ENABLED) && + conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) { logWarning("Dynamic allocation without a shuffle service is an experimental feature.") } else if (!testing) { throw new SparkException("Dynamic allocation of executors requires the external " + @@ -539,7 +545,9 @@ private[spark] class ExecutorAllocationManager( // get the running total as we remove or initialize it to the count - pendingRemoval val newExecutorTotal = numExecutorsTotalPerRpId.getOrElseUpdate(rpId, (executorMonitor.executorCountWithResourceProfile(rpId) - - executorMonitor.pendingRemovalCountPerResourceProfileId(rpId))) + executorMonitor.pendingRemovalCountPerResourceProfileId(rpId) - + executorMonitor.pendingDecommissioningPerResourceProfileId(rpId) + )) if (newExecutorTotal - 1 < minNumExecutors) { logDebug(s"Not removing idle executor $executorIdToBeRemoved because there " + s"are only $newExecutorTotal executor(s) left (minimum number of executor limit " + @@ -565,8 +573,14 @@ private[spark] class ExecutorAllocationManager( } else { // We don't want to change our target number of executors, because we already did that // when the task backlog decreased. - client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false, - countFailures = false, force = false) + if (conf.get(WORKER_DECOMMISSION_ENABLED)) { + val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map( + id => (id, ExecutorDecommissionInfo("spark scale down", false))) + client.decommissionExecutors(executorIdsWithoutHostLoss, adjustTargetNumExecutors = false) + } else { + client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false, + countFailures = false, force = false) + } } // [SPARK-21834] killExecutors api reduces the target number of executors. 
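To make the relaxed gate above concrete: dynamic allocation without the external shuffle service is now accepted when either shuffle tracking is on, or worker decommissioning is enabled together with shuffle-block decommissioning. Below is a minimal, illustrative sketch of the second combination using the same internal config constants this patch references (it mirrors how the updated test suites build their SparkConf; these constants are private[spark], so outside Spark the corresponding string keys would be needed):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config
import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED

// Experimental: dynamic allocation backed by decommission-based block migration
// instead of the external shuffle service.
val conf = new SparkConf()
  .set(config.DYN_ALLOCATION_ENABLED, true)
  .set(config.SHUFFLE_SERVICE_ENABLED, false)
  .set(WORKER_DECOMMISSION_ENABLED, true)
  .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
```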
@@ -578,7 +592,11 @@ private[spark] class ExecutorAllocationManager( // reset the newExecutorTotal to the existing number of executors if (testing || executorsRemoved.nonEmpty) { - executorMonitor.executorsKilled(executorsRemoved.toSeq) + if (conf.get(WORKER_DECOMMISSION_ENABLED)) { + executorMonitor.executorsDecommissioned(executorsRemoved) + } else { + executorMonitor.executorsKilled(executorsRemoved.toSeq) + } logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to idle timeout.") executorsRemoved.toSeq } else { diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index c8c6e5a192a2..b7a64d75a8d4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -165,8 +165,6 @@ private[deploy] object DeployMessages { case object ReregisterWithMaster // used when a worker attempts to reconnect to a master - case object DecommissionSelf // Mark as decommissioned. May be Master to Worker in the future. - // AppClient to Master case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index aa8c46fc6831..862e685c2dce 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -668,7 +668,7 @@ private[deploy] class Worker( finishedApps += id maybeCleanupApplication(id) - case DecommissionSelf => + case WorkerDecommission(_, _) => decommissionSelf() } diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index def125bb6bfb..9965e40f43e5 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -64,7 +64,6 @@ private[spark] class CoarseGrainedExecutorBackend( private[this] val stopping = new AtomicBoolean(false) var executor: Executor = null - @volatile private var decommissioned = false @volatile var driver: Option[RpcEndpointRef] = None // If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need @@ -80,6 +79,9 @@ private[spark] class CoarseGrainedExecutorBackend( */ private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]] + // Track our decommissioning status internally. + @volatile private var decommissioned = false + override def onStart(): Unit = { logInfo("Registering PWR handler.") SignalUtils.register("PWR", "Failed to register SIGPWR handler - " + @@ -214,6 +216,10 @@ private[spark] class CoarseGrainedExecutorBackend( case UpdateDelegationTokens(tokenBytes) => logInfo(s"Received tokens of ${tokenBytes.length} bytes") SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf) + + case DecommissionSelf => + logInfo("Received decommission self") + decommissionSelf() } override def onDisconnected(remoteAddress: RpcAddress): Unit = { @@ -277,12 +283,57 @@ private[spark] class CoarseGrainedExecutorBackend( if (executor != null) { executor.decommission() } - logInfo("Done decommissioning self.") + // Shutdown the executor once all tasks are gone & any configured migrations completed. 
+ // Detecting migration completion doesn't need to be perfect and we want to minimize the + // overhead for executors that are not in decommissioning state as overall that will be + // most of the executors. For example, this will not catch a block which is already in + // the process of being put from a remote executor before migration starts. This trade-off + // is viewed as acceptable to minimize introduction of any new locking structures in critical + // code paths. + + val shutdownExec = ThreadUtils.newDaemonSingleThreadExecutor("wait for decommissioning") + val shutdownRunnable = new Runnable() { + override def run(): Unit = { + var lastTaskRunningTime = System.nanoTime() + val sleep_time = 1000 // 1s + + while (true) { + logInfo("Checking to see if we can shutdown.") + if (executor == null || executor.numRunningTasks == 0) { + if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) { + logInfo("No running tasks, checking migrations") + val allBlocksMigrated = env.blockManager.lastMigrationInfo() + // We can only trust the allBlocksMigrated boolean value if there were no tasks running + // since the start of computing it. + if (allBlocksMigrated._2 && + (allBlocksMigrated._1 > lastTaskRunningTime)) { + logInfo("No running tasks, all blocks migrated, stopping.") + exitExecutor(0, "Finished decommissioning", notifyDriver = true) + } else { + logInfo("All blocks not yet migrated.") + } + } else { + logInfo("No running tasks, no block migration configured, stopping.") + exitExecutor(0, "Finished decommissioning", notifyDriver = true) + } + Thread.sleep(sleep_time) + } else { + logInfo("Blocked from shutdown by running task") + // If there is a running task it could store blocks, so make sure we wait for a + // migration loop to complete after the last task is done. + Thread.sleep(sleep_time) + lastTaskRunningTime = System.nanoTime() + } + } + } + } + shutdownExec.submit(shutdownRunnable) + logInfo("Will exit when finished decommissioning") // Return true since we are handling a signal true } catch { case e: Exception => - logError(s"Error ${e} during attempt to decommission self") + logError("Unexpected error while decommissioning self", e) false } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 91485f01bf00..49b859817fb3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -136,4 +136,7 @@ private[spark] object CoarseGrainedClusterMessages { // The message to check if `CoarseGrainedSchedulerBackend` thinks the executor is alive or not. case class IsExecutorAlive(executorId: String) extends CoarseGrainedClusterMessage +
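The polling loop above reduces to a single exit predicate evaluated roughly once per second: the executor may shut itself down once no tasks are running and, if storage decommissioning is enabled, only when the last migration pass both finished after the last observed running task and reported all blocks migrated. A condensed restatement of that condition follows; this is a hypothetical helper for review purposes, not part of the patch, with names mirroring the fields used in the loop:

```scala
// Hypothetical condensation of the shutdown check in the loop above.
// lastMigrationInfo is (timestamp in ns of the last migration pass, all blocks migrated?).
def readyToExit(
    numRunningTasks: Int,
    storageDecommissionEnabled: Boolean,
    lastMigrationInfo: (Long, Boolean),
    lastTaskRunningTime: Long): Boolean = {
  val (lastMigrationTime, allBlocksMigrated) = lastMigrationInfo
  numRunningTasks == 0 &&
    (!storageDecommissionEnabled ||
      (allBlocksMigrated && lastMigrationTime > lastTaskRunningTime))
}
```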
+ // Used to ask an executor to decommission itself. + case object DecommissionSelf extends CoarseGrainedClusterMessage } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 8fbefae58af1..3cc1fd90d3bc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -193,7 +193,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp case DecommissionExecutor(executorId, decommissionInfo) => logError(s"Received decommission executor message ${executorId}: $decommissionInfo") - decommissionExecutor(executorId, decommissionInfo) + decommissionExecutors(Seq((executorId, decommissionInfo)), + adjustTargetNumExecutors = false) case RemoveWorker(workerId, host, message) => removeWorker(workerId, host, message) @@ -274,7 +275,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp case DecommissionExecutor(executorId, decommissionInfo) => logError(s"Received decommission executor message ${executorId}: ${decommissionInfo}.") - decommissionExecutor(executorId, decommissionInfo) + decommissionExecutors(Seq((executorId, decommissionInfo)), + adjustTargetNumExecutors = false) context.reply(true) case RetrieveSparkAppConfig(resourceProfileId) => @@ -419,49 +421,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp scheduler.workerRemoved(workerId, host, message) } - /** - * Mark a given executor as decommissioned and stop making resource offers for it. - */ - private def decommissionExecutor( - executorId: String, decommissionInfo: ExecutorDecommissionInfo): Boolean = { - val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized { - // Only bother decommissioning executors which are alive. - if (isExecutorActive(executorId)) { - executorsPendingDecommission += executorId - true - } else { - false - } - } - - if (shouldDisable) { - logInfo(s"Starting decommissioning executor $executorId.") - try { - scheduler.executorDecommission(executorId, decommissionInfo) - } catch { - case e: Exception => - logError(s"Unexpected error during decommissioning ${e.toString}", e) - } - logInfo(s"Finished decommissioning executor $executorId.") - - if (conf.get(STORAGE_DECOMMISSION_ENABLED)) { - try { - logInfo("Starting decommissioning block manager corresponding to " + - s"executor $executorId.") - scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId)) - } catch { - case e: Exception => - logError("Unexpected error during block manager " + - s"decommissioning for executor $executorId: ${e.toString}", e) - } - logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.") - } - } else { - logInfo(s"Skipping decommissioning of executor $executorId.") - } - shouldDisable - } - /** * Stop making resource offers for the given executor. The executor is marked as lost with * the loss reason still pending. @@ -493,6 +452,100 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp protected def minRegisteredRatio: Double = _minRegisteredRatio + /** + * Request that the cluster manager decommission the specified executors. + * + * @param executors Identifiers of executors & decommission info.
+ * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down + * after these executors have been decommissioned. + * @return the ids of the executors acknowledged by the cluster manager to be removed. + */ + override def decommissionExecutors( + executors: Seq[(String, ExecutorDecommissionInfo)], + adjustTargetNumExecutors: Boolean): Seq[String] = { + + val executorsToDecommission = executors.filter{case (executorId, _) => + CoarseGrainedSchedulerBackend.this.synchronized { + // Only bother decommissioning executors which are alive. + if (isExecutorActive(executorId)) { + executorsPendingDecommission += executorId + true + } else { + false + } + } + } + + // If we don't want to replace the executors we are decommissioning + if (adjustTargetNumExecutors) { + executorsToDecommission.foreach { case (exec, _) => + val rpId = executorDataMap(exec).resourceProfileId + val rp = scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId) + if (requestedTotalExecutorsPerResourceProfile.isEmpty) { + // Assume that we are killing an executor that was started by default and + // not through the request api + requestedTotalExecutorsPerResourceProfile(rp) = 0 + } else { + val requestedTotalForRp = requestedTotalExecutorsPerResourceProfile(rp) + requestedTotalExecutorsPerResourceProfile(rp) = math.max(requestedTotalForRp - 1, 0) + } + } + doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap) + } + + val decommissioned = executorsToDecommission.filter{case (executorId, decomInfo) => + doDecommission(executorId, decomInfo) + }.map(_._1) + decommissioned + } + + + private def doDecommission(executorId: String, + decomInfo: ExecutorDecommissionInfo): Boolean = { + + logInfo(s"Starting decommissioning executor $executorId.") + try { + scheduler.executorDecommission(executorId, decomInfo) + if (driverEndpoint != null) { + logInfo("Propagating executor decommission to driver.") + driverEndpoint.send(DecommissionExecutor(executorId, decomInfo)) + } + } catch { + case e: Exception => + logError(s"Unexpected error during decommissioning ${e.toString}", e) + return false + } + // Send decommission message to the executor (it could have originated on the executor + // but not necessarily. + CoarseGrainedSchedulerBackend.this.synchronized { + executorDataMap.get(executorId) match { + case Some(executorInfo) => + executorInfo.executorEndpoint.send(DecommissionSelf) + case None => + // Ignoring the executor since it is not registered. + logWarning(s"Attempted to decommission unknown executor $executorId.") + return false + } + } + logInfo(s"Finished decommissioning executor $executorId.") + + if (conf.get(STORAGE_DECOMMISSION_ENABLED)) { + try { + logInfo("Starting decommissioning block manager corresponding to " + + s"executor $executorId.") + scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId)) + } catch { + case e: Exception => + logError("Unexpected error during block manager " + + s"decommissioning for executor $executorId: ${e.toString}", e) + return false + } + logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.") + } + true + } + + override def start(): Unit = { if (UserGroupInformation.isSecurityEnabled()) { delegationTokenManager = createTokenManager() @@ -588,17 +641,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp driverEndpoint.send(RemoveWorker(workerId, host, message)) } - /** - * Called by subclasses when notified of a decommissioning executor. 
- */ - private[spark] def decommissionExecutor( - executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = { - if (driverEndpoint != null) { - logInfo("Propagating executor decommission to driver.") - driverEndpoint.send(DecommissionExecutor(executorId, decommissionInfo)) - } - } - def sufficientResourcesRegistered(): Boolean = true override def isReady(): Boolean = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index d921af602b25..091085369285 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -176,8 +176,9 @@ private[spark] class StandaloneSchedulerBackend( override def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo) { logInfo("Asked to decommission executor") - decommissionExecutor(fullId.split("/")(1), decommissionInfo) - logInfo("Executor %s decommissioned: %s".format(fullId, decommissionInfo)) + val execId = fullId.split("/")(1) + decommissionExecutors(Seq((execId, decommissionInfo)), adjustTargetNumExecutors = false) + logInfo("Executor %s decommissioned: %s".format(fullId, decommissionInfo)) } override def workerRemoved(workerId: String, host: String, message: String): Unit = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala index 4d7190726f06..3ed05a45b506 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala @@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID import org.apache.spark.scheduler._ -import org.apache.spark.storage.RDDBlockId +import org.apache.spark.storage.{RDDBlockId, ShuffleDataBlockId} import org.apache.spark.util.Clock /** @@ -114,7 +114,8 @@ private[spark] class ExecutorMonitor( var newNextTimeout = Long.MaxValue timedOutExecs = executors.asScala - .filter { case (_, exec) => !exec.pendingRemoval && !exec.hasActiveShuffle } + .filter { case (_, exec) => + !exec.pendingRemoval && !exec.hasActiveShuffle && !exec.pendingDecommissioning} .filter { case (_, exec) => val deadline = exec.timeoutAt if (deadline > now) { @@ -135,6 +136,7 @@ private[spark] class ExecutorMonitor( /** * Mark the given executors as pending to be removed. Should only be called in the EAM thread. + * This covers both kills and decommissions. */ def executorsKilled(ids: Seq[String]): Unit = { ids.foreach { id => @@ -149,6 +151,19 @@ private[spark] class ExecutorMonitor( nextTimeout.set(Long.MinValue) } + private[spark] def executorsDecommissioned(ids: Seq[String]): Unit = { + ids.foreach { id => + val tracker = executors.get(id) + if (tracker != null) { + tracker.pendingDecommissioning = true + } + } + + // Recompute timed out executors in the next EAM callback, since this call invalidates + // the current list.
+ nextTimeout.set(Long.MinValue) + } + def executorCount: Int = executors.size() def executorCountWithResourceProfile(id: Int): Int = { @@ -171,6 +186,16 @@ private[spark] class ExecutorMonitor( executors.asScala.filter { case (k, v) => v.resourceProfileId == id && v.pendingRemoval }.size } + def pendingDecommissioningCount: Int = executors.asScala.count { case (_, exec) => + exec.pendingDecommissioning + } + + def pendingDecommissioningPerResourceProfileId(id: Int): Int = { + executors.asScala.filter { case (k, v) => + v.resourceProfileId == id && v.pendingDecommissioning + }.size + } + override def onJobStart(event: SparkListenerJobStart): Unit = { if (!shuffleTrackingEnabled) { return } @@ -298,6 +323,7 @@ private[spark] class ExecutorMonitor( // // This means that an executor may be marked as having shuffle data, and thus prevented // from being removed, even though the data may not be used. + // TODO: Only track used files (SPARK-31974) if (shuffleTrackingEnabled && event.reason == Success) { stageToShuffleID.get(event.stageId).foreach { shuffleId => exec.addShuffle(shuffleId) } @@ -326,18 +352,33 @@ private[spark] class ExecutorMonitor( val removed = executors.remove(event.executorId) if (removed != null) { decrementExecResourceProfileCount(removed.resourceProfileId) - if (!removed.pendingRemoval) { + if (!removed.pendingRemoval || !removed.pendingDecommissioning) { nextTimeout.set(Long.MinValue) } } } override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = { - if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) { - return - } val exec = ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId, UNKNOWN_RESOURCE_PROFILE_ID) + + // Check if it is a shuffle file or an RDD block to pick the correct codepath for the update + if (event.blockUpdatedInfo.blockId.isInstanceOf[ShuffleDataBlockId] && shuffleTrackingEnabled) { + /** + * The executor monitor keeps track of the locations of cache and shuffle blocks and this + * can be used to decide which executor(s) Spark should shut down first. Since shuffle + * blocks can now be migrated, this wires that tracking up. We only do this for data blocks + * as index and other blocks do not necessarily mean the entire block has been + * committed. + */ + event.blockUpdatedInfo.blockId match { + case ShuffleDataBlockId(shuffleId, _, _) => exec.addShuffle(shuffleId) + case _ => // For now we only update on data blocks + } + return + } else if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) { + return + } val storageLevel = event.blockUpdatedInfo.storageLevel val blockId = event.blockUpdatedInfo.blockId.asInstanceOf[RDDBlockId] @@ -414,6 +455,11 @@ private[spark] class ExecutorMonitor( executors.asScala.filter { case (_, exec) => exec.pendingRemoval }.keys.toSet } + // Visible for testing + def executorsDecommissioning(): Set[String] = { + executors.asScala.filter { case (_, exec) => exec.pendingDecommissioning }.keys.toSet + } + /** * This method should be used when updating executor state.
It guards against a race condition in * which the `SparkListenerTaskStart` event is posted before the `SparkListenerBlockManagerAdded` @@ -466,6 +512,7 @@ private[spark] class ExecutorMonitor( @volatile var timedOut: Boolean = false var pendingRemoval: Boolean = false + var pendingDecommissioning: Boolean = false var hasActiveShuffle: Boolean = false private var idleStart: Long = -1 diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 47af854b6e8f..56e64ef8a765 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1822,6 +1822,14 @@ private[spark] class BlockManager( } } + /* + * Returns the last migration time and a boolean for if all blocks have been migrated. + * If there are any tasks running since that time the boolean may be incorrect. + */ + private[spark] def lastMigrationInfo(): (Long, Boolean) = { + decommissioner.map(_.lastMigrationInfo()).getOrElse((0, false)) + } + private[storage] def getMigratableRDDBlocks(): Seq[ReplicateBlock] = master.getReplicateInfoForRDDBlocks(blockManagerId) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala index 1cc7ef6a25f9..00cb22e56bf5 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala @@ -18,6 +18,7 @@ package org.apache.spark.storage import java.util.concurrent.ExecutorService +import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConverters._ import scala.collection.mutable @@ -41,6 +42,13 @@ private[storage] class BlockManagerDecommissioner( private val maxReplicationFailuresForDecommission = conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK) + // Used for tracking if our migrations are complete. + @volatile private var lastRDDMigrationTime: Long = 0 + @volatile private var lastShuffleMigrationTime: Long = 0 + @volatile private var rddBlocksLeft: Boolean = true + @volatile private var shuffleBlocksLeft: Boolean = true + + /** * This runnable consumes any shuffle blocks in the queue for migration. This part of a * producer/consumer where the main migration loop updates the queue of blocks to be migrated @@ -91,10 +99,12 @@ private[storage] class BlockManagerDecommissioner( null)// class tag, we don't need for shuffle logDebug(s"Migrated sub block ${blockId}") } - logInfo(s"Migrated ${shuffleBlockInfo} to ${peer}") + logDebug(s"Migrated ${shuffleBlockInfo} to ${peer}") } else { logError(s"Skipping block ${shuffleBlockInfo} because it has failed ${retryCount}") } + logInfo(s"Migrated ${shuffleBlockInfo}") + numMigratedShuffles.incrementAndGet() } } // This catch is intentionally outside of the while running block. @@ -115,12 +125,21 @@ private[storage] class BlockManagerDecommissioner( // Shuffles which are either in queue for migrations or migrated private val migratingShuffles = mutable.HashSet[ShuffleBlockInfo]() + // Shuffles which have migrated. This used to know when we are "done", being done can change + // if a new shuffle file is created by a running task. + private val numMigratedShuffles = new AtomicInteger(0) + + + // Shuffles which are queued for migration & number of retries so far. + // Visible in storage for testing. 
private[storage] val shufflesToMigrate = new java.util.concurrent.ConcurrentLinkedQueue[(ShuffleBlockInfo, Int)]() // Set if we encounter an error attempting to migrate and stop. @volatile private var stopped = false + @volatile private var stoppedRDD = false + @volatile private var stoppedShuffle = false private val migrationPeers = mutable.HashMap[BlockManagerId, ShuffleMigrationRunnable]() @@ -133,22 +152,24 @@ private[storage] class BlockManagerDecommissioner( override def run(): Unit = { assert(conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) - while (!stopped && !Thread.interrupted()) { + while (!stopped && !stoppedRDD && !Thread.interrupted()) { logInfo("Iterating on migrating from the block manager.") try { + val startTime = System.nanoTime() logDebug("Attempting to replicate all cached RDD blocks") - decommissionRddCacheBlocks() + rddBlocksLeft = decommissionRddCacheBlocks() + lastRDDMigrationTime = startTime logInfo("Attempt to replicate all cached blocks done") logInfo(s"Waiting for ${sleepInterval} before refreshing migrations.") Thread.sleep(sleepInterval) } catch { case e: InterruptedException => - logInfo("Interrupted during migration, will not refresh migrations.") - stopped = true + logInfo("Interrupted during RDD migration, stopping") + stoppedRDD = true case NonFatal(e) => - logError("Error occurred while trying to replicate for block manager decommissioning.", + logError("Error occurred replicating RDD for block manager decommissioning.", e) - stopped = true + stoppedRDD = true } } } @@ -162,20 +183,22 @@ private[storage] class BlockManagerDecommissioner( override def run() { assert(conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) - while (!stopped && !Thread.interrupted()) { + while (!stopped && !stoppedShuffle && !Thread.interrupted()) { try { logDebug("Attempting to replicate all shuffle blocks") - refreshOffloadingShuffleBlocks() + val startTime = System.nanoTime() + shuffleBlocksLeft = refreshOffloadingShuffleBlocks() + lastShuffleMigrationTime = startTime logInfo("Done starting workers to migrate shuffle blocks") Thread.sleep(sleepInterval) } catch { case e: InterruptedException => logInfo("Interrupted during migration, will not refresh migrations.") - stopped = true + stoppedShuffle = true case NonFatal(e) => logError("Error occurred while trying to replicate for block manager decommissioning.", e) - stopped = true + stoppedShuffle = true } } } @@ -191,8 +214,9 @@ private[storage] class BlockManagerDecommissioner( * but rather shadows them. * Requires an Indexed based shuffle resolver. * Note: if called in testing please call stopOffloadingShuffleBlocks to avoid thread leakage. + * Returns true if we are not done migrating shuffle blocks. */ - private[storage] def refreshOffloadingShuffleBlocks(): Unit = { + private[storage] def refreshOffloadingShuffleBlocks(): Boolean = { // Update the queue of shuffles to be migrated logInfo("Offloading shuffle blocks") val localShuffles = bm.migratableResolver.getStoredShuffles().toSet @@ -215,6 +239,8 @@ private[storage] class BlockManagerDecommissioner( deadPeers.foreach { peer => migrationPeers.get(peer).foreach(_.running = false) } + // If we found any new shuffles to migrate or otherwise have not migrated everything. 
+ newShufflesToMigrate.nonEmpty || migratingShuffles.size > numMigratedShuffles.get() } /** @@ -231,8 +257,9 @@ /** * Tries to offload all cached RDD blocks from this BlockManager to peer BlockManagers * Visible for testing + * Returns true if we have not migrated all of our RDD blocks. */ - private[storage] def decommissionRddCacheBlocks(): Unit = { + private[storage] def decommissionRddCacheBlocks(): Boolean = { val replicateBlocksInfo = bm.getMigratableRDDBlocks() if (replicateBlocksInfo.nonEmpty) { @@ -240,7 +267,7 @@ "for block manager decommissioning") } else { logWarning(s"Asked to decommission RDD cache blocks, but no blocks to migrate") - return + return false } // TODO: We can sort these blocks based on some policy (LRU/blockSize etc) @@ -252,7 +279,9 @@ if (blocksFailedReplication.nonEmpty) { logWarning("Blocks failed replication in cache decommissioning " + s"process: ${blocksFailedReplication.mkString(",")}") + return true } + return false } private def migrateBlock(blockToReplicate: ReplicateBlock): Boolean = { @@ -275,9 +304,13 @@ logInfo("Starting block migration thread") if (conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) { rddBlockMigrationExecutor.submit(rddBlockMigrationRunnable) + } else { + stoppedRDD = true } if (conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) { shuffleBlockMigrationRefreshExecutor.submit(shuffleBlockMigrationRefreshRunnable) + } else { + stoppedShuffle = true } if (!conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED) && !conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) { @@ -327,4 +360,28 @@ } logInfo("Stopped storage decommissioner") } + + /* + * Returns the last migration time and a boolean for if all blocks have been migrated. + * If there are any tasks running since that time the boolean may be incorrect. + */ + private[storage] def lastMigrationInfo(): (Long, Boolean) = { + if (stopped || (stoppedRDD && stoppedShuffle)) { + (System.nanoTime(), true) + } else { + + val lastMigrationTime = if ( + conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED) && + conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) { + Math.min(lastRDDMigrationTime, lastShuffleMigrationTime) + } else if (conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) { + lastShuffleMigrationTime + } else { + lastRDDMigrationTime + } + + val blocksMigrated = (!shuffleBlocksLeft || stoppedShuffle) && (!rddBlocksLeft || stoppedRDD) + (lastMigrationTime, blocksMigrated) + } + } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 93492cc6d7db..35163bd402f1 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -43,9 +43,12 @@ class BlockManagerMaster( logInfo("Removed " + execId + " successfully in removeExecutor") } - /** Decommission block managers corresponding to given set of executors */ + /** Decommission block managers corresponding to given set of executors + * Non-blocking.
+ */ def decommissionBlockManagers(executorIds: Seq[String]): Unit = { - driverEndpoint.ask[Unit](DecommissionBlockManagers(executorIds)) + driverEndpoint.ask[Boolean](DecommissionBlockManagers(executorIds)) + logInfo(s"Decommissioning block managers on ${executorIds}") } /** Get Replication Info for all the RDD blocks stored in given blockManagerId */ diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index ea6e010ef29a..c9c71cd57af2 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -28,6 +28,7 @@ import org.scalatest.PrivateMethodTester import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.internal.config import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL +import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED import org.apache.spark.metrics.MetricsSystem import org.apache.spark.resource._ import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID @@ -1270,6 +1271,68 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { assert(executorsPendingToRemove(manager).size === 6) // limit reached (1 executor remaining) } + test("mock polling loop remove with decommissioning") { + val clock = new ManualClock(2020L) + val manager = createManager(createConf(1, 20, 1, true), clock = clock) + + // Remove idle executors on timeout + onExecutorAddedDefaultProfile(manager, "executor-1") + onExecutorAddedDefaultProfile(manager, "executor-2") + onExecutorAddedDefaultProfile(manager, "executor-3") + assert(executorsDecommissioning(manager).isEmpty) + assert(executorsPendingToRemove(manager).isEmpty) + + // idle threshold not reached yet + clock.advance(executorIdleTimeout * 1000 / 2) + schedule(manager) + assert(manager.executorMonitor.timedOutExecutors().isEmpty) + assert(executorsPendingToRemove(manager).isEmpty) + assert(executorsDecommissioning(manager).isEmpty) + + // idle threshold exceeded + clock.advance(executorIdleTimeout * 1000) + assert(manager.executorMonitor.timedOutExecutors().size === 3) + schedule(manager) + assert(executorsPendingToRemove(manager).isEmpty) // limit reached (1 executor remaining) + assert(executorsDecommissioning(manager).size === 2) // limit reached (1 executor remaining) + + // Mark a subset as busy - only idle executors should be removed + onExecutorAddedDefaultProfile(manager, "executor-4") + onExecutorAddedDefaultProfile(manager, "executor-5") + onExecutorAddedDefaultProfile(manager, "executor-6") + onExecutorAddedDefaultProfile(manager, "executor-7") + assert(manager.executorMonitor.executorCount === 7) + assert(executorsPendingToRemove(manager).isEmpty) // no pending to be removed + assert(executorsDecommissioning(manager).size === 2) // 2 decommissioning + onExecutorBusy(manager, "executor-4") + onExecutorBusy(manager, "executor-5") + onExecutorBusy(manager, "executor-6") // 3 busy and 2 idle (of the 5 active ones) + + // after scheduling, the previously timed out executor should be removed, since + // there are new active ones. 
+ schedule(manager) + assert(executorsDecommissioning(manager).size === 3) + + // advance the clock so that idle executors should time out and move to the pending list + clock.advance(executorIdleTimeout * 1000) + schedule(manager) + assert(executorsPendingToRemove(manager).size === 0) + assert(executorsDecommissioning(manager).size === 4) + assert(!executorsDecommissioning(manager).contains("executor-4")) + assert(!executorsDecommissioning(manager).contains("executor-5")) + assert(!executorsDecommissioning(manager).contains("executor-6")) + + // Busy executors are now idle and should be removed + onExecutorIdle(manager, "executor-4") + onExecutorIdle(manager, "executor-5") + onExecutorIdle(manager, "executor-6") + schedule(manager) + assert(executorsDecommissioning(manager).size === 4) + clock.advance(executorIdleTimeout * 1000) + schedule(manager) + assert(executorsDecommissioning(manager).size === 6) // limit reached (1 executor remaining) + } + test("listeners trigger add executors correctly") { val manager = createManager(createConf(1, 20, 1)) assert(addTime(manager) === NOT_SET) @@ -1588,7 +1651,8 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { private def createConf( minExecutors: Int = 1, maxExecutors: Int = 5, - initialExecutors: Int = 1): SparkConf = { + initialExecutors: Int = 1, + decommissioningEnabled: Boolean = false): SparkConf = { val sparkConf = new SparkConf() .set(config.DYN_ALLOCATION_ENABLED, true) .set(config.DYN_ALLOCATION_MIN_EXECUTORS, minExecutors) @@ -1604,6 +1668,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { // SPARK-22864: effectively disable the allocation schedule by setting the period to a // really long value. .set(TEST_SCHEDULE_INTERVAL, 10000L) + .set(WORKER_DECOMMISSION_ENABLED, decommissioningEnabled) sparkConf } @@ -1670,6 +1735,10 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite { private def executorsPendingToRemove(manager: ExecutorAllocationManager): Set[String] = { manager.executorMonitor.executorsPendingToRemove() } + + private def executorsDecommissioning(manager: ExecutorAllocationManager): Set[String] = { + manager.executorMonitor.executorsDecommissioning() + } } /** diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala index 57410103dd08..2b697f9f5116 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala @@ -59,7 +59,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, shuffle) // Just replicate blocks as fast as we can during testing, there isn't another // workload we need to worry about. - .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L) + .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L) sc = new SparkContext(master, "test", conf) @@ -213,10 +213,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === numParts) } - // Make the executor we decommissioned exit - sched.client.killExecutors(List(execToDecommission)) - - // Wait for the executor to be removed + // Wait for the executor to be removed automatically after migration. 
executorRemovedSem.acquire(1) // Since the RDD is cached or shuffled so further usage of same RDD should use the @@ -224,6 +221,5 @@ // should have same value like before assert(testRdd.count() === numParts) assert(accum.value === numParts) - } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala index 5ff1ff05cc4e..627b49d1d5bc 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.storage import scala.concurrent.duration._ import org.mockito.{ArgumentMatchers => mc} -import org.mockito.Mockito.{mock, times, verify, when} +import org.mockito.Mockito.{atLeast => least, mock, times, verify, when} import org.scalatest._ import org.scalatest.concurrent.Eventually._ @@ -38,6 +38,9 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers { private val sparkConf = new SparkConf(false) .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true) .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, true) + // Just replicate blocks as fast as we can during testing, there isn't another + // workload we need to worry about. + .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L) private def registerShuffleBlocks( mockMigratableShuffleResolver: MigratableResolver, @@ -77,9 +80,10 @@ try { bmDecomManager.start() - eventually(timeout(5.second), interval(10.milliseconds)) { + // We don't check that all blocks are migrated because our mock is always returning an RDD.
+ eventually(timeout(10.second), interval(10.milliseconds)) { assert(bmDecomManager.shufflesToMigrate.isEmpty == true) - verify(bm, times(1)).replicateBlock( + verify(bm, least(1)).replicateBlock( mc.eq(storedBlockId1), mc.any(), mc.any(), mc.eq(Some(3))) verify(blockTransferService, times(2)) .uploadBlockSync(mc.eq("host2"), mc.eq(bmPort), mc.eq("exec2"), mc.any(), mc.any(), @@ -88,5 +92,7 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers { } finally { bmDecomManager.stop() } + + bmDecomManager.stop() } } diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh index 8a5208d49a70..cd973df257f0 100755 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh @@ -32,4 +32,4 @@ timeout 60 tail --pid=${WORKER_PID} -f /dev/null date echo "Done" date -sleep 30 +sleep 1 diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index ebf71e8cb83e..56943a385097 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -278,6 +278,7 @@ class KubernetesSuite extends SparkFunSuite appArgs = appArgs) val execPods = scala.collection.mutable.Map[String, Pod]() + val podsDeleted = scala.collection.mutable.HashSet[String]() val (patienceInterval, patienceTimeout) = { executorPatience match { case Some(patience) => (patience._1.getOrElse(INTERVAL), patience._2.getOrElse(TIMEOUT)) @@ -338,27 +339,21 @@ class KubernetesSuite extends SparkFunSuite } // Delete the pod to simulate cluster scale down/migration. // This will allow the pod to remain up for the grace period - val pod = kubernetesTestComponents.kubernetesClient.pods() - .withName(name) - pod.delete() + kubernetesTestComponents.kubernetesClient.pods() + .withName(name).delete() logDebug(s"Triggered pod decom/delete: $name deleted") - // Look for the string that indicates we should force kill the first - // Executor. This simulates the pod being fully lost. 
- logDebug("Waiting for second collect...") + // Make sure this pod is deleted Eventually.eventually(TIMEOUT, INTERVAL) { - assert(kubernetesTestComponents.kubernetesClient - .pods() - .withName(driverPodName) - .getLog - .contains("Waiting some more, please kill exec 1."), - "Decommission test did not complete second collect.") + assert(podsDeleted.contains(name)) + } + // Then make sure this pod is replaced + Eventually.eventually(TIMEOUT, INTERVAL) { + assert(execPods.size == 3) } - logDebug("Force deleting") - val podNoGrace = pod.withGracePeriod(0) - podNoGrace.delete() } case Action.DELETED | Action.ERROR => execPods.remove(name) + podsDeleted += name } } }) @@ -387,7 +382,6 @@ class KubernetesSuite extends SparkFunSuite Eventually.eventually(TIMEOUT, patienceInterval) { execPods.values.nonEmpty should be (true) } - execWatcher.close() execPods.values.foreach(executorPodChecker(_)) Eventually.eventually(patienceTimeout, patienceInterval) { expectedLogOnCompletion.foreach { e => @@ -399,6 +393,7 @@ class KubernetesSuite extends SparkFunSuite s"The application did not complete, did not find str ${e}") } } + execWatcher.close() } protected def doBasicDriverPodCheck(driverPod: Pod): Unit = { diff --git a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py index d34e61611461..5fcad083b007 100644 --- a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py +++ b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py @@ -47,11 +47,6 @@ def addToAcc(x): print("...") time.sleep(30) rdd.count() - print("Waiting some more, please kill exec 1.") - print("...") - time.sleep(30) - print("Executor node should be deleted now") - rdd.count() rdd.collect() print("Final accumulator value is: " + str(acc.value)) print("Finished waiting, stopping Spark.") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala index 58bd56c591d0..2d0cd178e914 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala @@ -23,7 +23,9 @@ import scala.util.Random import org.apache.spark.{ExecutorAllocationClient, SparkConf} import org.apache.spark.internal.Logging import org.apache.spark.internal.config.Streaming._ +import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED import org.apache.spark.resource.ResourceProfile +import org.apache.spark.scheduler.ExecutorDecommissionInfo import org.apache.spark.streaming.util.RecurringTimer import org.apache.spark.util.{Clock, Utils} @@ -133,7 +135,12 @@ private[streaming] class ExecutorAllocationManager( logDebug(s"Removable executors (${removableExecIds.size}): ${removableExecIds}") if (removableExecIds.nonEmpty) { val execIdToRemove = removableExecIds(Random.nextInt(removableExecIds.size)) - client.killExecutor(execIdToRemove) + if (conf.get(WORKER_DECOMMISSION_ENABLED)) { + client.decommissionExecutor(execIdToRemove, + ExecutorDecommissionInfo("spark scale down", false)) + } else { + client.killExecutor(execIdToRemove) + } logInfo(s"Requested to kill executor $execIdToRemove") } else { logInfo(s"No non-receiver executors to kill") diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala index 65efa10bfcf9..9cb1dba882b6 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala @@ -27,7 +27,9 @@ import org.scalatestplus.mockito.MockitoSugar import org.apache.spark.{ExecutorAllocationClient, SparkConf} import org.apache.spark.internal.config.{DYN_ALLOCATION_ENABLED, DYN_ALLOCATION_TESTING} import org.apache.spark.internal.config.Streaming._ +import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED import org.apache.spark.resource.ResourceProfile +import org.apache.spark.scheduler.ExecutorDecommissionInfo import org.apache.spark.streaming.{DummyInputDStream, Seconds, StreamingContext, TestSuiteBase} import org.apache.spark.util.{ManualClock, Utils} @@ -44,11 +46,22 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase } test("basic functionality") { + basicTest(decommissioning = false) + } + + test("basic decommissioning") { + basicTest(decommissioning = true) + } + + def basicTest(decommissioning: Boolean): Unit = { // Test that adding batch processing time info to allocation manager // causes executors to be requested and killed accordingly + conf.set(WORKER_DECOMMISSION_ENABLED, decommissioning) // There is 1 receiver, and exec 1 has been allocated to it - withAllocationManager(numReceivers = 1) { case (receiverTracker, allocationManager) => + withAllocationManager(numReceivers = 1, conf = conf) { + case (receiverTracker, allocationManager) => + when(receiverTracker.allocatedExecutors).thenReturn(Map(1 -> Some("1"))) /** Add data point for batch processing time and verify executor allocation */ @@ -83,12 +96,27 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase Map.empty)} } - /** Verify that a particular executor was killed */ + /** Verify that particular executors was killed */ def verifyKilledExec(expectedKilledExec: Option[String]): Unit = { if (expectedKilledExec.nonEmpty) { - verify(allocationClient, times(1)).killExecutor(meq(expectedKilledExec.get)) + if (decommissioning) { + val expectedWithInfo = expectedKilledExec.map{ exec => + (exec, ExecutorDecommissionInfo("spark scale down", false)) + } + verify(allocationClient, times(1)).decommissionExecutors( + meq(expectedWithInfo.toSeq), meq(false)) + verify(allocationClient, never).killExecutor(meq(expectedKilledExec.get)) + } else { + verify(allocationClient, times(1)).killExecutor(meq(expectedKilledExec.get)) + verify(allocationClient, never).decommissionExecutor(meq(expectedKilledExec.get)) + } } else { - verify(allocationClient, never).killExecutor(null) + if (decommissioning) { + verify(allocationClient, never).decommissionExecutors(null, false) + verify(allocationClient, never).decommissionExecutor(null) + } else { + verify(allocationClient, never).killExecutor(null) + } } }
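To summarize what the decommissioning branch of this streaming suite asserts: when WORKER_DECOMMISSION_ENABLED is set, a scale-down must go through the decommission path and never through killExecutor. A sketch of that expectation for a single executor "1" follows; the mock allocationClient, the meq/never/times matchers, and the "spark scale down" reason string are the ones already used in the suite, and this is a summary rather than additional test code:

```scala
// Sketch: expected interaction when decommissioning is enabled and executor "1" is scaled down.
val expected = Seq(("1", ExecutorDecommissionInfo("spark scale down", false)))
verify(allocationClient, times(1)).decommissionExecutors(meq(expected), meq(false))
verify(allocationClient, never).killExecutor(meq("1"))
```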