
Conversation

@fred-db
Contributor

@fred-db fred-db commented Jan 25, 2024

What changes were proposed in this pull request?

  • The DAGScheduler can currently run into a deadlock with another thread if both access the partitions of the same RDD at the same time.
  • To make progress in getCacheLocs, we need exclusive access to both the RDD partitions and the location cache. We first lock the location cache and then the RDD.
  • When the partitions of an RDD are computed, the RDD first acquires exclusive access to its partitions and then might acquire exclusive access to the location cache.
  • If thread 1 acquires the lock on the RDD while thread 2 holds the lock on the location cache, the two threads can deadlock.
  • To fix this, acquire the locks in a consistent order: change the DAGScheduler to first acquire the lock on the RDD and then the lock on the location cache (see the sketch after this list).
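
For illustration, here is a minimal, self-contained Scala sketch of the lock-ordering idea behind the fix. It is not the actual DAGScheduler code; the two plain objects only stand in for the RDD's stateLock and the DAGScheduler's location-cache lock (the cacheLocs map), and the method names are illustrative.

    // Minimal sketch of consistent lock ordering (illustrative names only).
    object LockOrderingSketch {
      private val rddLock = new Object   // stands in for the RDD's stateLock
      private val cacheLock = new Object // stands in for the location-cache lock

      // Path 1: computing an RDD's partitions may call back into the scheduler,
      // so it takes the RDD lock first and the cache lock second.
      def computePartitions(): Unit = rddLock.synchronized {
        cacheLock.synchronized {
          // look up or update cached preferred locations here
        }
      }

      // Path 2 (after the fix): getCacheLocs also takes the RDD lock first and the
      // cache lock second, the same order as path 1, so no wait cycle can form.
      def getCacheLocs(): Unit = rddLock.synchronized {
        cacheLock.synchronized {
          // read the location cache, computing rdd.partitions if needed
        }
      }
    }

Before the fix, the second path took the cache lock first and the RDD lock second, which is exactly the inverted order that allows the deadlock described above.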

Why are the changes needed?

  • This deadlock can occur in practice and can prevent any further progress on the cluster.

Does this PR introduce any user-facing change?

  • No

How was this patch tested?

  • Unit test that reproduces the issue.

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the CORE label Jan 25, 2024
@dongjoon-hyun dongjoon-hyun changed the title [SPARK-46861] Avoid Deadlock in DAGScheduler [SPARK-46861][CORE] Avoid Deadlock in DAGScheduler Jan 25, 2024
* One thread might hold the lock on many of these, for a chain of RDD dependencies. Deadlocks
* are possible if we try to lock another resource while holding the stateLock,
* and the lock acquisition sequence of these locks is not guaranteed to be the same.
* This can lead lead to a deadlock as one thread might first acquire the stateLock,
Member


nit. lead lead -> lead


// Note that this test is NOT perfectly reproducible when there is a deadlock as it uses
// Thread.sleep, but it should never fail / flake when there is no deadlock.
// If this test starts to flake, this shows that there is a deadlock!
Member


Got it. Thank you for the warning.
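
Sleep-based deadlock tests like this are inherently timing-dependent, which is what the comment above is warning about. As a stripped-down illustration (plain Scala, not the actual DAGSchedulerSuite test), two threads that take two locks in opposite orders can hang, but only when the sleeps line up:

    // Illustrative only: inverted lock order plus a sleep to widen the race window.
    object SleepBasedDeadlockDemo {
      private val lockA = new Object
      private val lockB = new Object

      def main(args: Array[String]): Unit = {
        val threadA = new Thread(() => lockA.synchronized {
          Thread.sleep(100)                        // let the other thread grab lockB
          lockB.synchronized { println("A done") }
        })
        val threadB = new Thread(() => lockB.synchronized {
          Thread.sleep(100)
          lockA.synchronized { println("B done") }
        })
        threadA.start(); threadB.start()
        threadA.join(); threadB.join()             // hangs here if the threads deadlock
      }
    }

A run that never hits the deadlock window simply passes, so one clean run does not prove the absence of the bug, while a hang or repeated flakes under a timeout is a strong signal that the bug is present.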

Member

@dongjoon-hyun dongjoon-hyun left a comment


+1, LGTM. Thank you for this fix, @fred-db .

dongjoon-hyun pushed a commit that referenced this pull request Jan 25, 2024
Closes #44882 from fred-db/fix-deadlock.

Authored-by: fred-db <fredrik.klauss@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit 617014c)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
dongjoon-hyun pushed a commit that referenced this pull request Jan 25, 2024
Closes #44882 from fred-db/fix-deadlock.

Authored-by: fred-db <fredrik.klauss@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit 617014c)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
@dongjoon-hyun
Member

Merged to master/3.5/3.4.

@mridulm
Contributor

mridulm commented Jan 26, 2024

Can you add details of the deadlock stack traces to the JIRA, please?
I am trying to understand how this is happening on the same RDD.

@mridulm
Contributor

mridulm commented Jan 29, 2024

+CC @dongjoon-hyun, since you merged this PR: any insights into how/why this deadlock can occur? It is not clear to me how this is happening.

@dongjoon-hyun
Member

To @mridulm, do you mean the test case is not reproducible in your environment?

@fred-db
Contributor Author

fred-db commented Jan 29, 2024

Hi @mridulm ,

I'm able to reproduce the deadlock consistently when running the test I added after removing the call to stateLock.synchronized from getCacheLocs.

The issue essentially arises with CoalescedRDDs. When getPartitions is called on a CoalescedRDD, the PartitionLocations of the DefaultPartitionCoalescer calls dagScheduler.getPreferredLocs, which internally calls getCacheLocs, causing the deadlock.
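
To make that call chain concrete, here is a rough, hypothetical Scala sketch of the racing pattern. It is not the unit test added in this PR, and whether a given run actually hangs depends on timing; the added test uses Thread.sleep to widen the race window instead.

    // Hypothetical sketch: thread 1 materializes a coalesced RDD's partitions
    // (RDD stateLock, then the location cache via DefaultPartitionCoalescer ->
    // getPreferredLocs -> getCacheLocs), while thread 2 reaches getCacheLocs through
    // SparkContext.getPreferredLocs, which before the fix took the cache lock first.
    import org.apache.spark.{SparkConf, SparkContext}

    object CoalesceDeadlockSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("coalesce-deadlock-sketch"))
        val coalesced = sc.parallelize(1 to 100, 10).coalesce(2)

        val t1 = new Thread(() => { coalesced.partitions; () })              // RDD lock first
        val t2 = new Thread(() => { sc.getPreferredLocs(coalesced, 0); () }) // cache lock first (pre-fix)
        t1.start(); t2.start()
        t1.join(); t2.join() // with the old lock order, these joins can hang forever
        sc.stop()
      }
    }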

@mridulm
Contributor

mridulm commented Jan 29, 2024

@dongjoon-hyun I meant the stack traces indicating a deadlock.
@fred-db thanks for the details. Can you update the JIRA with the traces, please? That will ensure we can reference it in the future as well.

szehon-ho pushed a commit to szehon-ho/spark that referenced this pull request Feb 7, 2024
Closes apache#44882 from fred-db/fix-deadlock.

Authored-by: fred-db <fredrik.klauss@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit 617014c)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
@attilapiros
Contributor

If somebody is interested in the original deadlock, I have executed the test without the fix (and without the timeout).

The stack trace is:

Found one Java-level deadlock:
=============================
"test-getCacheLocs-0":
  waiting to lock monitor 0x0000600000f638e0 (object 0x00000007048507d8, a scala.collection.mutable.HashMap),
  which is held by "test-getCacheLocs-1"

"test-getCacheLocs-1":
  waiting to lock monitor 0x0000600000f63810 (object 0x0000000706516030, a org.apache.spark.rdd.RDD$$anon$1),
  which is held by "test-getCacheLocs-0"

Java stack information for the threads listed above:
===================================================
"test-getCacheLocs-0":
        at org.apache.spark.scheduler.DAGScheduler.getCacheLocs(DAGScheduler.scala:399)
        - waiting to lock <0x00000007048507d8> (a scala.collection.mutable.HashMap)
        at org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(DAGScheduler.scala:2742)
        at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:2721)
        at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1938)
        at org.apache.spark.rdd.DefaultPartitionCoalescer.currPrefLocs(CoalescedRDD.scala:180)
        at org.apache.spark.rdd.DefaultPartitionCoalescer$PartitionLocations.$anonfun$getAllPrefLocs$1(CoalescedRDD.scala:198)
        at org.apache.spark.rdd.DefaultPartitionCoalescer$PartitionLocations$$Lambda$1113/0x000000012da71cb0.apply(Unknown Source)
        at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
        at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
        at org.apache.spark.rdd.DefaultPartitionCoalescer$PartitionLocations.getAllPrefLocs(CoalescedRDD.scala:197)
        at org.apache.spark.rdd.DefaultPartitionCoalescer$PartitionLocations.<init>(CoalescedRDD.scala:190)
        at org.apache.spark.rdd.DefaultPartitionCoalescer.coalesce(CoalescedRDD.scala:391)
        at org.apache.spark.rdd.CoalescedRDD.getPartitions(CoalescedRDD.scala:90)
        at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:292)
        - locked <0x0000000706516030> (a org.apache.spark.rdd.RDD$$anon$1)
        at org.apache.spark.rdd.RDD$$Lambda$1104/0x000000012da6e7b0.apply(Unknown Source)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:288)
        at org.apache.spark.scheduler.DAGSchedulerSuite$$anon$3.run(DAGSchedulerSuite.scala:225)
        - locked <0x0000000706516030> (a org.apache.spark.rdd.RDD$$anon$1)
        at java.util.concurrent.Executors$RunnableAdapter.call(java.base@17.0.13/Executors.java:539)
        at java.util.concurrent.FutureTask.run(java.base@17.0.13/FutureTask.java:264)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.13/ThreadPoolExecutor.java:1136)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.13/ThreadPoolExecutor.java:635)
        at java.lang.Thread.run(java.base@17.0.13/Thread.java:840)
"test-getCacheLocs-1":
        at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:291)
        - waiting to lock <0x0000000706516030> (a org.apache.spark.rdd.RDD$$anon$1)
        at org.apache.spark.rdd.RDD$$Lambda$1104/0x000000012da6e7b0.apply(Unknown Source)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:288)
        at org.apache.spark.scheduler.DAGScheduler.getCacheLocs(DAGScheduler.scala:402)
        - locked <0x00000007048507d8> (a scala.collection.mutable.HashMap)
        at org.apache.spark.scheduler.DAGSchedulerSuite$$anon$4.run(DAGSchedulerSuite.scala:235)
        at java.util.concurrent.Executors$RunnableAdapter.call(java.base@17.0.13/Executors.java:539)
        at java.util.concurrent.FutureTask.run(java.base@17.0.13/FutureTask.java:264)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.13/ThreadPoolExecutor.java:1136)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.13/ThreadPoolExecutor.java:635)
        at java.lang.Thread.run(java.base@17.0.13/Thread.java:840)

Found 1 deadlock.

turboFei added a commit to turboFei/spark that referenced this pull request Nov 6, 2025

[SPARK-46861][CORE] Avoid Deadlock in DAGScheduler

Closes apache#44882 from fred-db/fix-deadlock.

Authored-by: fred-db <fredrik.klauss@databricks.com>

(cherry picked from commit 617014c)

Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Co-authored-by: fred-db <fredrik.klauss@databricks.com>