ExecutorAllocationClient.scala
@@ -59,15 +59,18 @@ private[spark] trait ExecutorAllocationClient {
* @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
* after these executors have been killed
* @param countFailures if there are tasks running on the executors when they are killed, whether
* to count those failures toward task failure limits
* to count those failures toward task failure limits
* @param force whether to force kill busy executors, default false
* @param blacklistingOnTaskCompletion whether the executors are being killed due to
* blacklisting triggered by the task completion event
* @return the ids of the executors acknowledged by the cluster manager to be removed.
*/
def killExecutors(
executorIds: Seq[String],
adjustTargetNumExecutors: Boolean,
countFailures: Boolean,
force: Boolean = false): Seq[String]
force: Boolean = false,
blacklistingOnTaskCompletion: Boolean = false): Seq[String]

/**
* Request that the cluster manager kill every executor on the specified host.
BlacklistTracker.scala
@@ -146,12 +146,15 @@ private[scheduler] class BlacklistTracker (
nextExpiryTime = math.min(execMinExpiry, nodeMinExpiry)
}

private def killExecutor(exec: String, msg: String): Unit = {
private def killExecutor(
exec: String,
msg: String,
blacklistingOnTaskCompletion: Boolean = false): Unit = {
allocationClient match {
case Some(a) =>
logInfo(msg)
a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false,
force = true)
force = true, blacklistingOnTaskCompletion = blacklistingOnTaskCompletion)
case None =>
logInfo(s"Not attempting to kill blacklisted executor id $exec " +
s"since allocation client is not defined.")
@@ -161,7 +164,8 @@ private[scheduler] class BlacklistTracker (
private def killBlacklistedExecutor(exec: String): Unit = {
if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
killExecutor(exec,
s"Killing blacklisted executor id $exec since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
s"Killing blacklisted executor id $exec since ${config.BLACKLIST_KILL_ENABLED.key}" +
s" is set.", blacklistingOnTaskCompletion = true)
}
}

CoarseGrainedSchedulerBackend.scala
@@ -111,6 +111,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
private val reviveThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")

// SPARK-27112: This lock is added to keep the protection introduced by SPARK-19757 while no
// longer holding the CoarseGrainedSchedulerBackend lock across the resource offers, so as to
// fix the deadlock issue exposed in SPARK-27112
private val makeOffersLock: Object = new Object

class DriverEndpoint extends ThreadSafeRpcEndpoint with Logging {

override val rpcEnv: RpcEnv = CoarseGrainedSchedulerBackend.this.rpcEnv
@@ -257,15 +262,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

// Make fake resource offers on all executors
private def makeOffers() {
var workOffers: IndexedSeq[WorkerOffer] = null
// Make sure no executor is killed while some task is launching on it
val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
// Filter out executors under killing
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
val workOffers = activeExecutors.map {
case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
Some(executorData.executorAddress.hostPort))
}.toIndexedSeq
val taskDescs = makeOffersLock.synchronized {
CoarseGrainedSchedulerBackend.this.synchronized {
// Filter out executors under killing
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
workOffers = activeExecutors.map {
case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
Some(executorData.executorAddress.hostPort))
}.toIndexedSeq
}
scheduler.resourceOffers(workOffers)
}
if (!taskDescs.isEmpty) {
@@ -283,14 +291,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

// Make fake resource offers on just one executor
private def makeOffers(executorId: String) {
var workOffers: IndexedSeq[WorkerOffer] = null
// Make sure no executor is killed while some task is launching on it
val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
val taskDescs = makeOffersLock.synchronized {
// Filter out executors under killing
if (executorIsAlive(executorId)) {
val executorData = executorDataMap(executorId)
val workOffers = IndexedSeq(
new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores,
Some(executorData.executorAddress.hostPort)))
CoarseGrainedSchedulerBackend.this.synchronized {
val executorData = executorDataMap(executorId)
workOffers = IndexedSeq(
new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores,
Some(executorData.executorAddress.hostPort)))
}
scheduler.resourceOffers(workOffers)
} else {
Seq.empty
@@ -622,67 +633,106 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* @param countFailures if there are tasks running on the executors when they are killed, whether
* those failures be counted to task failure limits?
* @param force whether to force kill busy executors, default false
* @param blacklistingOnTaskCompletion whether the executors are being killed due to
* blacklisting triggered by the task completion event
* @return the ids of the executors acknowledged by the cluster manager to be removed.
*/
final override def killExecutors(
executorIds: Seq[String],
adjustTargetNumExecutors: Boolean,
countFailures: Boolean,
force: Boolean): Seq[String] = {
force: Boolean,
blacklistingOnTaskCompletion: Boolean): Seq[String] = {
logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")

val response = synchronized {
val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
unknownExecutors.foreach { id =>
logWarning(s"Executor to kill $id does not exist!")
}

// If an executor is already pending to be removed, do not kill it again (SPARK-9795)
// If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
val executorsToKill = knownExecutors
.filter { id => !executorsPendingToRemove.contains(id) }
.filter { id => force || !scheduler.isExecutorBusy(id) }
executorsToKill.foreach { id => executorsPendingToRemove(id) = !countFailures }

logInfo(s"Actual list of executor(s) to be killed is ${executorsToKill.mkString(", ")}")

// If we do not wish to replace the executors we kill, sync the target number of executors
// with the cluster manager to avoid allocating new ones. When computing the new target,
// take into account executors that are pending to be added or removed.
val adjustTotalExecutors =
if (adjustTargetNumExecutors) {
requestedTotalExecutors = math.max(requestedTotalExecutors - executorsToKill.size, 0)
if (requestedTotalExecutors !=
(numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)) {
logDebug(
s"""killExecutors($executorIds, $adjustTargetNumExecutors, $countFailures, $force):
|Executor counts do not match:
|requestedTotalExecutors = $requestedTotalExecutors
|numExistingExecutors = $numExistingExecutors
|numPendingExecutors = $numPendingExecutors
|executorsPendingToRemove = ${executorsPendingToRemove.size}""".stripMargin)
}
doRequestTotalExecutors(requestedTotalExecutors)
} else {
numPendingExecutors += executorsToKill.size
Future.successful(true)
var response: Future[Seq[String]] = null
val idleExecutorIds = executorIds.filter { id => force || !scheduler.isExecutorBusy(id) }
if (!blacklistingOnTaskCompletion) {

Review thread:

Contributor:
What would be the performance cost of always using makeOffersLock (and dropping the flag blacklistingOnTaskCompletion)? This code is already quite complex, and with the boolean-flag-dependent locking I think it will be even harder to follow.

Author:
I agree that the fix is a little tricky here; however, in my testing I have not seen any degradation in job running time from adding the extra lock.

Contributor:
Sorry, there must be some misunderstanding here.
My suggestion is to remove this if condition completely and always use:

makeOffersLock.synchronized {
  response = synchronized {
    killExecutorsImpl(idleExecutorIds, adjustTargetNumExecutors, countFailures, force)
  }
}

And since you got rid of the if, you can remove blacklistingOnTaskCompletion from the method's arguments as well.

As the locking order always starts with makeOffersLock, I think this should be enough to avoid the deadlock.

Author:
The flag blacklistingOnTaskCompletion is needed to ensure that the "task-result-getter-x" thread does not try to acquire makeOffersLock, which is a necessary condition to avoid the deadlock between the "task-result-getter" thread and the "dispatcher-event-loop" thread.

The reason is that when the "task-result-getter" thread reaches the method killExecutors(), it has already acquired the TaskSchedulerImpl lock and would then try to acquire makeOffersLock. The "dispatcher-event-loop" thread, on the other hand, acquires makeOffersLock and then waits to acquire the TaskSchedulerImpl lock in the method resourceOffers(), thus leading to the deadlock.

Contributor:
Ok I see. I checked the first deadlock and I think the problem is in org.apache.spark.scheduler.TaskSchedulerImpl#isExecutorBusy:

def isExecutorBusy(execId: String): Boolean = synchronized {
executorIdToRunningTaskIds.get(execId).exists(_.nonEmpty)
}

That synchronised is too restrictive here for reading a snapshot state of the executorIdToRunningTaskIds map. For this problem a solution could be just using TrieMap, which is "A concurrent hash-trie or TrieMap is a concurrent thread-safe lock-free implementation of a hash array mapped trie".

If you change the type of executorIdToRunningTaskIds from HashMap to TrieMap then you can remove the synchronised from isExecutorBusy.

I have checked, and isExecutorBusy is only used from two places:

  • org.apache.spark.scheduler.TaskSchedulerImpl#resourceOffers where we already in a synchronised block, so with the type change the behaviour is the same as before
  • org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#killExecutors where we already lived with a snapshot state which could be outdated after the method call

Regarding the second deadlock I will continue my analyses.
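
A minimal sketch of the TrieMap idea above. The class here is a simplified stand-in, not the real TaskSchedulerImpl; in the real code the inner HashSet[Long] values are still mutated under the scheduler lock, and only the map-level lookup becomes lock-free:

import scala.collection.concurrent.TrieMap
import scala.collection.mutable.HashSet

// Simplified stand-in for TaskSchedulerImpl, showing only the suggested change.
class TaskSchedulerImplSketch {
  // TrieMap is a thread-safe, lock-free concurrent map, so a reader can look up
  // an executor's running tasks without holding the TaskSchedulerImpl monitor.
  val executorIdToRunningTaskIds = TrieMap[String, HashSet[Long]]()

  // No `synchronized` needed any more: the map lookup itself is thread-safe.
  def isExecutorBusy(execId: String): Boolean =
    executorIdToRunningTaskIds.get(execId).exists(_.nonEmpty)
}

With that change, CoarseGrainedSchedulerBackend.killExecutors could call isExecutorBusy without contending for the TaskSchedulerImpl lock.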

Contributor:
Yes, I just focused on saving the extra lock first.

But we could keep track of the executor IDs where tasks are scheduled/running separately, in a concurrently accessible set (a volatile reference to an immutable Set, or a CopyOnWriteArraySet).

The method isExecutorBusy could use this new set. That way we can keep the HashMap for executorIdToRunningTaskIds and still avoid introducing that lock.
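
One possible shape of that alternative; the class and method names below are placeholders for illustration, not the actual TaskSchedulerImpl API:

// Sketch: track busy executor ids in a concurrently readable structure so that
// isExecutorBusy never needs the TaskSchedulerImpl lock.
class BusyExecutorTrackerSketch {
  // Volatile reference to an immutable Set: writers (already running under the
  // scheduler lock) swap in a new Set, readers simply dereference the current one.
  @volatile private var busyExecutorIds: Set[String] = Set.empty

  // Called when a task is launched on an executor.
  def markExecutorBusy(execId: String): Unit =
    busyExecutorIds = busyExecutorIds + execId

  // Called when the last running task on an executor finishes.
  def markExecutorIdle(execId: String): Unit =
    busyExecutorIds = busyExecutorIds - execId

  // Lock-free read, safe to call from killExecutors on another thread.
  def isExecutorBusy(execId: String): Boolean = busyExecutorIds.contains(execId)
}

Because the Set is immutable and the reference is volatile, readers always see a consistent snapshot without taking any lock.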

Contributor:
I don't think that makeOffersLock solves the deadlock here. You won't get a deadlock between the same two locks, but now it can happen with makeOffersLock instead. Consider this sequence (a simplification of the full call stacks, but showing the important locks at least):

  1. taskresultgetter: handleFailedTask --> lock on taskSchedulerImpl

  2. taskresultgetter: BlacklistTracker.killExecutor

  3. dispatcher: receive --> lock on CoarseGrainedSchedulerBackend

  4. dispatcher: makeOffers --> lock on makeOffersLock

  5. dispatcher: blocked on TaskSchedulerImpl lock

  6. taskResultGetter: makeOffers, but blocked on makeOffersLock

As Attila suggested, I would consider creating an ordering between the TaskSchedulerImpl lock and the CoarseGrainedSchedulerBackend lock, so that we always get the TaskSchedulerImpl lock first. Of course that comes with a performance penalty, and we will have to audit all other uses of the CoarseGrainedSchedulerBackend lock too.

Still thinking about any other options ...
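
A self-contained sketch of what that ordering could look like. The two lock objects below merely stand in for the TaskSchedulerImpl and CoarseGrainedSchedulerBackend monitors, so this is an illustration of the idea rather than the actual backend code:

// Sketch: every path that needs both monitors acquires them in the same order,
// scheduler lock first and backend lock second, so no wait cycle can form.
object LockOrderingSketch {
  private val schedulerLock = new Object // stands in for the TaskSchedulerImpl monitor
  private val backendLock = new Object   // stands in for the CoarseGrainedSchedulerBackend monitor

  // makeOffers-style path (dispatcher-event-loop thread).
  def makeOffers(): Unit = schedulerLock.synchronized {
    backendLock.synchronized {
      // snapshot executor state and build the WorkerOffers
    }
    // call into the scheduler (resourceOffers) while still holding schedulerLock;
    // its own synchronized block is reentrant on the same monitor
  }

  // killExecutors-style path (task-result-getter thread via BlacklistTracker).
  def killExecutors(): Unit = schedulerLock.synchronized {
    backendLock.synchronized {
      // adjust the target executor count and ask the cluster manager to kill
    }
  }
}

With a single global order, the dispatcher thread only acquires the backend lock after it already holds the scheduler lock, so it can never hold the backend lock while waiting for the scheduler lock, which is exactly what the cycle above requires.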

Author:
@squito I agree with you and @attilapiros about creating an ordering. I shall definitely follow the approach and try it out.

Regarding your comment on the deadlock between makeOffersLock and the task-result-getter thread, that should ideally not happen, as the task-result-getter thread will never compete for makeOffersLock. The reason I added the flag blacklistingOnTaskCompletion is precisely to ensure that the task-result-getter thread never acquires makeOffersLock.

Also, you are right in saying that makeOffersLock does not solve the deadlock. I have explained the purpose of makeOffersLock in my comment below. Quoting it here:

I can explain more about the makeOffersLock here.

PR #17091 introduced the synchronization on the CoarseGrainedSchedulerBackend object in the method makeOffers(). That particular piece of code introduced a deadlock between the task-result-getter thread and the dispatcher-event-loop thread. I could simply remove the synchronized statement in makeOffers() and the deadlock would be resolved, and we would not really need makeOffersLock.

However, removing the synchronized statement would once again expose the race condition described in JIRA https://issues.apache.org/jira/browse/SPARK-19757, for which the fix in the corresponding PR was merged. makeOffersLock here serves as the solution to that problem: by synchronizing on makeOffersLock, the race condition between the dynamic-executor-allocation thread and the dispatcher-event-loop thread is avoided. That is indeed its sole purpose. I am, however, open to discussing and working on better solutions to the above problem, if any. Thank you.

Author (@pgandhi999, Mar 12, 2019):
I am basically trying to solve the two deadlocks and also fix the race condition from SPARK-19757. I think the approach of ordering the locks, along with using a separate, concurrently accessible Set as suggested by @attilapiros and @squito, should work out. Let me work on that and get back to you.

Contributor:
ah I see, sorry I misread part of the logic, thanks

/**
* The flag blacklistingOnTaskCompletion ensures that this code path is not followed by
* the task-result-getter thread so as to avoid the deadlock scenario between
* task-result-getter thread and dispatcher-event-loop thread.
*/
makeOffersLock.synchronized {
response = synchronized {
killExecutorsImpl(idleExecutorIds, adjustTargetNumExecutors, countFailures, force)
}
}
} else {
response = synchronized {
killExecutorsImpl(idleExecutorIds, adjustTargetNumExecutors, countFailures, force)
}
}

val killExecutors: Boolean => Future[Boolean] =
if (!executorsToKill.isEmpty) {
_ => doKillExecutors(executorsToKill)
} else {
_ => Future.successful(false)
}
defaultAskTimeout.awaitResult(response)
}

val killResponse = adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)
/**
* Request that the cluster manager kill the specified executors.
*
* @param executorIds identifiers of executors to kill
* @param adjustTargetNumExecutors whether the target number of executors be adjusted down
* after these executors have been killed
* @param countFailures if there are tasks running on the executors when they are killed, whether
* those failures be counted to task failure limits?
* @param force whether to force kill busy executors, default false
* @return a Future that will resolve to the ids of the executors acknowledged by the cluster
* manager to be removed.
*/
private def killExecutorsImpl(
executorIds: Seq[String],
adjustTargetNumExecutors: Boolean,
countFailures: Boolean,
force: Boolean): Future[Seq[String]] = {

val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
unknownExecutors.foreach { id =>
logWarning(s"Executor to kill $id does not exist!")
}

killResponse.flatMap(killSuccessful =>
Future.successful (if (killSuccessful) executorsToKill else Seq.empty[String])
)(ThreadUtils.sameThread)
// If an executor is already pending to be removed, do not kill it again (SPARK-9795)
// If this executor is busy, do not kill it unless we are told to
// force kill it (SPARK-9552)
val executorsToKill = knownExecutors
.filter { id => !executorsPendingToRemove.contains(id) }
executorsToKill.foreach { id => executorsPendingToRemove(id) = !countFailures }

logInfo(s"Actual list of executor(s) to be killed is ${executorsToKill.mkString(", ")}")

// If we do not wish to replace the executors we kill, sync the target number of executors
// with the cluster manager to avoid allocating new ones. When computing the new target,
// take into account executors that are pending to be added or removed.
val adjustTotalExecutors =
if (adjustTargetNumExecutors) {
requestedTotalExecutors = math.max(requestedTotalExecutors - executorsToKill.size, 0)
if (requestedTotalExecutors !=
(numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)) {
logDebug(
s"""killExecutors($executorIds, $adjustTargetNumExecutors, $countFailures, $force):
|Executor counts do not match:
|requestedTotalExecutors = $requestedTotalExecutors
|numExistingExecutors = $numExistingExecutors
|numPendingExecutors = $numPendingExecutors
|executorsPendingToRemove = ${executorsPendingToRemove.size}""".stripMargin)
}
doRequestTotalExecutors(requestedTotalExecutors)
} else {
numPendingExecutors += executorsToKill.size
Future.successful(true)
}

defaultAskTimeout.awaitResult(response)
val killExecutors: Boolean => Future[Boolean] =
if (!executorsToKill.isEmpty) {
_ => doKillExecutors(executorsToKill)
} else {
_ => Future.successful(false)
}

val killResponse = adjustTotalExecutors.flatMap(killExecutors)(ThreadUtils.sameThread)

killResponse.flatMap(killSuccessful =>
Future.successful(if (killSuccessful) executorsToKill else Seq.empty[String])
)(ThreadUtils.sameThread)
}

/**
ExecutorAllocationManagerSuite.scala
@@ -1128,19 +1128,19 @@ class ExecutorAllocationManagerSuite
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.getTimeMillis())
assert(numExecutorsTarget(manager) === 1)
verify(mockAllocationClient, never).killExecutors(any(), any(), any(), any())
verify(mockAllocationClient, never).killExecutors(any(), any(), any(), any(), any())

// now we cross the idle timeout for executor-1, so we kill it. the really important
// thing here is that we do *not* ask the executor allocation client to adjust the target
// number of executors down
when(mockAllocationClient.killExecutors(Seq("executor-1"), false, false, false))
when(mockAllocationClient.killExecutors(Seq("executor-1"), false, false, false, false))
.thenReturn(Seq("executor-1"))
clock.advance(3000)
schedule(manager)
assert(maxNumExecutorsNeeded(manager) === 1)
assert(numExecutorsTarget(manager) === 1)
// here's the important verify -- we did kill the executors, but did not adjust the target count
verify(mockAllocationClient).killExecutors(Seq("executor-1"), false, false, false)
verify(mockAllocationClient).killExecutors(Seq("executor-1"), false, false, false, false)
}

test("SPARK-26758 check executor target number after idle time out ") {
@@ -1382,7 +1382,8 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend
executorIds: Seq[String],
adjustTargetNumExecutors: Boolean,
countFailures: Boolean,
force: Boolean): Seq[String] = executorIds
force: Boolean,
blacklistingOnTaskCompletion: Boolean): Seq[String] = executorIds

override def start(): Unit = sb.start()

BlacklistTrackerSuite.scala
@@ -479,7 +479,8 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M

test("blacklisting kills executors, configured by BLACKLIST_KILL_ENABLED") {
val allocationClientMock = mock[ExecutorAllocationClient]
when(allocationClientMock.killExecutors(any(), any(), any(), any())).thenReturn(Seq("called"))
when(allocationClientMock.killExecutors(
any(), any(), any(), any(), any())).thenReturn(Seq("called"))
when(allocationClientMock.killExecutorsOnHost("hostA")).thenAnswer(new Answer[Boolean] {
// To avoid a race between blacklisting and killing, it is important that the nodeBlacklist
// is updated before we ask the executor allocation client to kill all the executors
@@ -517,7 +518,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
}
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist1.execToFailures)

verify(allocationClientMock, never).killExecutors(any(), any(), any(), any())
verify(allocationClientMock, never).killExecutors(any(), any(), any(), any(), any())
verify(allocationClientMock, never).killExecutorsOnHost(any())

// Enable auto-kill. Blacklist an executor and make sure killExecutors is called.
@@ -533,7 +534,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
}
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist2.execToFailures)

verify(allocationClientMock).killExecutors(Seq("1"), false, false, true)
verify(allocationClientMock).killExecutors(Seq("1"), false, false, true, true)

val taskSetBlacklist3 = createTaskSetBlacklist(stageId = 1)
// Fail 4 tasks in one task set on executor 2, so that executor gets blacklisted for the whole
@@ -545,13 +546,14 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
}
blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist3.execToFailures)

verify(allocationClientMock).killExecutors(Seq("2"), false, false, true)
verify(allocationClientMock).killExecutors(Seq("2"), false, false, true, true)
verify(allocationClientMock).killExecutorsOnHost("hostA")
}

test("fetch failure blacklisting kills executors, configured by BLACKLIST_KILL_ENABLED") {
val allocationClientMock = mock[ExecutorAllocationClient]
when(allocationClientMock.killExecutors(any(), any(), any(), any())).thenReturn(Seq("called"))
when(allocationClientMock.killExecutors(
any(), any(), any(), any(), any())).thenReturn(Seq("called"))
when(allocationClientMock.killExecutorsOnHost("hostA")).thenAnswer(new Answer[Boolean] {
// To avoid a race between blacklisting and killing, it is important that the nodeBlacklist
// is updated before we ask the executor allocation client to kill all the executors
Expand All @@ -571,7 +573,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
conf.set(config.BLACKLIST_KILL_ENABLED, false)
blacklist.updateBlacklistForFetchFailure("hostA", exec = "1")

verify(allocationClientMock, never).killExecutors(any(), any(), any(), any())
verify(allocationClientMock, never).killExecutors(any(), any(), any(), any(), any())
verify(allocationClientMock, never).killExecutorsOnHost(any())

assert(blacklist.nodeToBlacklistedExecs.contains("hostA"))
Expand All @@ -583,7 +585,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
clock.advance(1000)
blacklist.updateBlacklistForFetchFailure("hostA", exec = "1")

verify(allocationClientMock).killExecutors(Seq("1"), false, false, true)
verify(allocationClientMock).killExecutors(Seq("1"), false, false, true, true)
verify(allocationClientMock, never).killExecutorsOnHost(any())

assert(blacklist.executorIdToBlacklistStatus.contains("1"))