
Commit 3130ac9

fred-db authored and dongjoon-hyun committed
[SPARK-46861][CORE] Avoid Deadlock in DAGScheduler
What changes were proposed in this pull request?

* The DAGScheduler can currently run into a deadlock with another thread if both access the partitions of the same RDD at the same time.
* To make progress in getCacheLocs, we require exclusive access to both the RDD partitions and the location cache. We first lock on the location cache, and then on the RDD.
* When accessing partitions of an RDD, the RDD first acquires exclusive access on the partitions, and then might acquire exclusive access on the location cache.
* If thread 1 is able to acquire access on the RDD while thread 2 holds the access to the location cache, we can run into a deadlock situation.
* To fix this, acquire the locks in the same order: change the DAGScheduler to first acquire the lock on the RDD, and then the lock on the location cache.

Why are the changes needed?

* This is a deadlock you can run into, which can prevent any progress on the cluster.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

* Unit test that reproduces the issue.

Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44882 from fred-db/fix-deadlock.

Authored-by: fred-db <fredrik.klauss@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit 617014c)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
1 parent e56bd97 · commit 3130ac9
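The fix comes down to a consistent lock-acquisition order. Below is a minimal, self-contained Scala sketch of that ordering rule; it is illustration only, and the object and lock names are invented stand-ins rather than the actual Spark classes (RDD.stateLock and the DAGScheduler's cacheLocs map).

// Minimal illustration of the lock-ordering rule described above (hypothetical names,
// not the Spark classes). Two plain locks stand in for RDD.stateLock and cacheLocs.
object LockOrderingSketch {
  private val rddStateLock = new Object  // stands in for the RDD's stateLock
  private val cacheLocsLock = new Object // stands in for the DAGScheduler's cacheLocs map

  // Deadlock-prone order (pre-fix getCacheLocs): cache lock first, then the RDD lock.
  // If another thread already holds the RDD lock and is about to take the cache lock,
  // both threads block forever.
  def getCacheLocsUnsafe(): Unit = cacheLocsLock.synchronized {
    rddStateLock.synchronized {
      // compute partitions and fill the cache
    }
  }

  // Fixed order (post-fix getCacheLocs): RDD lock first, then the cache lock.
  def getCacheLocsSafe(): Unit = rddStateLock.synchronized {
    cacheLocsLock.synchronized {
      // compute partitions and fill the cache
    }
  }

  // Other paths, e.g. computing an RDD's partitions, already take the RDD lock first and may
  // then need the cache lock; with the fixed order every thread acquires the locks the same way.
  def computePartitions(): Unit = rddStateLock.synchronized {
    cacheLocsLock.synchronized {
      // look up cached locations while resolving partitions
    }
  }
}

With a single global order, no cycle between the two locks can form, which is exactly why getCacheLocs in the diff below now takes the RDD's stateLock before synchronizing on cacheLocs.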

3 files changed: 62 additions & 18 deletions


core/src/main/scala/org/apache/spark/rdd/RDD.scala
Lines changed: 7 additions & 4 deletions

@@ -223,14 +223,17 @@ abstract class RDD[T: ClassTag](
    * not use `this` because RDDs are user-visible, so users might have added their own locking on
    * RDDs; sharing that could lead to a deadlock.
    *
-   * One thread might hold the lock on many of these, for a chain of RDD dependencies; but
-   * because DAGs are acyclic, and we only ever hold locks for one path in that DAG, there is no
-   * chance of deadlock.
+   * One thread might hold the lock on many of these, for a chain of RDD dependencies. Deadlocks
+   * are possible if we try to lock another resource while holding the stateLock,
+   * and the lock acquisition sequence of these locks is not guaranteed to be the same.
+   * This can lead to a deadlock as one thread might first acquire the stateLock,
+   * and then the resource,
+   * while another thread might first acquire the resource, and then the stateLock.
    *
    * Executors may reference the shared fields (though they should never mutate them,
    * that only happens on the driver).
    */
-  private val stateLock = new Serializable {}
+  private[spark] val stateLock = new Serializable {}
 
   // Our dependencies and partitions will be gotten by calling subclass's methods below, and will
   // be overwritten when we're checkpointed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Lines changed: 18 additions & 13 deletions

@@ -173,6 +173,9 @@ private[spark] class DAGScheduler(
    * locations where that RDD partition is cached.
    *
    * All accesses to this map should be guarded by synchronizing on it (see SPARK-4454).
+   * If you need to access any RDD while synchronizing on the cache locations,
+   * first synchronize on the RDD, and then synchronize on this map to avoid deadlocks. The RDD
+   * could try to access the cache locations after synchronizing on the RDD.
    */
   private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]

@@ -408,22 +411,24 @@ private[spark] class DAGScheduler(
   }

   private[scheduler]
-  def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized {
-    // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
-    if (!cacheLocs.contains(rdd.id)) {
-      // Note: if the storage level is NONE, we don't need to get locations from block manager.
-      val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {
-        IndexedSeq.fill(rdd.partitions.length)(Nil)
-      } else {
-        val blockIds =
-          rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
-        blockManagerMaster.getLocations(blockIds).map { bms =>
-          bms.map(bm => TaskLocation(bm.host, bm.executorId))
+  def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = rdd.stateLock.synchronized {
+    cacheLocs.synchronized {
+      // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
+      if (!cacheLocs.contains(rdd.id)) {
+        // Note: if the storage level is NONE, we don't need to get locations from block manager.
+        val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {
+          IndexedSeq.fill(rdd.partitions.length)(Nil)
+        } else {
+          val blockIds =
+            rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
+          blockManagerMaster.getLocations(blockIds).map { bms =>
+            bms.map(bm => TaskLocation(bm.host, bm.executorId))
+          }
         }
+        cacheLocs(rdd.id) = locs
       }
-      cacheLocs(rdd.id) = locs
+      cacheLocs(rdd.id)
     }
-    cacheLocs(rdd.id)
   }

   private def clearCacheLocs(): Unit = cacheLocs.synchronized {

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
Lines changed: 37 additions & 1 deletion

@@ -48,7 +48,7 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
 import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException}
 import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerId, BlockManagerMaster}
-import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, Clock, LongAccumulator, SystemClock, Utils}
+import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, Clock, LongAccumulator, SystemClock, ThreadUtils, Utils}

 class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
   extends DAGSchedulerEventProcessLoop(dagScheduler) {

@@ -589,6 +589,42 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assertDataStructuresEmpty()
   }

+  // Note that this test is NOT perfectly reproducible when there is a deadlock as it uses
+  // Thread.sleep, but it should never fail / flake when there is no deadlock.
+  // If this test starts to flake, this shows that there is a deadlock!
+  test("No Deadlock between getCacheLocs and CoalescedRDD") {
+    val rdd = sc.parallelize(1 to 10, numSlices = 10)
+    val coalescedRDD = rdd.coalesce(2)
+    val executionContext = ThreadUtils.newDaemonFixedThreadPool(
+      nThreads = 2, "test-getCacheLocs")
+    // Used to only make progress on getCacheLocs after we acquired the lock to the RDD.
+    val rddLock = new java.util.concurrent.Semaphore(0)
+    val partitionsFuture = executionContext.submit(new Runnable {
+      override def run(): Unit = {
+        coalescedRDD.stateLock.synchronized {
+          rddLock.release(1)
+          // Try to access the partitions of the coalescedRDD. This will cause a call to
+          // getCacheLocs internally.
+          Thread.sleep(5000)
+          coalescedRDD.partitions
+        }
+      }
+    })
+    val getCacheLocsFuture = executionContext.submit(new Runnable {
+      override def run(): Unit = {
+        rddLock.acquire()
+        // Access the cache locations.
+        // If the partition location cache is locked before the stateLock is locked,
+        // we'll run into a deadlock.
+        sc.dagScheduler.getCacheLocs(coalescedRDD)
+      }
+    })
+    // If any of the futures throw a TimeoutException, this shows that there is a deadlock between
+    // getCacheLocs and accessing partitions of an RDD.
+    getCacheLocsFuture.get(120, TimeUnit.SECONDS)
+    partitionsFuture.get(120, TimeUnit.SECONDS)
+  }
+
   test("All shuffle files on the storage endpoint should be cleaned up when it is lost") {
     conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
     conf.set("spark.files.fetchFailure.unRegisterOutputOnHost", "true")
