Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix stack overflow when calling asDeferred on a Future many times #4173

Merged
merged 4 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,6 @@ public final class kotlinx/coroutines/JobKt {
public static synthetic fun cancelChildren$default (Lkotlinx/coroutines/Job;Ljava/lang/Throwable;ILjava/lang/Object;)V
public static synthetic fun cancelChildren$default (Lkotlinx/coroutines/Job;Ljava/util/concurrent/CancellationException;ILjava/lang/Object;)V
public static final fun cancelFutureOnCancellation (Lkotlinx/coroutines/CancellableContinuation;Ljava/util/concurrent/Future;)V
public static final fun cancelFutureOnCompletion (Lkotlinx/coroutines/Job;Ljava/util/concurrent/Future;)Lkotlinx/coroutines/DisposableHandle;
public static final fun ensureActive (Lkotlin/coroutines/CoroutineContext;)V
public static final fun ensureActive (Lkotlinx/coroutines/Job;)V
public static final fun getJob (Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/Job;
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/common/src/Await.kt
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
var disposer: DisposeHandlersOnCancel?
get() = _disposer.value
set(value) { _disposer.value = value }

override val onCancelling get() = false

override fun invoke(cause: Throwable?) {
if (cause != null) {
Expand All @@ -114,7 +116,5 @@ private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
// Note that all deferreds are complete here, so we don't need to dispose their nodes
}
}

override val onCancelling get() = false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -692,9 +692,9 @@ private data class CompletedContinuation<R>(
private class ChildContinuation(
@JvmField val child: CancellableContinuationImpl<*>
) : JobNode() {
override val onCancelling get() = true

override fun invoke(cause: Throwable?) {
child.parentCancelled(child.getContinuationCancellationCause(job))
}

override val onCancelling get() = true
}
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/common/src/Job.kt
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ public object NonDisposableHandle : DisposableHandle, ChildHandle {
private class DisposeOnCompletion(
private val handle: DisposableHandle
) : JobNode() {
override fun invoke(cause: Throwable?) = handle.dispose()

override val onCancelling get() = false

override fun invoke(cause: Throwable?) = handle.dispose()
}
31 changes: 16 additions & 15 deletions kotlinx-coroutines-core/common/src/JobSupport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -607,10 +607,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
private inner class SelectOnJoinCompletionHandler(
private val select: SelectInstance<*>
) : JobNode() {
override val onCancelling: Boolean get() = false
override fun invoke(cause: Throwable?) {
select.trySelect(this@JobSupport, Unit)
}
override val onCancelling: Boolean get() = false
}

/**
Expand Down Expand Up @@ -1263,10 +1263,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
private val child: ChildHandleNode,
private val proposedUpdate: Any?
) : JobNode() {
override val onCancelling get() = false
override fun invoke(cause: Throwable?) {
parent.continueCompleting(state, child, proposedUpdate)
}
override val onCancelling: Boolean get() = false
}

private class AwaitContinuation<T>(
Expand Down Expand Up @@ -1378,12 +1378,12 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
private inner class SelectOnAwaitCompletionHandler(
private val select: SelectInstance<*>
) : JobNode() {
override val onCancelling get() = false
override fun invoke(cause: Throwable?) {
val state = this@JobSupport.state
val result = if (state is CompletedExceptionally) state else state.unboxState()
select.trySelect(this@JobSupport, result)
}
override val onCancelling: Boolean get() = false
}
}

Expand Down Expand Up @@ -1462,8 +1462,16 @@ internal abstract class JobNode : LockFreeLinkedListNode(), DisposableHandle, In
* Initialized by [JobSupport.invokeOnCompletionInternal].
*/
lateinit var job: JobSupport

/**
* If `false`, [invoke] will be called once the job is cancelled or is complete.
* If `true`, [invoke] is invoked as soon as the job becomes _cancelling_ instead, and if that doesn't happen,
* it will be called once the job is cancelled or is complete.
*/
abstract val onCancelling: Boolean
override val isActive: Boolean get() = true
override val list: NodeList? get() = null

override fun dispose() = job.removeNode(this)
override fun toString() = "$classSimpleName@$hexAddress[job@${job.hexAddress}]"
/**
Expand All @@ -1488,13 +1496,6 @@ internal abstract class JobNode : LockFreeLinkedListNode(), DisposableHandle, In
* (see [InvokeOnCompletion] and [InvokeOnCancelling]).
*/
abstract fun invoke(cause: Throwable?)

/**
* If `false`, [invoke] will be called once the job is cancelled or is complete.
* If `true`, [invoke] is invoked as soon as the job becomes _cancelling_ instead, and if that doesn't happen,
* it will be called once the job is cancelled or is complete.
*/
abstract val onCancelling: Boolean
}

internal class NodeList : LockFreeLinkedListHead(), Incomplete {
Expand Down Expand Up @@ -1529,20 +1530,21 @@ private class InactiveNodeList(
private class InvokeOnCompletion(
private val handler: CompletionHandler
) : JobNode() {
override fun invoke(cause: Throwable?) = handler.invoke(cause)
override val onCancelling get() = false
override fun invoke(cause: Throwable?) = handler.invoke(cause)
}

private class ResumeOnCompletion(
private val continuation: Continuation<Unit>
) : JobNode() {
override fun invoke(cause: Throwable?) = continuation.resume(Unit)
override val onCancelling get() = false
override fun invoke(cause: Throwable?) = continuation.resume(Unit)
}

private class ResumeAwaitOnCompletion<T>(
private val continuation: CancellableContinuationImpl<T>
) : JobNode() {
override val onCancelling get() = false
override fun invoke(cause: Throwable?) {
val state = job.state
assert { state !is Incomplete }
Expand All @@ -1555,7 +1557,6 @@ private class ResumeAwaitOnCompletion<T>(
continuation.resume(state.unboxState() as T)
}
}
override val onCancelling get() = false
}

// -------- invokeOnCancellation nodes
Expand All @@ -1565,17 +1566,17 @@ private class InvokeOnCancelling(
) : JobNode() {
// delegate handler shall be invoked at most once, so here is an additional flag
private val _invoked = atomic(false)
override val onCancelling get() = true
override fun invoke(cause: Throwable?) {
if (_invoked.compareAndSet(expect = false, update = true)) handler.invoke(cause)
}
override val onCancelling get() = true
}

private class ChildHandleNode(
@JvmField val childJob: ChildJob
) : JobNode(), ChildHandle {
override val parent: Job get() = job
override val onCancelling: Boolean get() = true
override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
override val onCancelling: Boolean get() = true
}
17 changes: 16 additions & 1 deletion kotlinx-coroutines-core/jdk8/src/future/Future.kt
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public fun <T> CompletionStage<T>.asDeferred(): Deferred<T> {
handleCoroutineException(EmptyCoroutineContext, e)
}
}
result.cancelFutureOnCompletion(future)
result.invokeOnCompletion(handler = CancelFutureOnCompletion(future))
return result
}

Expand Down Expand Up @@ -190,3 +190,18 @@ private class ContinuationHandler<T>(
}
}
}

private class CancelFutureOnCompletion(
private val future: Future<*>
) : JobNode() {
override val onCancelling get() = false

override fun invoke(cause: Throwable?) {
// Don't interrupt when cancelling future on completion, because no one is going to reset this
// interruption flag and it will cause spurious failures elsewhere.
// We do not cancel the future if it's already completed in some way,
// because `cancel` on a completed future won't change the state but is not guaranteed to behave well
// on reentrancy. See https://github.com/Kotlin/kotlinx.coroutines/issues/4156
if (cause != null && !future.isDone) future.cancel(false)
zuevmaxim marked this conversation as resolved.
Show resolved Hide resolved
}
}
12 changes: 10 additions & 2 deletions kotlinx-coroutines-core/jvm/src/Executors.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package kotlinx.coroutines

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.internal.*
import java.io.Closeable
import java.util.concurrent.*
Expand Down Expand Up @@ -145,7 +144,7 @@ internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor)
)
// If everything went fine and the scheduling attempt was not rejected -- use it
if (future != null) {
continuation.cancelFutureOnCancellation(future)
continuation.invokeOnCancellation(CancelFutureOnCancel(future))
return
}
// Otherwise fallback to default executor
Expand Down Expand Up @@ -201,3 +200,12 @@ private class DisposableFutureHandle(private val future: Future<*>) : Disposable
}
override fun toString(): String = "DisposableFutureHandle[$future]"
}

