diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 876c1aba17..d02434c4b6 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -454,7 +454,6 @@ public final class kotlinx/coroutines/JobKt { public static synthetic fun cancelChildren$default (Lkotlinx/coroutines/Job;Ljava/lang/Throwable;ILjava/lang/Object;)V public static synthetic fun cancelChildren$default (Lkotlinx/coroutines/Job;Ljava/util/concurrent/CancellationException;ILjava/lang/Object;)V public static final fun cancelFutureOnCancellation (Lkotlinx/coroutines/CancellableContinuation;Ljava/util/concurrent/Future;)V - public static final fun cancelFutureOnCompletion (Lkotlinx/coroutines/Job;Ljava/util/concurrent/Future;)Lkotlinx/coroutines/DisposableHandle; public static final fun ensureActive (Lkotlin/coroutines/CoroutineContext;)V public static final fun ensureActive (Lkotlinx/coroutines/Job;)V public static final fun getJob (Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/Job; diff --git a/kotlinx-coroutines-core/common/src/Await.kt b/kotlinx-coroutines-core/common/src/Await.kt index 7cd94a013a..845f01e506 100644 --- a/kotlinx-coroutines-core/common/src/Await.kt +++ b/kotlinx-coroutines-core/common/src/Await.kt @@ -99,6 +99,8 @@ private class AwaitAll(private val deferreds: Array>) { var disposer: DisposeHandlersOnCancel? get() = _disposer.value set(value) { _disposer.value = value } + + override val onCancelling get() = false override fun invoke(cause: Throwable?) { if (cause != null) { @@ -114,7 +116,5 @@ private class AwaitAll(private val deferreds: Array>) { // Note that all deferreds are complete here, so we don't need to dispose their nodes } } - - override val onCancelling get() = false } } diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt index d8dcd3e4e1..c768a6ea0f 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt @@ -692,9 +692,9 @@ private data class CompletedContinuation( private class ChildContinuation( @JvmField val child: CancellableContinuationImpl<*> ) : JobNode() { + override val onCancelling get() = true + override fun invoke(cause: Throwable?) { child.parentCancelled(child.getContinuationCancellationCause(job)) } - - override val onCancelling get() = true } diff --git a/kotlinx-coroutines-core/common/src/Job.kt b/kotlinx-coroutines-core/common/src/Job.kt index 30f936b3b1..2c42687625 100644 --- a/kotlinx-coroutines-core/common/src/Job.kt +++ b/kotlinx-coroutines-core/common/src/Job.kt @@ -670,7 +670,7 @@ public object NonDisposableHandle : DisposableHandle, ChildHandle { private class DisposeOnCompletion( private val handle: DisposableHandle ) : JobNode() { - override fun invoke(cause: Throwable?) = handle.dispose() - override val onCancelling get() = false + + override fun invoke(cause: Throwable?) = handle.dispose() } diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index 6074c1f87d..81ff9d4811 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -607,10 +607,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren private inner class SelectOnJoinCompletionHandler( private val select: SelectInstance<*> ) : JobNode() { + override val onCancelling: Boolean get() = false override fun invoke(cause: Throwable?) { select.trySelect(this@JobSupport, Unit) } - override val onCancelling: Boolean get() = false } /** @@ -1263,10 +1263,10 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren private val child: ChildHandleNode, private val proposedUpdate: Any? ) : JobNode() { + override val onCancelling get() = false override fun invoke(cause: Throwable?) { parent.continueCompleting(state, child, proposedUpdate) } - override val onCancelling: Boolean get() = false } private class AwaitContinuation( @@ -1378,12 +1378,12 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren private inner class SelectOnAwaitCompletionHandler( private val select: SelectInstance<*> ) : JobNode() { + override val onCancelling get() = false override fun invoke(cause: Throwable?) { val state = this@JobSupport.state val result = if (state is CompletedExceptionally) state else state.unboxState() select.trySelect(this@JobSupport, result) } - override val onCancelling: Boolean get() = false } } @@ -1462,8 +1462,16 @@ internal abstract class JobNode : LockFreeLinkedListNode(), DisposableHandle, In * Initialized by [JobSupport.invokeOnCompletionInternal]. */ lateinit var job: JobSupport + + /** + * If `false`, [invoke] will be called once the job is cancelled or is complete. + * If `true`, [invoke] is invoked as soon as the job becomes _cancelling_ instead, and if that doesn't happen, + * it will be called once the job is cancelled or is complete. + */ + abstract val onCancelling: Boolean override val isActive: Boolean get() = true override val list: NodeList? get() = null + override fun dispose() = job.removeNode(this) override fun toString() = "$classSimpleName@$hexAddress[job@${job.hexAddress}]" /** @@ -1488,13 +1496,6 @@ internal abstract class JobNode : LockFreeLinkedListNode(), DisposableHandle, In * (see [InvokeOnCompletion] and [InvokeOnCancelling]). */ abstract fun invoke(cause: Throwable?) - - /** - * If `false`, [invoke] will be called once the job is cancelled or is complete. - * If `true`, [invoke] is invoked as soon as the job becomes _cancelling_ instead, and if that doesn't happen, - * it will be called once the job is cancelled or is complete. - */ - abstract val onCancelling: Boolean } internal class NodeList : LockFreeLinkedListHead(), Incomplete { @@ -1529,20 +1530,21 @@ private class InactiveNodeList( private class InvokeOnCompletion( private val handler: CompletionHandler ) : JobNode() { - override fun invoke(cause: Throwable?) = handler.invoke(cause) override val onCancelling get() = false + override fun invoke(cause: Throwable?) = handler.invoke(cause) } private class ResumeOnCompletion( private val continuation: Continuation ) : JobNode() { - override fun invoke(cause: Throwable?) = continuation.resume(Unit) override val onCancelling get() = false + override fun invoke(cause: Throwable?) = continuation.resume(Unit) } private class ResumeAwaitOnCompletion( private val continuation: CancellableContinuationImpl ) : JobNode() { + override val onCancelling get() = false override fun invoke(cause: Throwable?) { val state = job.state assert { state !is Incomplete } @@ -1555,7 +1557,6 @@ private class ResumeAwaitOnCompletion( continuation.resume(state.unboxState() as T) } } - override val onCancelling get() = false } // -------- invokeOnCancellation nodes @@ -1565,17 +1566,17 @@ private class InvokeOnCancelling( ) : JobNode() { // delegate handler shall be invoked at most once, so here is an additional flag private val _invoked = atomic(false) + override val onCancelling get() = true override fun invoke(cause: Throwable?) { if (_invoked.compareAndSet(expect = false, update = true)) handler.invoke(cause) } - override val onCancelling get() = true } private class ChildHandleNode( @JvmField val childJob: ChildJob ) : JobNode(), ChildHandle { override val parent: Job get() = job + override val onCancelling: Boolean get() = true override fun invoke(cause: Throwable?) = childJob.parentCancelled(job) override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause) - override val onCancelling: Boolean get() = true } diff --git a/kotlinx-coroutines-core/jdk8/src/future/Future.kt b/kotlinx-coroutines-core/jdk8/src/future/Future.kt index ac61b1e9b4..37620bbbdb 100644 --- a/kotlinx-coroutines-core/jdk8/src/future/Future.kt +++ b/kotlinx-coroutines-core/jdk8/src/future/Future.kt @@ -138,7 +138,7 @@ public fun CompletionStage.asDeferred(): Deferred { handleCoroutineException(EmptyCoroutineContext, e) } } - result.cancelFutureOnCompletion(future) + result.invokeOnCompletion(handler = CancelFutureOnCompletion(future)) return result } @@ -190,3 +190,18 @@ private class ContinuationHandler( } } } + +private class CancelFutureOnCompletion( + private val future: Future<*> +) : JobNode() { + override val onCancelling get() = false + + override fun invoke(cause: Throwable?) { + // Don't interrupt when cancelling future on completion, because no one is going to reset this + // interruption flag and it will cause spurious failures elsewhere. + // We do not cancel the future if it's already completed in some way, + // because `cancel` on a completed future won't change the state but is not guaranteed to behave well + // on reentrancy. See https://github.com/Kotlin/kotlinx.coroutines/issues/4156 + if (cause != null && !future.isDone) future.cancel(false) + } +} diff --git a/kotlinx-coroutines-core/jvm/src/Executors.kt b/kotlinx-coroutines-core/jvm/src/Executors.kt index 1d32489f19..16232bfa7c 100644 --- a/kotlinx-coroutines-core/jvm/src/Executors.kt +++ b/kotlinx-coroutines-core/jvm/src/Executors.kt @@ -1,6 +1,5 @@ package kotlinx.coroutines -import kotlinx.coroutines.flow.* import kotlinx.coroutines.internal.* import java.io.Closeable import java.util.concurrent.* @@ -145,7 +144,7 @@ internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) ) // If everything went fine and the scheduling attempt was not rejected -- use it if (future != null) { - continuation.cancelFutureOnCancellation(future) + continuation.invokeOnCancellation(CancelFutureOnCancel(future)) return } // Otherwise fallback to default executor @@ -201,3 +200,12 @@ private class DisposableFutureHandle(private val future: Future<*>) : Disposable } override fun toString(): String = "DisposableFutureHandle[$future]" } + +private class CancelFutureOnCancel(private val future: Future<*>) : CancelHandler { + override fun invoke(cause: Throwable?) { + // Don't interrupt when cancelling future on completion, because no one is going to reset this + // interruption flag and it will cause spurious failures elsewhere + future.cancel(false) + } + override fun toString() = "CancelFutureOnCancel[$future]" +} diff --git a/kotlinx-coroutines-core/jvm/src/Future.kt b/kotlinx-coroutines-core/jvm/src/Future.kt index 7ff73e783f..f8f60a72a1 100644 --- a/kotlinx-coroutines-core/jvm/src/Future.kt +++ b/kotlinx-coroutines-core/jvm/src/Future.kt @@ -5,19 +5,6 @@ package kotlinx.coroutines import java.util.concurrent.* -/** - * Cancels a specified [future] when this job is cancelled. - * This is a shortcut for the following code with slightly more efficient implementation (one fewer object created). - * ``` - * invokeOnCompletion { if (it != null) future.cancel(false) } - * ``` - * - * @suppress **This an internal API and should not be used from general code.** - */ -@InternalCoroutinesApi -public fun Job.cancelFutureOnCompletion(future: Future<*>): DisposableHandle = - invokeOnCompletion(handler = CancelFutureOnCompletion(future)) - /** * Cancels a specified [future] when this job is cancelled. * This is a shortcut for the following code with slightly more efficient implementation (one fewer object created). @@ -25,22 +12,16 @@ public fun Job.cancelFutureOnCompletion(future: Future<*>): DisposableHandle = * invokeOnCancellation { if (it != null) future.cancel(false) } * ``` */ +// Warning since 1.9.0, error in 1., hidden in 1.7 +@Deprecated( + "This function does not do what its name implies: it will not cancel the future if just cancel() was called.", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.invokeOnCancellation { future.cancel(false) }") +) public fun CancellableContinuation<*>.cancelFutureOnCancellation(future: Future<*>): Unit = - invokeOnCancellation(handler = CancelFutureOnCancel(future)) - -private class CancelFutureOnCompletion( - private val future: Future<*> -) : JobNode() { - override fun invoke(cause: Throwable?) { - // Don't interrupt when cancelling future on completion, because no one is going to reset this - // interruption flag and it will cause spurious failures elsewhere - if (cause != null) future.cancel(false) - } - - override val onCancelling get() = false -} + invokeOnCancellation(handler = PublicCancelFutureOnCancel(future)) -private class CancelFutureOnCancel(private val future: Future<*>) : CancelHandler { +private class PublicCancelFutureOnCancel(private val future: Future<*>) : CancelHandler { override fun invoke(cause: Throwable?) { // Don't interrupt when cancelling future on completion, because no one is going to reset this // interruption flag and it will cause spurious failures elsewhere diff --git a/kotlinx-coroutines-core/jvm/src/Interruptible.kt b/kotlinx-coroutines-core/jvm/src/Interruptible.kt index 40f297d145..6b52f499bf 100644 --- a/kotlinx-coroutines-core/jvm/src/Interruptible.kt +++ b/kotlinx-coroutines-core/jvm/src/Interruptible.kt @@ -95,6 +95,8 @@ private class ThreadState : JobNode() { // Registered cancellation handler private var cancelHandle: DisposableHandle? = null + override val onCancelling get() = true + fun setup(job: Job) { cancelHandle = job.invokeOnCompletion(handler = this) // Either we successfully stored it or it was immediately cancelled @@ -154,7 +156,5 @@ private class ThreadState : JobNode() { } } - override val onCancelling get() = true - private fun invalidState(state: Int): Nothing = error("Illegal state $state") } diff --git a/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt b/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt index 072ae280e5..819b05e9ed 100644 --- a/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt +++ b/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt @@ -38,4 +38,21 @@ class ExecutorAsCoroutineDispatcherDelayTest : TestBase() { executor.shutdown() assertEquals(1, callsToSchedule) } + + @Test + fun testCancelling() = runTest { + val executor = STPE() + launch(start = CoroutineStart.UNDISPATCHED) { + suspendCancellableCoroutine { cont -> + expect(1) + (executor.asCoroutineDispatcher() as Delay).scheduleResumeAfterDelay(1_000_000, cont) + cont.cancel() + expect(2) + } + } + expect(3) + assertTrue(executor.getQueue().isEmpty()) + executor.shutdown() + finish(4) + } } diff --git a/kotlinx-coroutines-core/jvm/test/jdk8/future/FutureTest.kt b/kotlinx-coroutines-core/jvm/test/jdk8/future/FutureTest.kt index 0b9f83b0ad..34534c8eb1 100644 --- a/kotlinx-coroutines-core/jvm/test/jdk8/future/FutureTest.kt +++ b/kotlinx-coroutines-core/jvm/test/jdk8/future/FutureTest.kt @@ -595,4 +595,20 @@ class FutureTest : TestBase() { GlobalScope.future(start = CoroutineStart.LAZY) { } } } + + @Test + fun testStackOverflowOnExceptionalCompletion() = runTest { + val future = CompletableFuture() + val didRun = AtomicBoolean(false) + future.whenComplete { _, _ -> didRun.set(true) } + val deferreds = List(100000) { future.asDeferred() } + future.completeExceptionally(TestException()) + deferreds.forEach { + assertTrue(it.isCompleted) + val exception = it.getCompletionExceptionOrNull() + assertIs(exception) + assertTrue(exception.suppressedExceptions.isEmpty()) + } + assertTrue(didRun.get()) + } }