@@ -17,6 +17,7 @@

package org.apache.spark

import org.apache.spark.scheduler.ExecutorDecommissionInfo
/**
* A client that communicates with the cluster manager to request or kill executors.
* This is currently supported only in YARN mode.
@@ -81,6 +82,43 @@ private[spark] trait ExecutorAllocationClient {
countFailures: Boolean,
force: Boolean = false): Seq[String]

/**
* Request that the cluster manager decommission the specified executors.
* The default implementation delegates to kill; a scheduler must override this
* if it supports graceful decommissioning.
*
* @param executors identifiers of the executors along with their decommission info.
* @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
* after these executors have been decommissioned.
* @return the ids of the executors acknowledged by the cluster manager to be removed.
*/
def decommissionExecutors(
executors: Seq[(String, ExecutorDecommissionInfo)],
adjustTargetNumExecutors: Boolean): Seq[String] = {
killExecutors(executors.map(_._1),
adjustTargetNumExecutors,
countFailures = false)
}


/**
* Request that the cluster manager decommission the specified executor.
* The default implementation delegates to kill; a scheduler can override this
* if it supports graceful decommissioning.
*
* @param executorId identifier of the executor to decommission
* @param decommissionInfo information about the decommission (reason, host loss)
* @return whether the request is acknowledged by the cluster manager.
*/
def decommissionExecutor(executorId: String,
decommissionInfo: ExecutorDecommissionInfo): Boolean = {
val decommissionedExecutors = decommissionExecutors(
Seq((executorId, decommissionInfo)),
adjustTargetNumExecutors = true)

Reviewer:

Why is adjustTargetNumExecutors defaulting to true here? This would mean that all schedulers would try to replenish the executor when asked to DecommissionExecutor(...) -- for example by the Master, or when an executor gets a SIGPWR.

I think it shouldn't be the default -- it should at least be configurable. It only makes sense to have adjustTargetNumExecutors=true when called from org.apache.spark.streaming.scheduler.ExecutorAllocationManager#killExecutor (i.e. when it is truly called from the dynamic allocation codepath and we have decided that we want to replenish the executor).

Contributor Author:

If you look above, there is a configurable call. This matches how killExecutor is implemented down on line 124.

Reviewer:

Can you please point me to where the configurable call is? I don't see a config check in the code paths that call this method.

It's fine for killExecutor to unconditionally adjust the target number of executors because it is only called in the dynamic allocation codepath, but decommissionExecutor would be called from many other codepaths as well (for example when the driver gets a DecommissionExecutor message) -- and thus I think it shouldn't just assume that it should replenish the executor.

Contributor Author:

Look on line 95 of this file. I think we should match the semantics of killExecutor as much as possible. If there's a place where we don't want it, we can use decommissionExecutors.

Reviewer:

Hmm, should we rename decommissionExecutor (singular) to decommissionAndKillExecutor to reflect its purpose better? It would be too easy to confuse it with decommissionExecutors (on line 95 of this file, which allows not replenishing the target number of executors).

Do you want to make the change to the callers of decommissionExecutor in this PR and switch them to decommissionExecutors(Seq(executorId), false) instead (see the sketch below)? The ones I am most concerned about are:

  • The handling of message DecommissionExecutor (both sync and async variants) in CoarseGrainedSchedulerBackend
  • StandaloneSchedulerBackend.executorDecommissioned

In both the above cases, I think we may not always want replenishing. For example, in the standalone case, when the Worker gets a SIGPWR -- do we want to replenish the executors on the remaining workers (i.e. oversubscribe the remaining workers)? Similarly, when an executor gets a SIGPWR, do we want to put that load on the remaining executors? I think the answer to both should be NO unless we are doing dynamic allocation.

Personally, I am fine with any choice of naming here as long as the semantics are not silently changed under the covers, as is currently the case.
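
Roughly, the call-site change being suggested might look like this (the exact shape of the existing DecommissionExecutor handler in CoarseGrainedSchedulerBackend is assumed here; it is not shown in this diff):

  // Sketch only: handle the DecommissionExecutor message via the plural API with an
  // explicit adjustTargetNumExecutors, instead of relying on the default of the
  // singular decommissionExecutor.
  case DecommissionExecutor(executorId, decommissionInfo) =>
    logInfo(s"Asked to decommission executor $executorId")
    decommissionExecutors(Seq((executorId, decommissionInfo)),
      adjustTargetNumExecutors = false)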

Contributor Author:

It's a new function -- what are we changing?

Reviewer:

ExecutorAllocationClient is a base class of CoarseGrainedSchedulerBackend. We moved decommissionExecutor from the latter class to the former and as such it is not a new function. Since CoarseGrainedSchedulerBackend no longer overrides decommissionExecutor, ExecutorAllocationClient.decommissionExecutor will be called when CoarseGrainedSchedulerBackend gets a DecommissionExecutor message -- and the semantics of that codepath have been changed to unconditionally impose adjustTargetNumExecutors=true.

Contributor Author:

Cool, I'll update the previous calls to decommissionExecutor.

decommissionedExecutors.nonEmpty && decommissionedExecutors(0).equals(executorId)
}
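
As a usage sketch (not part of this diff; the client value, executor ids, and reason string below are purely illustrative), a caller holding an ExecutorAllocationClient could use the two methods above like so:

  // Illustrative only: `client` is assumed to be some ExecutorAllocationClient
  // implementation (e.g. a scheduler backend).
  val info = ExecutorDecommissionInfo("host going away", false)

  // Single executor; the default also adjusts the executor target down by one.
  val acked: Boolean = client.decommissionExecutor("1", info)

  // Batch of executors, leaving the target untouched (e.g. when the caller has
  // already lowered the target elsewhere, as dynamic allocation does).
  val removed: Seq[String] = client.decommissionExecutors(
    Seq(("2", info), ("3", info)),
    adjustTargetNumExecutors = false)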


/**
* Request that the cluster manager kill every executor on the specified host.
*
@@ -28,6 +28,7 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
import org.apache.spark.metrics.source.Source
import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID
import org.apache.spark.resource.ResourceProfileManager
@@ -204,7 +205,12 @@ private[spark] class ExecutorAllocationManager(
s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 0!")
}
if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) {
// If dynamic allocation shuffle tracking is enabled, or worker decommissioning together
// with storage shuffle-block decommissioning is enabled, we have *experimental* support
// for dynamic allocation without a shuffle service.
if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) ||
(conf.get(WORKER_DECOMMISSION_ENABLED) &&
conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) {
logWarning("Dynamic allocation without a shuffle service is an experimental feature.")
} else if (!testing) {
throw new SparkException("Dynamic allocation of executors requires the external " +
@@ -539,7 +545,9 @@ private[spark] class ExecutorAllocationManager(
// get the running total as we remove or initialize it to the count - pendingRemoval
val newExecutorTotal = numExecutorsTotalPerRpId.getOrElseUpdate(rpId,
(executorMonitor.executorCountWithResourceProfile(rpId) -
executorMonitor.pendingRemovalCountPerResourceProfileId(rpId)))
executorMonitor.pendingRemovalCountPerResourceProfileId(rpId) -
executorMonitor.pendingDecommissioningPerResourceProfileId(rpId)
))
if (newExecutorTotal - 1 < minNumExecutors) {
logDebug(s"Not removing idle executor $executorIdToBeRemoved because there " +
s"are only $newExecutorTotal executor(s) left (minimum number of executor limit " +
@@ -565,8 +573,14 @@
} else {
// We don't want to change our target number of executors, because we already did that
// when the task backlog decreased.
client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false,
countFailures = false, force = false)
if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map(
id => (id, ExecutorDecommissionInfo("spark scale down", false)))
client.decommissionExecutors(executorIdsWithoutHostLoss, adjustTargetNumExecutors = false)
} else {
client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false,
countFailures = false, force = false)
}
}

// [SPARK-21834] killExecutors api reduces the target number of executors.
@@ -578,7 +592,11 @@

// reset the newExecutorTotal to the existing number of executors
if (testing || executorsRemoved.nonEmpty) {
executorMonitor.executorsKilled(executorsRemoved.toSeq)
if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
executorMonitor.executorsDecommissioned(executorsRemoved)
} else {
executorMonitor.executorsKilled(executorsRemoved.toSeq)
}
logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to idle timeout.")
executorsRemoved.toSeq
} else {
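For reference, a hedged sketch of the configuration combination that the relaxed constructor check above now accepts (written against the internal ConfigEntry constants referenced in the diff; DYN_ALLOCATION_ENABLED and the private[spark] SparkConf.set overload are assumptions, so treat this as test-style illustration only):

  import org.apache.spark.SparkConf
  import org.apache.spark.internal.config
  import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED

  // Dynamic allocation without the external shuffle service, relying on worker
  // decommissioning plus shuffle-block migration instead.
  val conf = new SparkConf()
    .set(config.DYN_ALLOCATION_ENABLED, true)
    .set(config.SHUFFLE_SERVICE_ENABLED, false)
    .set(WORKER_DECOMMISSION_ENABLED, true)
    .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
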
@@ -165,8 +165,6 @@ private[deploy] object DeployMessages {

case object ReregisterWithMaster // used when a worker attempts to reconnect to a master

case object DecommissionSelf // Mark as decommissioned. May be Master to Worker in the future.

// AppClient to Master

case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
@@ -668,7 +668,7 @@ private[deploy] class Worker(
finishedApps += id
maybeCleanupApplication(id)

case DecommissionSelf =>
case WorkerDecommission(_, _) =>
decommissionSelf()
}

@@ -64,7 +64,6 @@ private[spark] class CoarseGrainedExecutorBackend(

private[this] val stopping = new AtomicBoolean(false)
var executor: Executor = null
@volatile private var decommissioned = false
@volatile var driver: Option[RpcEndpointRef] = None

// If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need
@@ -80,6 +79,9 @@
*/
private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]]

// Track our decommissioning status internally.
@volatile private var decommissioned = false

override def onStart(): Unit = {
logInfo("Registering PWR handler.")
SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
@@ -214,6 +216,10 @@ private[spark] class CoarseGrainedExecutorBackend(
case UpdateDelegationTokens(tokenBytes) =>
logInfo(s"Received tokens of ${tokenBytes.length} bytes")
SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)

case DecommissionSelf =>
logInfo("Received decommission self")
decommissionSelf()
}

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
@@ -277,12 +283,57 @@
if (executor != null) {
executor.decommission()
}
logInfo("Done decommissioning self.")
// Shut down the executor once all tasks are gone and any configured migrations have
// completed. Detecting migration completion doesn't need to be perfect, and we want to
// minimize the overhead for executors that are not decommissioning, since those are the
// majority. For example, this will not catch a block that is already in the process of
// being put from a remote executor before migration starts. This trade-off is viewed as
// acceptable to avoid introducing new locking structures on critical code paths.

val shutdownExec = ThreadUtils.newDaemonSingleThreadExecutor("wait for decommissioning")
val shutdownRunnable = new Runnable() {
override def run(): Unit = {
var lastTaskRunningTime = System.nanoTime()
val sleep_time = 1000 // 1s

while (true) {
logInfo("Checking to see if we can shutdown.")
if (executor == null || executor.numRunningTasks == 0) {
if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
logInfo("No running tasks, checking migrations")
val allBlocksMigrated = env.blockManager.lastMigrationInfo()
// We can only trust allBlocksMigrated boolean value if there were no tasks running
// since the start of computing it.
if (allBlocksMigrated._2 &&
(allBlocksMigrated._1 > lastTaskRunningTime)) {
logInfo("No running tasks, all blocks migrated, stopping.")
exitExecutor(0, "Finished decommissioning", notifyDriver = true)
} else {
logInfo("All blocks not yet migrated.")
}
} else {
logInfo("No running tasks, no block migration configured, stopping.")
exitExecutor(0, "Finished decommissioning", notifyDriver = true)
}
Thread.sleep(sleep_time)
} else {
logInfo("Blocked from shutdown by running task")
// If there is a running task it could store blocks, so make sure we wait for a
// migration loop to complete after the last task is done.
Thread.sleep(sleep_time)
lastTaskRunningTime = System.nanoTime()
}
}
}
}
shutdownExec.submit(shutdownRunnable)
logInfo("Will exit when finished decommissioning")
// Return true since we are handling a signal
true
} catch {
case e: Exception =>
logError(s"Error ${e} during attempt to decommission self")
logError("Unexpected error while decommissioning self", e)
false
}
}
@@ -136,4 +136,7 @@ private[spark] object CoarseGrainedClusterMessages {

// The message to check if `CoarseGrainedSchedulerBackend` thinks the executor is alive or not.
case class IsExecutorAlive(executorId: String) extends CoarseGrainedClusterMessage

// Used to ask an executor to decommission itself.
case object DecommissionSelf extends CoarseGrainedClusterMessage
}
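
As a hedged usage sketch (not part of this diff), the driver side could deliver the new message to a specific executor's RPC endpoint along these lines, assuming the executorDataMap bookkeeping that CoarseGrainedSchedulerBackend already keeps:

  // Illustration only: ask a single executor to decommission itself.
  // `executorDataMap` maps executor ids to ExecutorData holding the endpoint ref.
  executorDataMap.get(executorId).foreach { data =>
    data.executorEndpoint.send(DecommissionSelf)
  }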