diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index 63d87b4cd385c..52710b0bf7201 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -59,15 +59,18 @@ private[spark] trait ExecutorAllocationClient {
    * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
    *                                 after these executors have been killed
    * @param countFailures if there are tasks running on the executors when they are killed, whether
-   *                      to count those failures toward task failure limits
+   *                      to count those failures toward task failure limits
    * @param force whether to force kill busy executors, default false
+   * @param blacklistingOnTaskCompletion whether the executors are being killed due to
+   *                                     blacklisting triggered by the task completion event
    * @return the ids of the executors acknowledged by the cluster manager to be removed.
    */
   def killExecutors(
     executorIds: Seq[String],
     adjustTargetNumExecutors: Boolean,
     countFailures: Boolean,
-    force: Boolean = false): Seq[String]
+    force: Boolean = false,
+    blacklistingOnTaskCompletion: Boolean = false): Seq[String]
 
   /**
    * Request that the cluster manager kill every executor on the specified host.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index 9e524c52267be..27225313bf043 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -146,12 +146,15 @@ private[scheduler] class BlacklistTracker (
     nextExpiryTime = math.min(execMinExpiry, nodeMinExpiry)
   }
 
-  private def killExecutor(exec: String, msg: String): Unit = {
+  private def killExecutor(
+      exec: String,
+      msg: String,
+      blacklistingOnTaskCompletion: Boolean = false): Unit = {
     allocationClient match {
       case Some(a) =>
         logInfo(msg)
         a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false,
-          force = true)
+          force = true, blacklistingOnTaskCompletion = blacklistingOnTaskCompletion)
       case None =>
         logInfo(s"Not attempting to kill blacklisted executor id $exec " +
           s"since allocation client is not defined.")
@@ -161,7 +164,8 @@ private[scheduler] class BlacklistTracker (
   private def killBlacklistedExecutor(exec: String): Unit = {
     if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
       killExecutor(exec,
-        s"Killing blacklisted executor id $exec since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
+        s"Killing blacklisted executor id $exec since ${config.BLACKLIST_KILL_ENABLED.key}" +
+          s" is set.", blacklistingOnTaskCompletion = true)
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index dc0f21c5e91d1..898413c8b1841 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -111,6 +111,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   private val reviveThread =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
 
+  // SPARK-27112: This lock is explicitly added here to keep the changes introduced by SPARK-19757
+  // and, at the same time, revert the code which held the lock to CoarseGrainedSchedulerBackend,
+  // so as to fix the deadlock issue exposed in SPARK-27112.
+  private val makeOffersLock: Object = new Object
+
   class DriverEndpoint extends ThreadSafeRpcEndpoint with Logging {
 
     override val rpcEnv: RpcEnv = CoarseGrainedSchedulerBackend.this.rpcEnv
@@ -257,15 +262,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
     // Make fake resource offers on all executors
     private def makeOffers() {
+      var workOffers: IndexedSeq[WorkerOffer] = null
       // Make sure no executor is killed while some task is launching on it
-      val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
-        // Filter out executors under killing
-        val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
-        val workOffers = activeExecutors.map {
-          case (id, executorData) =>
-            new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
-              Some(executorData.executorAddress.hostPort))
-        }.toIndexedSeq
+      val taskDescs = makeOffersLock.synchronized {
+        CoarseGrainedSchedulerBackend.this.synchronized {
+          // Filter out executors under killing
+          val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
+          workOffers = activeExecutors.map {
+            case (id, executorData) =>
+              new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
+                Some(executorData.executorAddress.hostPort))
+          }.toIndexedSeq
+        }
         scheduler.resourceOffers(workOffers)
       }
       if (!taskDescs.isEmpty) {
@@ -283,14 +291,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
     // Make fake resource offers on just one executor
     private def makeOffers(executorId: String) {
+      var workOffers: IndexedSeq[WorkerOffer] = null
       // Make sure no executor is killed while some task is launching on it
-      val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
+      val taskDescs = makeOffersLock.synchronized {
         // Filter out executors under killing
         if (executorIsAlive(executorId)) {
-          val executorData = executorDataMap(executorId)
-          val workOffers = IndexedSeq(
-            new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores,
-              Some(executorData.executorAddress.hostPort)))
+          CoarseGrainedSchedulerBackend.this.synchronized {
+            val executorData = executorDataMap(executorId)
+            workOffers = IndexedSeq(
+              new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores,
+                Some(executorData.executorAddress.hostPort)))
+          }
           scheduler.resourceOffers(workOffers)
         } else {
           Seq.empty
@@ -622,67 +633,106 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * @param countFailures if there are tasks running on the executors when they are killed, whether
    *                      those failures be counted to task failure limits?
    * @param force whether to force kill busy executors, default false
+   * @param blacklistingOnTaskCompletion whether the executors are being killed due to
+   *                                     blacklisting triggered by the task completion event
    * @return the ids of the executors acknowledged by the cluster manager to be removed.
    */
   final override def killExecutors(
       executorIds: Seq[String],
       adjustTargetNumExecutors: Boolean,
       countFailures: Boolean,
-      force: Boolean): Seq[String] = {
+      force: Boolean,
+      blacklistingOnTaskCompletion: Boolean): Seq[String] = {
     logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
 
-    val response = synchronized {
-      val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
-      unknownExecutors.foreach { id =>
-        logWarning(s"Executor to kill $id does not exist!")
-      }
-
-      // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
-      // If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
-      val executorsToKill = knownExecutors
-        .filter { id => !executorsPendingToRemove.contains(id) }
-        .filter { id => force || !scheduler.isExecutorBusy(id) }
-      executorsToKill.foreach { id => executorsPendingToRemove(id) = !countFailures }
-
-      logInfo(s"Actual list of executor(s) to be killed is ${executorsToKill.mkString(", ")}")
-
-      // If we do not wish to replace the executors we kill, sync the target number of executors
-      // with the cluster manager to avoid allocating new ones. When computing the new target,
-      // take into account executors that are pending to be added or removed.
-      val adjustTotalExecutors =
-        if (adjustTargetNumExecutors) {
-          requestedTotalExecutors = math.max(requestedTotalExecutors - executorsToKill.size, 0)
-          if (requestedTotalExecutors !=
-              (numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)) {
-            logDebug(
-              s"""killExecutors($executorIds, $adjustTargetNumExecutors, $countFailures, $force):
-                 |Executor counts do not match:
-                 |requestedTotalExecutors = $requestedTotalExecutors
-                 |numExistingExecutors = $numExistingExecutors
-                 |numPendingExecutors = $numPendingExecutors
-                 |executorsPendingToRemove = ${executorsPendingToRemove.size}""".stripMargin)
-          }
-          doRequestTotalExecutors(requestedTotalExecutors)
-        } else {
-          numPendingExecutors += executorsToKill.size
-          Future.successful(true)
+    var response: Future[Seq[String]] = null
+    val idleExecutorIds = executorIds.filter { id => force || !scheduler.isExecutorBusy(id) }
+    if (!blacklistingOnTaskCompletion) {
+      /**
+       * The flag blacklistingOnTaskCompletion ensures that this code path is not followed by
+       * the task-result-getter thread, so as to avoid the deadlock scenario between
+       * the task-result-getter thread and the dispatcher-event-loop thread.
+       */
+      makeOffersLock.synchronized {
+        response = synchronized {
+          killExecutorsImpl(idleExecutorIds, adjustTargetNumExecutors, countFailures, force)
         }
+      }
+    } else {
+      response = synchronized {
+        killExecutorsImpl(idleExecutorIds, adjustTargetNumExecutors, countFailures, force)
+      }
+    }
 
-      val killExecutors: Boolean => Future[Boolean] =
-        if (!executorsToKill.isEmpty) {
-          _ => doKillExecutors(executorsToKill)
-        } else {
-          _ => Future.successful(false)
-        }
+    defaultAskTimeout.awaitResult(response)
+  }
 
-      val killResponse = adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
+  /**
+   * Request that the cluster manager kill the specified executors.
+   *
+   * @param executorIds identifiers of executors to kill
+   * @param adjustTargetNumExecutors whether the target number of executors be adjusted down
+   *                                 after these executors have been killed
+   * @param countFailures if there are tasks running on the executors when they are killed, whether
+   *                      those failures be counted to task failure limits?
+   * @param force whether to force kill busy executors, default false
+   * @return a Future that will resolve to the ids of the executors acknowledged by the cluster
+   *         manager to be removed.
+   */
+  private def killExecutorsImpl(
+      executorIds: Seq[String],
+      adjustTargetNumExecutors: Boolean,
+      countFailures: Boolean,
+      force: Boolean): Future[Seq[String]] = {
+
+    val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
+    unknownExecutors.foreach { id =>
+      logWarning(s"Executor to kill $id does not exist!")
+    }
 
-      killResponse.flatMap(killSuccessful =>
-        Future.successful (if (killSuccessful) executorsToKill else Seq.empty[String])
-      )(ThreadUtils.sameThread)
+    // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
+    // If this executor is busy, do not kill it unless we are told to
+    // force kill it (SPARK-9552)
+    val executorsToKill = knownExecutors
+      .filter { id => !executorsPendingToRemove.contains(id) }
+    executorsToKill.foreach { id => executorsPendingToRemove(id) = !countFailures }
+
+    logInfo(s"Actual list of executor(s) to be killed is ${executorsToKill.mkString(", ")}")
+
+    // If we do not wish to replace the executors we kill, sync the target number of executors
+    // with the cluster manager to avoid allocating new ones. When computing the new target,
+    // take into account executors that are pending to be added or removed.
+    val adjustTotalExecutors =
+      if (adjustTargetNumExecutors) {
+        requestedTotalExecutors = math.max(requestedTotalExecutors - executorsToKill.size, 0)
+        if (requestedTotalExecutors !=
+            (numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)) {
+          logDebug(
+            s"""killExecutors($executorIds, $adjustTargetNumExecutors, $countFailures, $force):
+               |Executor counts do not match:
+               |requestedTotalExecutors = $requestedTotalExecutors
+               |numExistingExecutors = $numExistingExecutors
+               |numPendingExecutors = $numPendingExecutors
+               |executorsPendingToRemove = ${executorsPendingToRemove.size}""".stripMargin)
+        }
+        doRequestTotalExecutors(requestedTotalExecutors)
+      } else {
+        numPendingExecutors += executorsToKill.size
+        Future.successful(true)
       }
 
-    defaultAskTimeout.awaitResult(response)
+    val killExecutors: Boolean => Future[Boolean] =
+      if (!executorsToKill.isEmpty) {
+        _ => doKillExecutors(executorsToKill)
+      } else {
+        _ => Future.successful(false)
+      }
+
+    val killResponse = adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
+
+    killResponse.flatMap(killSuccessful =>
+      Future.successful(if (killSuccessful) executorsToKill else Seq.empty[String])
+    )(ThreadUtils.sameThread)
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 5500329b13a7c..9432f3f25b308 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -1128,19 +1128,19 @@ class ExecutorAllocationManagerSuite
     clock.advance(1000)
     manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.getTimeMillis())
     assert(numExecutorsTarget(manager) === 1)
-    verify(mockAllocationClient, never).killExecutors(any(), any(), any(), any())
+    verify(mockAllocationClient, never).killExecutors(any(), any(), any(), any(), any())
 
     // now we cross the idle timeout for executor-1, so we kill it.  the really important
     // thing here is that we do *not* ask the executor allocation client to adjust the target
     // number of executors down
-    when(mockAllocationClient.killExecutors(Seq("executor-1"), false, false, false))
+    when(mockAllocationClient.killExecutors(Seq("executor-1"), false, false, false, false))
       .thenReturn(Seq("executor-1"))
     clock.advance(3000)
     schedule(manager)
     assert(maxNumExecutorsNeeded(manager) === 1)
     assert(numExecutorsTarget(manager) === 1)
     // here's the important verify -- we did kill the executors, but did not adjust the target count
-    verify(mockAllocationClient).killExecutors(Seq("executor-1"), false, false, false)
+    verify(mockAllocationClient).killExecutors(Seq("executor-1"), false, false, false, false)
   }
 
   test("SPARK-26758 check executor target number after idle time out ") {
@@ -1382,7 +1382,8 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend
     executorIds: Seq[String],
     adjustTargetNumExecutors: Boolean,
     countFailures: Boolean,
-    force: Boolean): Seq[String] = executorIds
+    force: Boolean,
+    blacklistingOnTaskCompletion: Boolean): Seq[String] = executorIds
 
   override def start(): Unit = sb.start()
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
index 0adfb077964e6..35174d6e7c46b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
@@ -479,7 +479,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
 
   test("blacklisting kills executors, configured by BLACKLIST_KILL_ENABLED") {
     val allocationClientMock = mock[ExecutorAllocationClient]
-    when(allocationClientMock.killExecutors(any(), any(), any(), any())).thenReturn(Seq("called"))
+    when(allocationClientMock.killExecutors(
+      any(), any(), any(), any(), any())).thenReturn(Seq("called"))
     when(allocationClientMock.killExecutorsOnHost("hostA")).thenAnswer(new Answer[Boolean] {
       // To avoid a race between blacklisting and killing, it is important that the nodeBlacklist
       // is updated before we ask the executor allocation client to kill all the executors
@@ -517,7 +518,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     }
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist1.execToFailures)
 
-    verify(allocationClientMock, never).killExecutors(any(), any(), any(), any())
+    verify(allocationClientMock, never).killExecutors(any(), any(), any(), any(), any())
     verify(allocationClientMock, never).killExecutorsOnHost(any())
 
     // Enable auto-kill. Blacklist an executor and make sure killExecutors is called.
@@ -533,7 +534,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     }
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist2.execToFailures)
 
-    verify(allocationClientMock).killExecutors(Seq("1"), false, false, true)
+    verify(allocationClientMock).killExecutors(Seq("1"), false, false, true, true)
 
     val taskSetBlacklist3 = createTaskSetBlacklist(stageId = 1)
     // Fail 4 tasks in one task set on executor 2, so that executor gets blacklisted for the whole
@@ -545,13 +546,14 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     }
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist3.execToFailures)
 
-    verify(allocationClientMock).killExecutors(Seq("2"), false, false, true)
+    verify(allocationClientMock).killExecutors(Seq("2"), false, false, true, true)
     verify(allocationClientMock).killExecutorsOnHost("hostA")
   }
 
   test("fetch failure blacklisting kills executors, configured by BLACKLIST_KILL_ENABLED") {
     val allocationClientMock = mock[ExecutorAllocationClient]
-    when(allocationClientMock.killExecutors(any(), any(), any(), any())).thenReturn(Seq("called"))
+    when(allocationClientMock.killExecutors(
+      any(), any(), any(), any(), any())).thenReturn(Seq("called"))
     when(allocationClientMock.killExecutorsOnHost("hostA")).thenAnswer(new Answer[Boolean] {
       // To avoid a race between blacklisting and killing, it is important that the nodeBlacklist
      // is updated before we ask the executor allocation client to kill all the executors
@@ -571,7 +573,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     conf.set(config.BLACKLIST_KILL_ENABLED, false)
     blacklist.updateBlacklistForFetchFailure("hostA", exec = "1")
 
-    verify(allocationClientMock, never).killExecutors(any(), any(), any(), any())
+    verify(allocationClientMock, never).killExecutors(any(), any(), any(), any(), any())
     verify(allocationClientMock, never).killExecutorsOnHost(any())
 
     assert(blacklist.nodeToBlacklistedExecs.contains("hostA"))
@@ -583,7 +585,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     clock.advance(1000)
     blacklist.updateBlacklistForFetchFailure("hostA", exec = "1")
 
-    verify(allocationClientMock).killExecutors(Seq("1"), false, false, true)
+    verify(allocationClientMock).killExecutors(Seq("1"), false, false, true, true)
     verify(allocationClientMock, never).killExecutorsOnHost(any())
 
     assert(blacklist.executorIdToBlacklistStatus.contains("1"))
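
For reviewers skimming the diff, the essence of the change is a lock-ordering discipline: makeOffers and the ordinary killExecutors path both acquire the new outer makeOffersLock before the backend's own monitor, while the blacklisting-on-task-completion path (driven by the task-result-getter thread) takes only the inner lock. The standalone Scala sketch below is illustrative only, not Spark code: the object name, backendLock, and the println bodies are stand-ins, and it simply mirrors the ordering pattern visible in the hunks above under those assumptions.

// Minimal, self-contained sketch of the locking discipline introduced by this patch.
// `backendLock` stands in for the CoarseGrainedSchedulerBackend monitor; everything else
// here is a simplified illustration, not the real scheduler code.
object LockOrderingSketch {
  private val makeOffersLock = new Object // outer lock, as introduced by the patch
  private val backendLock = new Object    // stand-in for the backend's own monitor

  // Mirrors makeOffers(): snapshot executor state under both locks, then hand the
  // offers to the scheduler while holding only the outer lock.
  def makeOffers(): Unit = makeOffersLock.synchronized {
    val snapshot = backendLock.synchronized { Seq("executor-1", "executor-2") }
    println(s"offering resources on ${snapshot.mkString(", ")}")
  }

  // Mirrors killExecutors() on the ordinary path: outer lock first, then the backend lock.
  def killExecutors(ids: Seq[String]): Unit = makeOffersLock.synchronized {
    backendLock.synchronized {
      println(s"marking ${ids.mkString(", ")} as pending to remove")
    }
  }

  // Mirrors the blacklistingOnTaskCompletion path: the task-result-getter thread skips the
  // outer lock entirely, so it never waits on the lock that the dispatcher thread holds
  // across its call into the scheduler.
  def killExecutorsOnTaskCompletion(ids: Seq[String]): Unit = backendLock.synchronized {
    println(s"marking ${ids.mkString(", ")} as pending to remove (no outer lock)")
  }

  def main(args: Array[String]): Unit = {
    makeOffers()
    killExecutors(Seq("executor-1"))
    killExecutorsOnTaskCompletion(Seq("executor-2"))
  }
}

Because every path in the sketch that needs both locks acquires them in the same order, and the task-completion path never touches the outer lock, the circular wait described in SPARK-27112 cannot form in this simplified model.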