private class CancelFutureOnCancel(private val future: Future<*>) : CancelHandler {
override fun invoke(cause: Throwable?) {
// Don't interrupt when cancelling future on completion, because no one is going to reset this
// interruption flag and it will cause spurious failures elsewhere
future.cancel(false)
zuevmaxim marked this conversation as resolved.
Show resolved Hide resolved
}
override fun toString() = "CancelFutureOnCancel[$future]"
}
35 changes: 8 additions & 27 deletions kotlinx-coroutines-core/jvm/src/Future.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,23 @@ package kotlinx.coroutines

import java.util.concurrent.*

/**
* Cancels a specified [future] when this job is cancelled.
* This is a shortcut for the following code with slightly more efficient implementation (one fewer object created).
* ```
* invokeOnCompletion { if (it != null) future.cancel(false) }
* ```
*
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
public fun Job.cancelFutureOnCompletion(future: Future<*>): DisposableHandle =
invokeOnCompletion(handler = CancelFutureOnCompletion(future))

/**
* Cancels a specified [future] when this job is cancelled.
* This is a shortcut for the following code with slightly more efficient implementation (one fewer object created).
* ```
* invokeOnCancellation { if (it != null) future.cancel(false) }
* ```
*/
// Warning since 1.9.0, error in 1., hidden in 1.7
@Deprecated(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please drop a comment since what version it's deprecated as we do in other places

"This function does not do what its name implies: it will not cancel the future if just cancel() was called.",
zuevmaxim marked this conversation as resolved.
Show resolved Hide resolved
dkhalanskyjb marked this conversation as resolved.
Show resolved Hide resolved
level = DeprecationLevel.WARNING,
replaceWith = ReplaceWith("this.invokeOnCancellation { future.cancel(false) }")
)
public fun CancellableContinuation<*>.cancelFutureOnCancellation(future: Future<*>): Unit =
zuevmaxim marked this conversation as resolved.
Show resolved Hide resolved
invokeOnCancellation(handler = CancelFutureOnCancel(future))

