From c0c167fd2863292d40e54bcb39973772773fac14 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Wed, 27 Aug 2025 12:28:54 +0800 Subject: [PATCH 1/5] fix --- .../scala/org/apache/spark/util/UninterruptibleThread.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala b/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala index 8fba5ed944c6..782b85b91238 100644 --- a/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala +++ b/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala @@ -100,7 +100,8 @@ private[spark] class UninterruptibleThread( * @return true when there is no concurrent [[runUninterruptibly()]] call ([[uninterruptible]] * is true) and no concurrent [[interrupt()]] call, otherwise false */ - def isInterruptible: Boolean = synchronized { + def isInterruptible(t: Thread): Boolean = synchronized { +// shouldInterruptThread = uninterruptible || t.isInterrupted shouldInterruptThread = uninterruptible // as we are releasing uninterruptibleLock before calling super.interrupt() there is a // possibility that runUninterruptibly() would be called after lock is released but before @@ -157,7 +158,7 @@ private[spark] class UninterruptibleThread( * interrupted until it enters into the interruptible status. */ override def interrupt(): Unit = { - if (uninterruptibleLock.isInterruptible) { + if (uninterruptibleLock.isInterruptible(this)) { try { super.interrupt() } finally { From 20e6bfbf01c1105c26340b80f5749a9206804fe5 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Wed, 27 Aug 2025 12:29:34 +0800 Subject: [PATCH 2/5] comment --- .../org/apache/spark/util/UninterruptibleThread.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala b/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala index 782b85b91238..ca5d3f4f2b96 100644 --- a/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala +++ b/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala @@ -101,8 +101,11 @@ private[spark] class UninterruptibleThread( * is true) and no concurrent [[interrupt()]] call, otherwise false */ def isInterruptible(t: Thread): Boolean = synchronized { -// shouldInterruptThread = uninterruptible || t.isInterrupted - shouldInterruptThread = uninterruptible + // SPARK-53394: We should not interrupt the thread when it is already interrupted. + // Otherwise, the state of `shouldInterruptThread` becomes inconsistent between + // `isInterruptible()` and `isInterruptPending()`, leading to `UninterruptibleThread` + // be interruptible under `runUninterruptibly`. + shouldInterruptThread = uninterruptible || t.isInterrupted // as we are releasing uninterruptibleLock before calling super.interrupt() there is a // possibility that runUninterruptibly() would be called after lock is released but before // super.interrupt() is called. In this case to prevent runUninterruptibly() from being From fbeb5aa7f8257ab144a03a858d631e09a17b13da Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Wed, 27 Aug 2025 15:20:01 +0800 Subject: [PATCH 3/5] address comment --- .../scala/org/apache/spark/util/UninterruptibleThread.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala b/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala index ca5d3f4f2b96..3b998736c5b8 100644 --- a/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala +++ b/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala @@ -100,12 +100,12 @@ private[spark] class UninterruptibleThread( * @return true when there is no concurrent [[runUninterruptibly()]] call ([[uninterruptible]] * is true) and no concurrent [[interrupt()]] call, otherwise false */ - def isInterruptible(t: Thread): Boolean = synchronized { + def isInterruptible: Boolean = synchronized { // SPARK-53394: We should not interrupt the thread when it is already interrupted. // Otherwise, the state of `shouldInterruptThread` becomes inconsistent between // `isInterruptible()` and `isInterruptPending()`, leading to `UninterruptibleThread` // be interruptible under `runUninterruptibly`. - shouldInterruptThread = uninterruptible || t.isInterrupted + shouldInterruptThread = uninterruptible || UninterruptibleThread.this.isInterrupted // as we are releasing uninterruptibleLock before calling super.interrupt() there is a // possibility that runUninterruptibly() would be called after lock is released but before // super.interrupt() is called. In this case to prevent runUninterruptibly() from being @@ -161,7 +161,7 @@ private[spark] class UninterruptibleThread( * interrupted until it enters into the interruptible status. */ override def interrupt(): Unit = { - if (uninterruptibleLock.isInterruptible(this)) { + if (uninterruptibleLock.isInterruptible) { try { super.interrupt() } finally { From 48dbe43b672e0fa92ea5a6e3624f52306c24c2b3 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Thu, 28 Aug 2025 10:12:56 +0800 Subject: [PATCH 4/5] update comment --- .../scala/org/apache/spark/util/UninterruptibleThread.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala b/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala index 3b998736c5b8..fc8838672b54 100644 --- a/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala +++ b/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala @@ -102,9 +102,9 @@ private[spark] class UninterruptibleThread( */ def isInterruptible: Boolean = synchronized { // SPARK-53394: We should not interrupt the thread when it is already interrupted. - // Otherwise, the state of `shouldInterruptThread` becomes inconsistent between - // `isInterruptible()` and `isInterruptPending()`, leading to `UninterruptibleThread` - // be interruptible under `runUninterruptibly`. + // Otherwise, the state of shouldInterruptThread becomes inconsistent between + // isInterruptible() and isInterruptPending(), leading to UninterruptibleThread + // be interruptible under runUninterruptibly. shouldInterruptThread = uninterruptible || UninterruptibleThread.this.isInterrupted // as we are releasing uninterruptibleLock before calling super.interrupt() there is a // possibility that runUninterruptibly() would be called after lock is released but before From 6d473d2d4daeb1103af02636751e8a5ae4846f33 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Thu, 28 Aug 2025 11:59:17 +0800 Subject: [PATCH 5/5] address comment --- .../apache/spark/util/UninterruptibleThread.scala | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala b/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala index fc8838672b54..3d504856f566 100644 --- a/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala +++ b/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala @@ -101,18 +101,17 @@ private[spark] class UninterruptibleThread( * is true) and no concurrent [[interrupt()]] call, otherwise false */ def isInterruptible: Boolean = synchronized { - // SPARK-53394: We should not interrupt the thread when it is already interrupted. - // Otherwise, the state of shouldInterruptThread becomes inconsistent between - // isInterruptible() and isInterruptPending(), leading to UninterruptibleThread - // be interruptible under runUninterruptibly. - shouldInterruptThread = uninterruptible || UninterruptibleThread.this.isInterrupted + shouldInterruptThread = uninterruptible // as we are releasing uninterruptibleLock before calling super.interrupt() there is a // possibility that runUninterruptibly() would be called after lock is released but before // super.interrupt() is called. In this case to prevent runUninterruptibly() from being // interrupted, we use awaitInterruptThread flag. We need to set it only if // runUninterruptibly() is not yet set uninterruptible to true (!shouldInterruptThread) and - // there is no other threads that called interrupt (awaitInterruptThread is already true) - if (!shouldInterruptThread && !awaitInterruptThread) { + // there is no other threads that called interrupt (awaitInterruptThread is already true or + // isInterrupted is true. (SPARK-53394) Otherwise, the state of shouldInterruptThread would + // become inconsistent between isInterruptible() and isInterruptPending(), leading to + // UninterruptibleThread be interruptible under runUninterruptibly.) + if (!shouldInterruptThread && !awaitInterruptThread && !isInterrupted) { awaitInterruptThread = true true } else {