[SPARK-16533][CORE] resolve deadlocking in driver when executors die #14710
Changes from all commits: cef69bf, 4970b3b, 920274a, 380291b, adb2969, 3eb34fd, 5a2f30f, 0772e81
Diff of `CoarseGrainedSchedulerBackend.scala`:

```diff
@@ -22,6 +22,8 @@ import java.util.concurrent.atomic.AtomicInteger
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
 
 import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
 import org.apache.spark.internal.Logging
@@ -49,6 +51,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
   protected val totalRegisteredExecutors = new AtomicInteger(0)
   protected val conf = scheduler.sc.conf
   private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
+  private val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
   // Submit tasks only after (registered resources / total expected resources)
   // is equal to at least this value, that is double between 0 and 1.
   private val _minRegisteredRatio =
@@ -272,6 +275,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
 
     // Remove a disconnected slave from the cluster
     private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
      logDebug(s"Asked to remove executor $executorId with reason $reason")
      executorDataMap.get(executorId) match {
        case Some(executorInfo) =>
          // This must be synchronized because variables mutated
@@ -446,19 +450,24 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
    * Request an additional number of executors from the cluster manager.
    * @return whether the request is acknowledged.
    */
-  final override def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
+  final override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
     if (numAdditionalExecutors < 0) {
       throw new IllegalArgumentException(
         "Attempted to request a negative number of additional executor(s) " +
           s"$numAdditionalExecutors from the cluster manager. Please specify a positive number!")
     }
     logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager")
-    logDebug(s"Number of pending executors is now $numPendingExecutors")
 
-    numPendingExecutors += numAdditionalExecutors
-    // Account for executors pending to be added or removed
-    val newTotal = numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
-    doRequestTotalExecutors(newTotal)
+    val response = synchronized {
+      numPendingExecutors += numAdditionalExecutors
+      logDebug(s"Number of pending executors is now $numPendingExecutors")
+
+      // Account for executors pending to be added or removed
+      doRequestTotalExecutors(
+        numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
+    }
+
+    defaultAskTimeout.awaitResult(response)
   }
 
   /**
@@ -479,19 +488,24 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
       numExecutors: Int,
       localityAwareTasks: Int,
       hostToLocalTaskCount: Map[String, Int]
-    ): Boolean = synchronized {
+    ): Boolean = {
     if (numExecutors < 0) {
       throw new IllegalArgumentException(
         "Attempted to request a negative number of executor(s) " +
           s"$numExecutors from the cluster manager. Please specify a positive number!")
     }
 
-    this.localityAwareTasks = localityAwareTasks
-    this.hostToLocalTaskCount = hostToLocalTaskCount
+    val response = synchronized {
+      this.localityAwareTasks = localityAwareTasks
+      this.hostToLocalTaskCount = hostToLocalTaskCount
 
-    numPendingExecutors =
-      math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size, 0)
-    doRequestTotalExecutors(numExecutors)
+      numPendingExecutors =
+        math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size, 0)
+
+      doRequestTotalExecutors(numExecutors)
+    }
+
+    defaultAskTimeout.awaitResult(response)
   }
 
   /**
@@ -504,16 +518,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
    * insufficient resources to satisfy the first request. We make the assumption here that the
    * cluster manager will eventually fulfill all requests when resources free up.
    *
-   * @return whether the request is acknowledged.
+   * @return a future whose evaluation indicates whether the request is acknowledged.
    */
-  protected def doRequestTotalExecutors(requestedTotal: Int): Boolean = false
+  protected def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] =
+    Future.successful(false)
 
   /**
    * Request that the cluster manager kill the specified executors.
    * @return whether the kill request is acknowledged. If list to kill is empty, it will return
    *         false.
    */
-  final override def killExecutors(executorIds: Seq[String]): Boolean = synchronized {
+  final override def killExecutors(executorIds: Seq[String]): Boolean = {
     killExecutors(executorIds, replace = false, force = false)
   }
@@ -533,39 +548,53 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
   final def killExecutors(
       executorIds: Seq[String],
       replace: Boolean,
-      force: Boolean): Boolean = synchronized {
+      force: Boolean): Boolean = {
     logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
-    val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
-    unknownExecutors.foreach { id =>
-      logWarning(s"Executor to kill $id does not exist!")
-    }
 
-    // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
-    // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
-    val executorsToKill = knownExecutors
-      .filter { id => !executorsPendingToRemove.contains(id) }
-      .filter { id => force || !scheduler.isExecutorBusy(id) }
-    executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
-
-    // If we do not wish to replace the executors we kill, sync the target number of executors
-    // with the cluster manager to avoid allocating new ones. When computing the new target,
-    // take into account executors that are pending to be added or removed.
-    if (!replace) {
-      doRequestTotalExecutors(
-        numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
-    } else {
-      numPendingExecutors += knownExecutors.size
+    val response = synchronized {
+      val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
+      unknownExecutors.foreach { id =>
+        logWarning(s"Executor to kill $id does not exist!")
+      }
+
+      // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
+      // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
+      val executorsToKill = knownExecutors
+        .filter { id => !executorsPendingToRemove.contains(id) }
+        .filter { id => force || !scheduler.isExecutorBusy(id) }
+      executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
+
+      // If we do not wish to replace the executors we kill, sync the target number of executors
+      // with the cluster manager to avoid allocating new ones. When computing the new target,
+      // take into account executors that are pending to be added or removed.
+      val adjustTotalExecutors =
+        if (!replace) {
+          doRequestTotalExecutors(
+            numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
+        } else {
+          numPendingExecutors += knownExecutors.size
+          Future.successful(true)
+        }
+
+      val killExecutors: Boolean => Future[Boolean] =
+        if (!executorsToKill.isEmpty) {
+          _ => doKillExecutors(executorsToKill)
+        } else {
+          _ => Future.successful(false)
        }
+
+      adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
```
Review thread on `adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)`:

Contributor: Please correct me if I'm wrong, as I'm not that familiar with the future flatMap, but isn't this going to run `doRequestTotalExecutors`, then once that comes back, apply the result to `killExecutors`? Which I think means `killExecutors` is called outside of the synchronized block, after we do the `awaitResult` for the `doRequestTotalExecutors`?

Contributor: I'm pretty sure you're correct, but at the same time I don't think there's a requirement that the kill request be sent while holding the lock.

Contributor (author): When I originally started working on this I thought I wouldn't be able to avoid blocking on that call within the synchronized block. However, my (admittedly novice) understanding of the code aligns with what @vanzin said: because all it does is send the kill message, there's no need to synchronize over it.

Contributor: Thanks, I was mostly just trying to make sure I understood correctly. I'm not worried about the RPC call being outside of the synchronized block; as you say, it's best if it is done outside, since it's safe to call from multiple threads. It was more to make sure other data structures weren't modified outside the synchronized block. In this case all it accesses is the local `executorsToKill`, so it doesn't matter.
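For readers less familiar with `Future.flatMap`, here is a minimal, self-contained sketch of the behaviour discussed in this thread, using plain `scala.concurrent` with a trivial same-thread executor standing in for `ThreadUtils.sameThread`; the object and value names are illustrative, not taken from the patch:

```scala
import java.util.concurrent.Executor

import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration.Duration

object FlatMapOutsideLockSketch {
  // Stand-in for ThreadUtils.sameThread: run each callback on whichever thread completes the future.
  private val sameThread: ExecutionContext =
    ExecutionContext.fromExecutor(new Executor { def execute(r: Runnable): Unit = r.run() })

  private val lock = new Object

  def main(args: Array[String]): Unit = {
    // Stands in for the future returned by doRequestTotalExecutors.
    val adjustTotalExecutors = Promise[Boolean]()

    // Mirrors killExecutors: the continuation is *registered* while holding the lock...
    val response: Future[Boolean] = lock.synchronized {
      adjustTotalExecutors.future.flatMap { acked =>
        // ...but it only runs once the first future completes, which here happens
        // after the synchronized block has already been exited.
        println(s"kill step holds lock: ${Thread.holdsLock(lock)}") // prints: false
        Future.successful(acked)
      }(sameThread)
    }

    // Complete the "adjust total" step outside the lock, then block for the final result,
    // analogous to defaultAskTimeout.awaitResult(response) in the patch.
    adjustTotalExecutors.success(true)
    println(Await.result(response, Duration.Inf)) // prints: true
  }
}
```

In the patch itself the continuation only closes over the local `executorsToKill`, so running it after the lock has been released does not touch any state that the `synchronized` block protects, which is the point being made above.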
The hunk continues:

```diff
+    }
 
-    !executorsToKill.isEmpty && doKillExecutors(executorsToKill)
+    defaultAskTimeout.awaitResult(response)
   }
 
   /**
    * Kill the given list of executors through the cluster manager.
    * @return whether the kill request is acknowledged.
    */
-  protected def doKillExecutors(executorIds: Seq[String]): Boolean = false
+  protected def doKillExecutors(executorIds: Seq[String]): Future[Boolean] =
+    Future.successful(false)
 }
 
 private[spark] object CoarseGrainedSchedulerBackend {
```
Review thread on the switch from `askWithRetry` to `ask`:

Contributor: I'll look at this more tomorrow, but what happens if the ask does fail and we have now incremented `numPendingExecutors`? That issue was there before, but now that we are doing `ask` instead of `askWithRetry` it might show up more often.

Contributor: This is a longer discussion (and something I'd like to address thoroughly at some point when I find time), but `askWithRetry` is actually pretty useless with the new RPC implementation, and I'd say even harmful. An `ask` with a larger timeout has a much better chance of succeeding, and is cheaper than `askWithRetry`. So I don't think that the change makes the particular situation you point out more common at all.

Contributor: I guess I'll have to go look at the new implementation; can you clarify why `ask` would be better?

Contributor: Note I would still like to know what happens if it occurs, as it could have just been a bug before. If it's harmless then I'm OK with it.

Contributor: In this particular case, it's not that `ask` would be better, it's just that it would be no worse. With the new RPC code, the only time `askWithRetry` will actually retry, barring bugs in the RPC handlers, is when a timeout occurs, since the RPC layer does not drop messages. So an `ask` with a longer timeout actually has a better chance of succeeding, since with `askWithRetry` the remote end will receive and process the first message before the retries, even if the sender has given up on it. As for the bug you mention, yes, it exists, but it also existed before.