private class CancelFutureOnCompletion(
private val future: Future<*>
) : JobNode() {
override fun invoke(cause: Throwable?) {
// Don't interrupt when cancelling future on completion, because no one is going to reset this
// interruption flag and it will cause spurious failures elsewhere
if (cause != null) future.cancel(false)
}

override val onCancelling get() = false
}
invokeOnCancellation(handler = PublicCancelFutureOnCancel(future))

private class CancelFutureOnCancel(private val future: Future<*>) : CancelHandler {
private class PublicCancelFutureOnCancel(private val future: Future<*>) : CancelHandler {
zuevmaxim marked this conversation as resolved.
Show resolved Hide resolved
override fun invoke(cause: Throwable?) {
// Don't interrupt when cancelling future on completion, because no one is going to reset this
// interruption flag and it will cause spurious failures elsewhere
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/jvm/src/Interruptible.kt
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ private class ThreadState : JobNode() {
// Registered cancellation handler
private var cancelHandle: DisposableHandle? = null

override val onCancelling get() = true

fun setup(job: Job) {
cancelHandle = job.invokeOnCompletion(handler = this)
// Either we successfully stored it or it was immediately cancelled
Expand Down Expand Up @@ -154,7 +156,5 @@ private class ThreadState : JobNode() {
}
}

override val onCancelling get() = true

private fun invalidState(state: Int): Nothing = error("Illegal state $state")
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,21 @@ class ExecutorAsCoroutineDispatcherDelayTest : TestBase() {
executor.shutdown()
assertEquals(1, callsToSchedule)
}

@Test
fun testCancelling() = runTest {
val executor = STPE()
launch(start = CoroutineStart.UNDISPATCHED) {
suspendCancellableCoroutine<Unit> { cont ->
expect(1)
(executor.asCoroutineDispatcher() as Delay).scheduleResumeAfterDelay(1_000_000, cont)
cont.cancel()
expect(2)
}
}
expect(3)
assertTrue(executor.getQueue().isEmpty())
executor.shutdown()
finish(4)
}
}
16 changes: 16 additions & 0 deletions kotlinx-coroutines-core/jvm/test/jdk8/future/FutureTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -595,4 +595,20 @@ class FutureTest : TestBase() {
GlobalScope.future<Unit>(start = CoroutineStart.LAZY) { }
}
}

@Test
fun testStackOverflowOnExceptionalCompletion() = runTest {
val future = CompletableFuture<Unit>()
val didRun = AtomicBoolean(false)
future.whenComplete { _, _ -> didRun.set(true) }
val deferreds = List(100000) { future.asDeferred() }
future.completeExceptionally(TestException())
deferreds.forEach {
assertTrue(it.isCompleted)
val exception = it.getCompletionExceptionOrNull()
assertIs<TestException>(exception)
assertTrue(exception.suppressedExceptions.isEmpty())
}
assertTrue(didRun.get())
}
}