Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Make fake resource offers on all executors
private def makeOffers() {
// Make sure no executor is killed while some task is launching on it
val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
val taskDescs = withLock {
// Filter out executors under killing
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
val workOffers = activeExecutors.map {
Expand All @@ -284,7 +284,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Make fake resource offers on just one executor
private def makeOffers(executorId: String) {
// Make sure no executor is killed while some task is launching on it
val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
val taskDescs = withLock {
// Filter out executors under killing
if (executorIsAlive(executorId)) {
val executorData = executorDataMap(executorId)
Expand Down Expand Up @@ -631,7 +631,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
force: Boolean): Seq[String] = {
logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")

val response = synchronized {
val response = withLock {
val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
unknownExecutors.foreach { id =>
logWarning(s"Executor to kill $id does not exist!")
Expand Down Expand Up @@ -730,6 +730,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

protected def currentDelegationTokens: Array[Byte] = delegationTokens.get()

// SPARK-27112: We need to ensure that there is ordering of lock acquisition
// between TaskSchedulerImpl and CoarseGrainedSchedulerBackend objects in order to fix
// the deadlock issue exposed in SPARK-27112
private def withLock[T](fn: => T): T = scheduler.synchronized {
CoarseGrainedSchedulerBackend.this.synchronized { fn }
}

}

private[spark] object CoarseGrainedSchedulerBackend {
Expand Down