
Conversation

@vrozov
Member

@vrozov vrozov commented Apr 15, 2025

What changes were proposed in this pull request?

Do not hold the uninterruptibleLock monitor while calling super.interrupt() in UninterruptibleThread. Instead, use the newly introduced awaitInterruptThread flag and wait for super.interrupt() to be called.
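The mechanism described above can be sketched roughly as follows. This is a minimal, hypothetical Java sketch, not the actual Scala implementation: only the awaitInterruptThread name comes from the PR description; the lock, the shouldInterrupt flag, and the method shapes are illustrative.

```java
// Hypothetical sketch: deliver super.interrupt() WITHOUT holding the monitor,
// and make runUninterruptibly wait out any in-flight interrupt first.
public class InterruptOutsideLock extends Thread {
    private final Object lock = new Object();
    private boolean uninterruptible = false;
    private boolean shouldInterrupt = false;       // interrupt deferred until the block ends
    private boolean awaitInterruptThread = false;  // true while super.interrupt() is in flight

    @Override
    public void interrupt() {
        synchronized (lock) {
            if (uninterruptible) {        // inside an uninterruptible block: defer
                shouldInterrupt = true;
                return;
            }
            awaitInterruptThread = true;  // announce the pending interrupt
        }
        try {
            super.interrupt();            // called without holding the monitor
        } finally {
            synchronized (lock) {
                awaitInterruptThread = false;
                lock.notifyAll();         // wake threads waiting for the interrupt to land
            }
        }
    }

    public void runUninterruptibly(Runnable f) throws InterruptedException {
        synchronized (lock) {
            while (awaitInterruptThread) { // wait for an in-flight super.interrupt()
                lock.wait();
            }
            uninterruptible = true;
        }
        try {
            Thread.interrupted();          // clear any already-delivered interrupt
            f.run();
        } finally {
            boolean interruptNow;
            synchronized (lock) {
                uninterruptible = false;
                interruptNow = shouldInterrupt;
                shouldInterrupt = false;
            }
            if (interruptNow) {
                super.interrupt();         // deliver the deferred interrupt, also outside the lock
            }
        }
    }
}
```

The key property is that no call into Thread.interrupt() (and through it into the NIO channel's interruptor) ever happens while the monitor is held, so the lock-ordering cycle shown in the trace below cannot form.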

Why are the changes needed?

There is a potential deadlock: UninterruptibleThread may be blocked on an NIO operation, and interrupting the channel while holding the uninterruptibleLock monitor may cause a deadlock such as:

Found one Java-level deadlock:
=============================
"pool-1-thread-1-ScalaTest-running-UninterruptibleThreadSuite":
  waiting to lock monitor 0x00006000036ee3c0 (object 0x000000070f3019d0, a java.lang.Object),
  which is held by "task thread"

"task thread":
  waiting to lock monitor 0x00006000036e75a0 (object 0x000000070f70fe80, a java.lang.Object),
  which is held by "pool-1-thread-1-ScalaTest-running-UninterruptibleThreadSuite"

Java stack information for the threads listed above:
===================================================
"pool-1-thread-1-ScalaTest-running-UninterruptibleThreadSuite":
        at java.nio.channels.spi.AbstractInterruptibleChannel$1.interrupt(java.base@17.0.14/AbstractInterruptibleChannel.java:157)
        - waiting to lock <0x000000070f3019d0> (a java.lang.Object)
        at java.lang.Thread.interrupt(java.base@17.0.14/Thread.java:1004)
        - locked <0x000000070f70fc90> (a java.lang.Object)
        at org.apache.spark.util.UninterruptibleThread.interrupt(UninterruptibleThread.scala:99)
        - locked <0x000000070f70fe80> (a java.lang.Object)
        at org.apache.spark.util.UninterruptibleThreadSuite.$anonfun$new$5(UninterruptibleThreadSuite.scala:159)
        - locked <0x000000070f70f9f8> (a java.lang.Object)
        at org.apache.spark.util.UninterruptibleThreadSuite$$Lambda$216/0x000000700120d6c8.apply$mcV$sp(Unknown Source)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
        at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
        at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
        at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
        at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
        at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
        at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
        at org.apache.spark.SparkFunSuite$$Lambda$205/0x0000007001207700.apply(Unknown Source)
        at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
        at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
        at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
        at org.scalatest.Transformer.apply(Transformer.scala:22)
        at org.scalatest.Transformer.apply(Transformer.scala:20)
        at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
        at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227)
        at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
        at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
        at org.scalatest.funsuite.AnyFunSuiteLike$$Lambda$343/0x00000070012867b0.apply(Unknown Source)
        at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
        at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
        at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
        at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69)
        at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
        at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
        at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:69)
        at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
        at org.scalatest.funsuite.AnyFunSuiteLike$$Lambda$339/0x00000070012833e0.apply(Unknown Source)
        at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
        at org.scalatest.SuperEngine$$Lambda$340/0x0000007001283998.apply(Unknown Source)
        at scala.collection.immutable.List.foreach(List.scala:334)
        at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
        at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
        at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
        at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
        at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
        at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
        at org.scalatest.Suite.run(Suite.scala:1114)
        at org.scalatest.Suite.run$(Suite.scala:1096)
        at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
        at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
        at org.scalatest.funsuite.AnyFunSuiteLike$$Lambda$332/0x000000700127b000.apply(Unknown Source)
        at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
        at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
        at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
        at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:69)
        at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
        at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
        at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
        at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:69)
        at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
        at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
        at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
        at sbt.ForkMain$Run$$Lambda$107/0x0000007001110000.call(Unknown Source)
        at java.util.concurrent.FutureTask.run(java.base@17.0.14/FutureTask.java:264)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.14/ThreadPoolExecutor.java:1136)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.14/ThreadPoolExecutor.java:635)
        at java.lang.Thread.run(java.base@17.0.14/Thread.java:840)
"task thread":
        at org.apache.spark.util.UninterruptibleThread.interrupt(UninterruptibleThread.scala:96)
        - waiting to lock <0x000000070f70fe80> (a java.lang.Object)
        at org.apache.spark.util.UninterruptibleThreadSuite$InterruptibleChannel.implCloseChannel(UninterruptibleThreadSuite.scala:143)
        at java.nio.channels.spi.AbstractInterruptibleChannel.close(java.base@17.0.14/AbstractInterruptibleChannel.java:112)
        - locked <0x000000070f3019d0> (a java.lang.Object)
        at org.apache.spark.util.UninterruptibleThreadSuite$InterruptibleChannel.<init>(UninterruptibleThreadSuite.scala:138)
        at org.apache.spark.util.UninterruptibleThreadSuite$$anon$5.run(UninterruptibleThreadSuite.scala:153)

Found 1 deadlock.
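Stripped of the Spark and NIO specifics, the trace above is the classic ABBA lock-ordering cycle: one thread holds uninterruptibleLock and wants the channel's close lock, while the task thread holds the close lock and wants uninterruptibleLock. The following self-contained demo (illustrative only; the two ReentrantLocks stand in for the two monitors, and tryLock with a timeout replaces the blocking monitor acquisition so the demo itself cannot hang) shows that with opposite acquisition orders neither thread can get its second lock:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

public class AbbaDeadlockDemo {

    /** Returns true if the opposite acquisition orders would deadlock. */
    public static boolean wouldDeadlock() throws InterruptedException {
        ReentrantLock uninterruptibleLock = new ReentrantLock(); // held by the interrupting thread
        ReentrantLock channelCloseLock = new ReentrantLock();    // held by the "task thread"
        CountDownLatch bothHoldFirst = new CountDownLatch(2);
        CountDownLatch bothAttempted = new CountDownLatch(2);
        AtomicBoolean t1Got = new AtomicBoolean();
        AtomicBoolean t2Got = new AtomicBoolean();

        Runnable r1 = () -> {                       // order: uninterruptibleLock -> closeLock
            uninterruptibleLock.lock();
            try {
                bothHoldFirst.countDown();
                bothHoldFirst.await();              // both threads now hold their first lock
                t1Got.set(channelCloseLock.tryLock(100, TimeUnit.MILLISECONDS));
                if (t1Got.get()) channelCloseLock.unlock();
                bothAttempted.countDown();
                bothAttempted.await();              // hold our lock until both attempts finished
            } catch (InterruptedException ignored) {
            } finally {
                uninterruptibleLock.unlock();
            }
        };
        Runnable r2 = () -> {                       // opposite order: closeLock -> uninterruptibleLock
            channelCloseLock.lock();
            try {
                bothHoldFirst.countDown();
                bothHoldFirst.await();
                t2Got.set(uninterruptibleLock.tryLock(100, TimeUnit.MILLISECONDS));
                if (t2Got.get()) uninterruptibleLock.unlock();
                bothAttempted.countDown();
                bothAttempted.await();
            } catch (InterruptedException ignored) {
            } finally {
                channelCloseLock.unlock();
            }
        };
        Thread t1 = new Thread(r1);
        Thread t2 = new Thread(r2);
        t1.start(); t2.start();
        t1.join(); t2.join();
        return !t1Got.get() && !t2Got.get();        // both tryLocks failed: a deadlock cycle
    }
}
```

The fix breaks the cycle not by reordering the locks (the channel's lock order is fixed inside the JDK) but by never calling into the channel's interruptor while uninterruptibleLock is held.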

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added two new test cases to the UninterruptibleThreadSuite.

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the CORE label Apr 15, 2025
@vrozov vrozov marked this pull request as draft April 15, 2025 17:21
@HyukjinKwon
Member

cc @Ngone51 FYI

@wangyum
Member

wangyum commented Apr 16, 2025

Thank you @vrozov, I'll verify it later.

@vrozov vrozov changed the title [WIP][CORE] Call interrupt() without holding uninterruptibleLock to avoid possible deadlock [SPARK-51821][CORE] Call interrupt() without holding uninterruptibleLock to avoid possible deadlock Apr 16, 2025
@vrozov vrozov marked this pull request as ready for review April 16, 2025 15:48
Comment on lines +138 to +149
val channel = new AbstractInterruptibleChannel() {
  override def implCloseChannel(): Unit = {
    begin()
    latch.countDown()
    try {
      Thread.sleep(Long.MaxValue)
    } catch {
      case _: InterruptedException => Thread.currentThread().interrupt()
    }
  }
}
channel.close()
Member

This doesn't seem to be the right use case for UninterruptibleThread. Shouldn't the whole block be executed within runUninterruptibly()? e.g.,

override def run(): Unit = {
  this.runUninterruptibly {
    ...
  }
}

The task thread can be correctly interrupted if the whole block runs inside runUninterruptibly() with a bounded Thread.sleep().

Member

I see where the problem is. Spark tasks always run on an UninterruptibleThread, but this.runUninterruptibly() is only called for Spark tasks that use a KafkaConsumer (#17761 is the original PR that introduced UninterruptibleThread).

Member Author

UninterruptibleThread was introduced as part of SPARK-14169 and by design can be used both with run() and with runUninterruptibly().

Contributor

@mridulm mridulm Apr 25, 2025

Streaming also appears to use runUninterruptibly.
Are we proposing to adapt it to not need this construct @Ngone51 ?

Member

Yes, I think we should avoid using UninterruptibleThread when it is not really needed. The problem is that the executor can't distinguish tasks for different workloads, so we have to compromise with UninterruptibleThread as the default task thread and call runUninterruptibly only when necessary.

Member Author

@mridulm @Ngone51 Streaming requires UninterruptibleThread, please see SPARK-21248. I also don't think it is necessary to revisit the usage of UninterruptibleThread. The run() method is not affected at all. The only affected (overridden) method is interrupt(), and with the fix it also won't be impacted. The only difference from Thread.interrupt() is acquiring uninterruptibleLock, which is a low-cost operation when there is no contention (multiple threads calling interrupt() concurrently); and since Thread.interrupt() acquires blockerLock as well, there is pretty much no difference at all.

… used and

handle case where interrupt is called from more than one thread concurrently
@mridulm
Contributor

mridulm commented Apr 24, 2025

The test would fail even if we replace UninterruptibleThread with a regular Thread, right?
I am probably missing some context here.

@vrozov
Member Author

vrozov commented Apr 24, 2025

@mridulm Do you refer to "SPARK-51821 uninterruptibleLock deadlock" test? No, it will not fail. Why do you think it would fail?

uninterruptibleLock.synchronized {
if (uninterruptible) {
shouldInterruptThread = true
if (uninterruptibleLock.synchronized {
Contributor

@cloud-fan cloud-fan Apr 24, 2025

code style nit: shall we move the if condition into a method like def shouldDoThreadInterruption?

Member Author

@vrozov vrozov Apr 28, 2025

@cloud-fan is this covered by the code style guidelines? If not, I'll make the change if other changes turn out to be necessary; otherwise, let's keep it this way.

Contributor

It's not a code style guideline thing; it's more of a general Java best practice that we should not have super long methods, and the if condition shouldn't be very long either.

Contributor

Agree, refactoring into appropriate method(s) improves clarity and readability.

Member Author

Neither synchronized block is very long. For example,

is much longer.

I'll make the change if another revision is necessary.

Member Author

OK

Contributor

Let's not get blocked by a code style nit. @vrozov If you don't mind I can help to fix the code style via the Github UI and then we can move forward.

Contributor

Or I can merge it as it is and open a follow-up PR to fix the code style. Which way do you prefer?

Member Author

@cloud-fan Please wait. I was working on a different PR that I am now ready to open. I will fix the code style on Monday.

Member Author

@cloud-fan @mridulm Introduced UninterruptibleLock class with 2 helper methods.


while (uninterruptibleLock.synchronized {
  // Clear the interrupted status if it's set.
  shouldInterruptThread = Thread.interrupted() || shouldInterruptThread
Contributor

Is Thread.interrupted() a cheap function? Previously we only called it once, but now it may be called many times.

Contributor

shall we do it separately after this while loop?

Contributor

@mridulm mridulm Apr 25, 2025

It would effectively be the same @cloud-fan - interrupted() is the same as isInterrupted() when the thread has not been interrupted.
And when it is interrupted, we are going to break out of this loop anyway (so it is only called once).

Member Author

is Thread.interrupted() a cheap function

Yes, it is. It is as cheap as reading a volatile boolean.

Previously we only call it once, but now it's many times.

In the best-case and most common scenarios, it is called only once.

shall we do it separately after this while loop?

No, we don't want to enter sleep if interrupted() was already called.
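The distinction being discussed here (an illustrative snippet, not code from the PR) is that Thread.interrupted() reads and clears the calling thread's interrupt status, while isInterrupted() only reads it; both boil down to reading the same volatile flag, which is why the call is cheap:

```java
public class InterruptStatusDemo {
    // Both checks read the same interrupt flag; the difference is
    // that the static Thread.interrupted() also clears it.
    public static boolean[] demo() {
        Thread.currentThread().interrupt();                     // set our own interrupt status
        boolean seen = Thread.currentThread().isInterrupted();  // true; flag untouched
        boolean first = Thread.interrupted();                   // true; clears the flag
        boolean second = Thread.interrupted();                  // false; already cleared
        return new boolean[] { seen, first, second };
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);     // prints: true true false
    }
}
```

This is also why the loop above folds the result into shouldInterruptThread: once interrupted() has cleared the status, the pending interrupt must be remembered or it would be lost.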

@mridulm
Contributor

mridulm commented Apr 25, 2025

@mridulm Do you refer to "SPARK-51821 uninterruptibleLock deadlock" test? No, it will not fail. Why do you think it would fail?

My bad, there were some other local changes that messed up the test.

@mridulm
Contributor

mridulm commented Apr 25, 2025

Ok, I see the problem.
The expectation is for begin and end to be within try/finally - but the usual coding pattern results in the try being used to catch InterruptedException and handle it there (via interrupt).

So instead of :

try {
  begin();
  try {
    expensiveOp();
  } finally {
    end();
  }
} catch (InterruptedException ex) {
  Thread.currentThread().interrupt();
}

it is common to do:

begin();
try {
  expensiveOp();
} catch (InterruptedException ex) {
  Thread.currentThread().interrupt();
} finally {
  end();
}

While the former is perhaps how it should be done, it is not realistic to expect everyone to do so.
Thanks for the PR, let me take a look at the proposed change

@mridulm
Contributor

mridulm commented Apr 25, 2025

Actually, I think my formulation above will still have a deadlock - though for a different reason, sigh.

Contributor

@mridulm mridulm left a comment

Interesting work @vrozov !
Thanks for contributing this.

uninterruptible = true
}

while (uninterruptibleLock.synchronized {
Contributor

Let us move this into a private method instead of inlining it within the while condition.
Same for the if in interrupt().


@wangyum
Member

wangyum commented Apr 27, 2025

@mridulm @vrozov Is it possible to just move super.interrupt() out of the synchronized block?
(screenshot of the suggested change omitted)

@mridulm
Contributor

mridulm commented Apr 27, 2025

I have to think through it @wangyum, but that does look like a very promising idea!
I am also wondering if the interrupt in runUninterruptibly needs to be within the synchronized block as well (just set a local bool within finally to decide whether to interrupt or not).

To reviewers, for ref, see here

@cloud-fan
Contributor

@wangyum I don't think this works. After we determine needInterrupt and release the lock, def runUninterruptibly may proceed and start to run the function f. Then def interrupt proceeds and calls super.interrupt, and the function f gets interrupted, which is not expected.

That's why we need an extra wait in def runUninterruptibly: wait for super.interrupt to complete, and only then proceed to reset the interrupted status and run the function f.

@vrozov
Member Author

vrozov commented Apr 28, 2025

@mridulm

Actually, I think my formulation above will still have deadlock - though for a different reason, sigh.

I don't think that begin()/end() are supposed to be used as

try {
  begin();
  try {
    expensiveOp();
  } finally {
    end();
  }
} catch (InterruptedException ex) {
  Thread.currentThread().interrupt();
}

as end() would throw ClosedByInterruptException in the finally block and hide the InterruptedException.
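The masking behavior referenced here is plain Java semantics, not anything NIO-specific: an exception thrown from a finally block replaces the exception already propagating. A minimal illustration (generic exceptions stand in for InterruptedException and ClosedByInterruptException):

```java
public class FinallyMasksDemo {
    static void op() {
        try {
            // stands in for the InterruptedException from the blocked operation
            throw new IllegalStateException("original exception");
        } finally {
            // stands in for end() throwing ClosedByInterruptException:
            // this exception replaces the one already in flight
            throw new RuntimeException("thrown from finally");
        }
    }

    public static void main(String[] args) {
        try {
            op();
        } catch (Exception e) {
            System.out.println(e.getMessage()); // prints: thrown from finally
        }
    }
}
```

So with end() in a plain finally block, the caller would observe ClosedByInterruptException and the original InterruptedException would be lost.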

@mridulm
Contributor

mridulm commented Apr 29, 2025

Thanks @cloud-fan - that is exactly the scenario I missed with a quick look!

@vrozov
Member Author

vrozov commented Apr 29, 2025

Thank you @vrozov, I'll verify it later.

@wangyum did you have a chance to verify the fix?

@wangyum
Member

wangyum commented May 2, 2025

@wangyum did you have a chance to verify the fix?
Yes, it works as expected.

}

class UninterruptibleLock {
  def awaitInterrupt(): Boolean = synchronized {
Contributor

It's not very intuitive to know the relationship between the boolean return value and the function name. Does returning true mean keep waiting? Shall we name the function shouldAwaitInterrupt?

Member Author

@cloud-fan did more refactoring, please take a look

@cloud-fan
Contributor

thanks, merging to master/4.0!

@cloud-fan cloud-fan closed this in bb0b2d2 May 7, 2025
cloud-fan pushed a commit that referenced this pull request May 7, 2025
…ock to avoid possible deadlock

### What changes were proposed in this pull request?
Do not hold `uninterruptibleLock` monitor while calling `super.interrupt()` in `UninterruptibleThread`, instead use newly introduced `awaitInterruptThread` flag and wait for `super.interrupt()` to be called.

### Why are the changes needed?
There is potential deadlock as `UninterruptibleThread` may be blocked on NIO operation and interrupting channel while holding `uninterruptibleLock` monitor may cause deadlock like in
```
(deadlock stack trace omitted; identical to the trace in the PR description above)
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added 2 new test cases to the `UninterruptibleThreadSuite`

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #50594 from vrozov/uninterruptible.

Authored-by: Vlad Rozov <vrozov@amazon.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit bb0b2d2)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@vrozov
Member Author

vrozov commented May 7, 2025

Should it also be backported to the 3.5 branch?

@cloud-fan
Contributor

We can. @vrozov, can you open a backport PR for 3.5?

@vrozov vrozov deleted the uninterruptible branch May 7, 2025 13:31
vrozov added a commit to vrozov/spark that referenced this pull request May 7, 2025
…ock to avoid possible deadlock

### What changes were proposed in this pull request?
Do not hold `uninterruptibleLock` monitor while calling `super.interrupt()` in `UninterruptibleThread`, instead use newly introduced `awaitInterruptThread` flag and wait for `super.interrupt()` to be called.

### Why are the changes needed?
There is potential deadlock as `UninterruptibleThread` may be blocked on NIO operation and interrupting channel while holding `uninterruptibleLock` monitor may cause deadlock like in
```
(deadlock stack trace omitted; identical to the trace in the PR description above)
```
        at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
        at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
        at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
        at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:69)
        at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
        at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
        at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
        at sbt.ForkMain$Run$$Lambda$107/0x0000007001110000.call(Unknown Source)
        at java.util.concurrent.FutureTask.run(java.base@17.0.14/FutureTask.java:264)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.14/ThreadPoolExecutor.java:1136)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.14/ThreadPoolExecutor.java:635)
        at java.lang.Thread.run(java.base@17.0.14/Thread.java:840)
"task thread":
        at org.apache.spark.util.UninterruptibleThread.interrupt(UninterruptibleThread.scala:96)
        - waiting to lock <0x000000070f70fe80> (a java.lang.Object)
        at org.apache.spark.util.UninterruptibleThreadSuite$InterruptibleChannel.implCloseChannel(UninterruptibleThreadSuite.scala:143)
        at java.nio.channels.spi.AbstractInterruptibleChannel.close(java.base@17.0.14/AbstractInterruptibleChannel.java:112)
        - locked <0x000000070f3019d0> (a java.lang.Object)
        at org.apache.spark.util.UninterruptibleThreadSuite$InterruptibleChannel.<init>(UninterruptibleThreadSuite.scala:138)
        at org.apache.spark.util.UninterruptibleThreadSuite$$anon$5.run(UninterruptibleThreadSuite.scala:153)

Found 1 deadlock.
```
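The dump shows the two monitors taken in opposite orders: `AbstractInterruptibleChannel`'s close path holds the channel lock and calls `interrupt()`, while `UninterruptibleThread.interrupt()` held `uninterruptibleLock` and called `super.interrupt()`, which takes the channel lock. A minimal sketch of the pattern this PR describes — mark the pending interrupt under the monitor, but deliver `super.interrupt()` only after releasing it. The flag name `awaitInterruptThread` is taken from the PR description; the rest of the class is illustrative, not Spark's actual code:

```java
public class InterruptOutsideLockDemo {
    static class SafeInterruptThread extends Thread {
        private final Object uninterruptibleLock = new Object();
        private boolean awaitInterruptThread = false;

        SafeInterruptThread(Runnable body) { super(body); }

        @Override
        public void interrupt() {
            synchronized (uninterruptibleLock) {
                if (awaitInterruptThread) {
                    // Another caller is already delivering super.interrupt();
                    // wait for it to finish instead of interrupting twice.
                    while (awaitInterruptThread) {
                        try {
                            uninterruptibleLock.wait();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    return;
                }
                awaitInterruptThread = true;
            }
            try {
                super.interrupt();  // delivered WITHOUT holding uninterruptibleLock
            } finally {
                synchronized (uninterruptibleLock) {
                    awaitInterruptThread = false;
                    uninterruptibleLock.notifyAll();
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        SafeInterruptThread t = new SafeInterruptThread(() -> {
            try { Thread.sleep(60_000); } catch (InterruptedException ignored) { }
        });
        t.start();
        t.interrupt();
        t.join(5_000);
        System.out.println("terminated: " + !t.isAlive());
    }
}
```

Because `super.interrupt()` runs outside the monitor, a concurrent channel close that re-enters `interrupt()` can no longer form the lock cycle shown in the dump above.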

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added 2 new test cases to the `UninterruptibleThreadSuite`

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#50594 from vrozov/uninterruptible.

Authored-by: Vlad Rozov <vrozov@amazon.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit bb0b2d2)
}
}
}
t.interrupt()
Member Author

It looks like Java 8 behaves differently when `interrupt()` is called on a thread that has not been started.
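A tiny probe (illustrative, not from the PR) makes the difference visible: on Java 8 the native interrupt machinery typically drops an interrupt sent to a never-started thread, while newer JDKs record the status on the `Thread` object itself, so the printed value depends on the JDK in use:

```java
public class UnstartedInterruptDemo {
    public static void main(String[] args) {
        Thread t = new Thread(() -> { });
        t.interrupt();  // the thread has never been started
        // Java 8 generally loses this interrupt; newer JDKs report true here.
        System.out.println("unstarted thread isInterrupted=" + t.isInterrupted());
    }
}
```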

@vrozov
Member Author

vrozov commented May 7, 2025

@cloud-fan #50594

yhuang-db pushed a commit to yhuang-db/spark that referenced this pull request Jun 9, 2025
…ock to avoid possible deadlock

cloud-fan pushed a commit that referenced this pull request Aug 29, 2025
…duplicated interrupt

### What changes were proposed in this pull request?

This PR fixes `UninterruptibleLock.isInterruptible` to avoid duplicated interruption when the thread is already interrupted.

### Why are the changes needed?

The "uninterruptible" semantic of `UninterruptibleThread` is broken (i.e., `UninterruptibleThread` is interruptible even while it is inside `runUninterruptibly`) after the fix #50594. The problem is that the state of `shouldInterruptThread` becomes unsafe when there are multiple concurrent interrupts.

For example, thread A could interrupt an `UninterruptibleThread` ut before ut enters `runUninterruptibly`. Right after that, another thread B starts to invoke `ut.interrupt()` and passes through `uninterruptibleLock.isInterruptible` (because at this point `shouldInterruptThread = uninterruptible = false`). Before thread B invokes `super.interrupt()`, ut enters `runUninterruptibly`, passes through `uninterruptibleLock.getAndSetUninterruptible`, and sets `uninterruptible = true`. Then ut continues with the check `uninterruptibleLock.isInterruptPending`. However, `uninterruptibleLock.isInterruptPending` returns false at this point (due to `shouldInterruptThread = Thread.interrupted = true`) even though thread B is actually interrupting. *As a result, the state of `shouldInterruptThread` becomes inconsistent between thread B and thread ut.* Then, since `uninterruptibleLock.isInterruptPending` returns false, ut continues to execute `f`. At the same time, thread B invokes `super.interrupt()`, and `f` can be interrupted.
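The interleaving above hinges on `Thread.interrupted()` both reading and *clearing* the interrupt status: an interrupt delivered after the check leaves no trace for any later check. A standalone illustration of that JDK behavior (not Spark code):

```java
public class InterruptedStatusDemo {
    public static void main(String[] args) {
        Thread.currentThread().interrupt();
        // The first call observes the status and CLEARS it...
        System.out.println("first:  " + Thread.interrupted());
        // ...so the second call sees nothing, even though an interrupt happened.
        System.out.println("second: " + Thread.interrupted());
    }
}
```

This is why a flag derived from `Thread.interrupted` alone cannot stay consistent with a concurrent interrupter that has not yet called `super.interrupt()`.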

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually tested. The issue can be easily reproduced by running `UninterruptibleThreadSuite.stress test` 100 times in a row:
```
[info]   true did not equal false (UninterruptibleThreadSuite.scala:208)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at org.apache.spark.util.UninterruptibleThreadSuite.$anonfun$new$7(UninterruptibleThreadSuite.scala:208)
...
```
And the issue is gone after the fix.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #52139 from Ngone51/fix-uninterruptiable.

Authored-by: Yi Wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan pushed a commit that referenced this pull request Aug 29, 2025
…duplicated interrupt

zifeif2 pushed a commit to zifeif2/spark that referenced this pull request Nov 14, 2025
…ock to avoid possible deadlock

zifeif2 pushed a commit to zifeif2/spark that referenced this pull request Nov 14, 2025
…duplicated interrupt
