From 3b4142d01a4f1741c286bc52845cd1c4323eee1d Mon Sep 17 00:00:00 2001 From: Vadim Semenov Date: Sun, 28 Jun 2020 17:37:58 +0100 Subject: [PATCH 1/3] Repair some corner cases in cancellation propagation between coroutines and listenable futures Implement bidirectional cancellation for `future` coroutine builder. This also: * Refactors JobListenableFuture infrastructure so it can be reused in CoroutineScope.future and Deferred.asListenableFuture; * Provides more descriptive `toString` implementation for the returned Future; * Fixes stack traces in thrown exception, so it includes call to get() that triggered the exception to be thrown; * Hides ListenableFuture.asDeferred return type, so it can't be casted to CompletableDeferred; * Adds more tests to cover fixed corner cases; * Improves documentation; * Suppresses annoying warnings in tests. --- .../src/ListenableFuture.kt | 295 ++++++++++-------- .../test/ListenableFutureTest.kt | 146 ++++++++- 2 files changed, 296 insertions(+), 145 deletions(-) diff --git a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt index 974e246283..99aae7cf1b 100644 --- a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt +++ b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt @@ -17,8 +17,11 @@ import kotlin.coroutines.* * The coroutine is immediately started. Passing [CoroutineStart.LAZY] to [start] throws * [IllegalArgumentException], because Futures don't have a way to start lazily. * - * The created coroutine is cancelled when the resulting future completes successfully, fails, or - * is cancelled. + * When the created coroutine [isCompleted][Job.isCompleted], it will try to + * *synchronously* complete the returned Future with the same outcome. This will + * succeed, barring a race with external cancellation of returned [ListenableFuture]. + * + * Cancellation is propagated bidirectionally. * * `CoroutineContext` is inherited from this [CoroutineScope]. Additional context elements can be * added/overlaid by passing [context]. @@ -32,8 +35,10 @@ import kotlin.coroutines.* * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging * facilities. * - * Note that the error and cancellation semantics of [future] are _subtly different_ than - * [asListenableFuture]'s. See [ListenableFutureCoroutine] for details. + * Note that the error and cancellation semantics of [future] are _subtly different_ than [asListenableFuture]'s. + * In particular, any exception that happens in the coroutine after returned future is + * successfully cancelled will be passed to the [CoroutineExceptionHandler] from the [context]. + * See [ListenableFutureCoroutine] for details. * * @param context added overlaying [CoroutineScope.coroutineContext] to form the new context. * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT]. @@ -46,14 +51,9 @@ public fun CoroutineScope.future( ): ListenableFuture { require(!start.isLazy) { "$start start is not supported" } val newContext = newCoroutineContext(context) - val future = SettableFuture.create() - val coroutine = ListenableFutureCoroutine(newContext, future) - future.addListener( - coroutine, - MoreExecutors.directExecutor()) + val coroutine = ListenableFutureCoroutine(newContext) coroutine.start(start, coroutine, block) - // Return hides the SettableFuture. This should prevent casting. - return object: ListenableFuture by future {} + return coroutine.future } /** @@ -70,7 +70,7 @@ public fun CoroutineScope.future( * When `this` `ListenableFuture` is [successfully cancelled][java.util.concurrent.Future.cancel], * it will cancel the returned `Deferred`. * - * When the returned `Deferred` is [cancelled][Deferred.cancel()], it will try to propagate the + * When the returned `Deferred` is [cancelled][Deferred.cancel], it will try to propagate the * cancellation to `this` `ListenableFuture`. Propagation will succeed, barring a race with the * `ListenableFuture` completing normally. This is the only case in which the returned `Deferred` * will complete with a different outcome than `this` `ListenableFuture`. @@ -152,7 +152,8 @@ public fun ListenableFuture.asDeferred(): Deferred { deferred.invokeOnCompletion { cancel(false) } - return deferred + // Return hides the CompletableDeferred. This should prevent casting. + return object : Deferred by deferred {} } /** @@ -195,13 +196,21 @@ private fun ExecutionException.nonNullCause(): Throwable { * * This is inherently a race. See [Future.cancel] for a description of `Future` cancellation * semantics. See [Job] for a description of coroutine cancellation semantics. See - * [DeferredListenableFuture.cancel] for greater detail on the overlapped cancellation semantics and + * [JobListenableFuture.cancel] for greater detail on the overlapped cancellation semantics and * corner cases of this method. */ public fun Deferred.asListenableFuture(): ListenableFuture { - val outerFuture = OuterFuture(this) - outerFuture.afterInit() - return outerFuture + val listenableFuture = JobListenableFuture(this) + // This invokeOnCompletion completes the JobListenableFuture with the same result as `this` Deferred. + // The JobListenableFuture may have completed earlier if it got cancelled! See JobListenableFuture.cancel(). + invokeOnCompletion { throwable -> + if (throwable == null) { + listenableFuture.complete(getCompleted()) + } else { + listenableFuture.completeExceptionally(throwable) + } + } + return listenableFuture } /** @@ -215,7 +224,6 @@ public fun Deferred.asListenableFuture(): ListenableFuture { * This method is intended to be used with one-shot Futures, so on coroutine cancellation, the Future is cancelled as well. * If cancelling the given future is undesired, use [Futures.nonCancellationPropagating] or * [kotlinx.coroutines.NonCancellable]. - * */ public suspend fun ListenableFuture.await(): T { try { @@ -255,8 +263,7 @@ private class ToContinuation( continuation.cancel() } else { try { - continuation.resumeWith( - Result.success(Uninterruptibles.getUninterruptibly(futureToObserve))) + continuation.resume(Uninterruptibles.getUninterruptibly(futureToObserve)) } catch (e: ExecutionException) { // ExecutionException is the only kind of exception that can be thrown from a gotten // Future. Anything else showing up here indicates a very fundamental bug in a @@ -271,57 +278,46 @@ private class ToContinuation( * An [AbstractCoroutine] intended for use directly creating a [ListenableFuture] handle to * completion. * - * The code in the [Runnable] portion of the class is registered as a [ListenableFuture] callback. - * See [run] for details. Both types are implemented by this object to save an allocation. + * If [future] is successfully cancelled, cancellation is propagated to `this` `Coroutine`. + * By documented contract, a [Future] has been cancelled if + * and only if its `isCancelled()` method returns true. + * + * Any error that occurs after successfully cancelling a [ListenableFuture] will be passed + * to the [CoroutineExceptionHandler] from the context. The contract of [Future] does not permit + * it to return an error after it is successfully cancelled. + * + * By calling [asListenableFuture] on a [Deferred], any error that occurs after successfully + * cancelling the [ListenableFuture] representation of the [Deferred] will _not_ be passed to + * the [CoroutineExceptionHandler]. Cancelling a [Deferred] places that [Deferred] in the + * cancelling/cancelled states defined by [Job], which _can_ show the error. It's assumed that + * the [Deferred] pointing to the task will be used to observe any error outcome occurring after + * cancellation. + * + * This may be counterintuitive, but it maintains the error and cancellation contracts of both + * the [Deferred] and [ListenableFuture] types, while permitting both kinds of promise to point + * to the same running task. */ private class ListenableFutureCoroutine( - context: CoroutineContext, - private val future: SettableFuture -) : AbstractCoroutine(context), Runnable { + context: CoroutineContext +) : AbstractCoroutine(context) { - /** - * When registered as a [ListenableFuture] listener, cancels the returned [Coroutine] if - * [future] is successfully cancelled. By documented contract, a [Future] has been cancelled if - * and only if its `isCancelled()` method returns true. - * - * Any error that occurs after successfully cancelling a [ListenableFuture] - * created by submitting the returned object as a [Runnable] to an `Executor` will be passed - * to the [CoroutineExceptionHandler] from the context. The contract of [Future] does not permit - * it to return an error after it is successfully cancelled. - * - * By calling [asListenableFuture] on a [Deferred], any error that occurs after successfully - * cancelling the [ListenableFuture] representation of the [Deferred] will _not_ be passed to - * the [CoroutineExceptionHandler]. Cancelling a [Deferred] places that [Deferred] in the - * cancelling/cancelled states defined by [Job], which _can_ show the error. It's assumed that - * the [Deferred] pointing to the task will be used to observe any error outcome occurring after - * cancellation. - * - * This may be counterintuitive, but it maintains the error and cancellation contracts of both - * the [Deferred] and [ListenableFuture] types, while permitting both kinds of promise to point - * to the same running task. - */ - override fun run() { - if (future.isCancelled) { - cancel() - } - } + // JobListenableFuture propagates external cancellation to `this` coroutine. See JobListenableFuture. + val future = JobListenableFuture(this) override fun onCompleted(value: T) { - future.set(value) + future.complete(value) } - // TODO: This doesn't actually cancel the Future. There doesn't seem to be bidi cancellation? override fun onCancelled(cause: Throwable, handled: Boolean) { - if (!future.setException(cause) && !handled) { - // prevents loss of exception that was not handled by parent & could not be set to SettableFuture + if (!future.completeExceptionally(cause) && !handled) { + // prevents loss of exception that was not handled by parent & could not be set to JobListenableFuture handleCoroutineException(context, cause) } } } /** - * A [ListenableFuture] that delegates to an internal [DeferredListenableFuture], collaborating with - * it. + * A [ListenableFuture] that delegates to an internal [SettableFuture], collaborating with it. * * This setup allows the returned [ListenableFuture] to maintain the following properties: * @@ -333,130 +329,161 @@ private class ListenableFutureCoroutine( * - Fully correct cancellation and listener happens-after obeying [Future] and * [ListenableFuture]'s documented and implicit contracts is surprisingly difficult to achieve. * The best way to be correct, especially given the fun corner cases from - * [AsyncFuture.setAsync], is to just use an [AsyncFuture]. - * - To maintain sanity, this class implements [ListenableFuture] and uses an inner [AsyncFuture] - * around its input [deferred] as a state engine to establish happens-after-completion. This - * could probably be compressed into one subclass of [AsyncFuture] to save an allocation, at the + * [AbstractFuture.setFuture], is to just use an [AbstractFuture]. + * - To maintain sanity, this class implements [ListenableFuture] and uses an auxiliary [SettableFuture] + * around coroutine's result as a state engine to establish happens-after-completion. This + * could probably be compressed into one subclass of [AbstractFuture] to save an allocation, at the * cost of the implementation's readability. */ -private class OuterFuture(private val deferred: Deferred): ListenableFuture { - val innerFuture = DeferredListenableFuture(deferred) +private class JobListenableFuture(private val jobToCancel: Job): ListenableFuture { + /** + * Serves as a state machine for [Future] cancellation. + * + * [AbstractFuture] has a highly-correct atomic implementation of `Future`'s completion and + * cancellation semantics. By using that type, the [JobListenableFuture] can delegate its semantics to + * `auxFuture.get()` the result in such a way that the `Deferred` is always complete when returned. + */ + private val auxFuture = SettableFuture.create>() - // Adding the listener after initialization resolves partial construction hairpin problem. - // - // This invokeOnCompletion completes the innerFuture as `deferred` does. The innerFuture may - // have completed earlier if it got cancelled! See DeferredListenableFuture. - fun afterInit() { - deferred.invokeOnCompletion { - innerFuture.complete() - } - } + /** + * When the attached coroutine [isCompleted][Job.isCompleted] successfully + * its outcome should be passed to this method. + * + * This should succeed barring a race with external cancellation. + */ + fun complete(result: T) = auxFuture.set(Result.Success(result)) + + /** + * When the attached coroutine [isCompleted][Job.isCompleted] [exceptionally][Job.isCancelled] + * its outcome should be passed to this method. + * + * This method will map coroutine's exception into corresponding Future's exception. + * + * This should succeed barring a race with external cancellation. + */ + // CancellationException is wrapped into `Result.Cancellation` to preserve original cause and message. + // All the other exceptions are delegated to SettableFuture.setException. + fun completeExceptionally(t: Throwable) = + if (t is CancellationException) auxFuture.set(Result.Cancellation(t)) else auxFuture.setException(t) /** * Returns cancellation _in the sense of [Future]_. This is _not_ equivalent to * [Job.isCancelled]. * - * When done, this Future is cancelled if its innerFuture is cancelled, or if its delegate - * [deferred] is cancelled. Cancellation of [innerFuture] collaborates with this class. + * When done, this Future is cancelled if its [auxFuture] is cancelled, or if [auxFuture] + * contains [CancellationException]. * - * See [DeferredListenableFuture.cancel]. + * See [cancel]. */ override fun isCancelled(): Boolean { // This expression ensures that isCancelled() will *never* return true when isDone() returns false. // In the case that the deferred has completed with cancellation, completing `this`, its // reaching the "cancelled" state with a cause of CancellationException is treated as the - // same thing as innerFuture getting cancelled. If the Job is in the "cancelling" state and + // same thing as auxFuture getting cancelled. If the Job is in the "cancelling" state and // this Future hasn't itself been successfully cancelled, the Future will return // isCancelled() == false. This is the only discovered way to reconcile the two different // cancellation contracts. - return isDone - && (innerFuture.isCancelled - || deferred.getCompletionExceptionOrNull() is kotlinx.coroutines.CancellationException) + return auxFuture.isCancelled || (isDone && Uninterruptibles.getUninterruptibly(auxFuture).isCancelled) } /** - * Waits for [innerFuture] to complete by blocking, then uses the [deferred] returned by that - * Future to get the `T` value `this` [ListenableFuture] is pointing to. This establishes - * happens-after ordering for completion of the [Deferred] input to [OuterFuture]. + * Waits for [auxFuture] to complete by blocking, then uses its [result][Result] + * to get the `T` value `this` [ListenableFuture] is pointing to. This establishes + * happens-after ordering for completion of the entangled coroutine. * - * `innerFuture` _must be complete_ in order for the [isDone] and [isCancelled] happens-after - * contract of [Future] to be correctly followed. If this method were to directly use - * _`this.deferred`_ instead of blocking on its `innerFuture`, the [Deferred] that this - * [ListenableFuture] is created from might be in an incomplete state when used by `get()`. + * [SettableFuture.get] can only throw [CancellationException] if it was cancelled externally. + * Otherwise it returns [Result] that encapsulates outcome of the entangled coroutine. + * + * [auxFuture] _must be complete_ in order for the [isDone] and [isCancelled] happens-after + * contract of [Future] to be correctly followed. */ override fun get(): T { - return getInternal(innerFuture.get()) + return getInternal(auxFuture.get()) } /** See [get()]. */ override fun get(timeout: Long, unit: TimeUnit): T { - return getInternal(innerFuture.get(timeout, unit)) + return getInternal(auxFuture.get(timeout, unit)) } /** See [get()]. */ - private fun getInternal(deferred: Deferred): T { - if (deferred.isCancelled) { - val exception = deferred.getCompletionExceptionOrNull() - if (exception is kotlinx.coroutines.CancellationException) { - throw exception - } else { - throw ExecutionException(exception) - } - } else { - return deferred.getCompleted() - } + private fun getInternal(result: Result): T = when(result) { + is Result.Success -> result.value + is Result.Cancellation -> throw CancellationException().initCause(result.exception) } override fun addListener(listener: Runnable, executor: Executor) { - innerFuture.addListener(listener, executor) + auxFuture.addListener(listener, executor) } override fun isDone(): Boolean { - return innerFuture.isDone - } - - override fun cancel(mayInterruptIfRunning: Boolean): Boolean { - return innerFuture.cancel(mayInterruptIfRunning) - } -} - -/** - * Holds a delegate deferred, and serves as a state machine for [Future] cancellation. - * - * [AbstractFuture] has a highly-correct atomic implementation of `Future`'s completion and - * cancellation semantics. By using that type, the [OuterFuture] can delegate its semantics to - * _this_ `Future` `get()` the result in such a way that the `Deferred` is always complete when - * returned. - */ -private class DeferredListenableFuture( - private val deferred: Deferred -) : AbstractFuture>() { - - fun complete() { - set(deferred) + return auxFuture.isDone } /** - * Tries to cancel the task. This is fundamentally racy. + * Tries to cancel [jobToCancel] if `this` future was cancelled. This is fundamentally racy. * - * For any given call to `cancel()`, if [deferred] is already completed, the call will complete - * this Future with it, and fail to cancel. Otherwise, the - * call to `cancel()` will try to cancel this Future: if and only if cancellation of this - * succeeds, [deferred] will have its [Deferred.cancel] called. + * The call to `cancel()` will try to cancel [auxFuture]: if and only if cancellation of [auxFuture] + * succeeds, [jobToCancel] will have its [Job.cancel] called. * - * This arrangement means that [deferred] _might not successfully cancel_, if the race resolves - * in a particular way. [deferred] may also be in its "cancelling" state while this + * This arrangement means that [jobToCancel] _might not successfully cancel_, if the race resolves + * in a particular way. [jobToCancel] may also be in its "cancelling" state while this * ListenableFuture is complete and cancelled. - * - * [OuterFuture] collaborates with this class to present a more cohesive picture and ensure - * that certain combinations of cancelled/cancelling states can't be observed. */ override fun cancel(mayInterruptIfRunning: Boolean): Boolean { - return if (super.cancel(mayInterruptIfRunning)) { - deferred.cancel() + // TODO: call jobToCancel.cancel() _before_ running the listeners. + // `auxFuture.cancel()` will execute auxFuture's listeners. This delays cancellation of + // `jobToCancel` until after auxFuture's listeners have already run. + // Consider moving `jobToCancel.cancel()` into [AbstractFuture.afterDone] when the API is finalized. + return if (auxFuture.cancel(mayInterruptIfRunning)) { + jobToCancel.cancel() true } else { false } } + + override fun toString(): String = buildString { + append(super.toString()) + append("[status=") + if (isDone) { + try { + when (val result = Uninterruptibles.getUninterruptibly(auxFuture)) { + is Result.Success -> append("SUCCESS, result=[${result.value}") + is Result.Cancellation -> append("CANCELLED, cause=[${result.exception}]") + } + } catch (e: CancellationException) { + // `this` future was cancelled by `Future.cancel`. In this case there's no cause or message. + append("CANCELLED") + } catch (e: ExecutionException) { + append("FAILURE, cause=[${e.cause}]") + } catch (t: Throwable) { + // Violation of Future's contract, should never happen. + append("UNKNOWN, cause=[${t.javaClass} thrown from get()]") + } + } else { + append("PENDING, delegate=[$auxFuture]") + } + } +} + +/** + * Represents the result of a `Coroutine` in the sense of [Future]. + * + * - When coroutine [isCompleted][Job.isCompleted] successfully, its result is wrapped into [Success]. + * - When coroutine [isCompleted][Job.isCompleted] exceptionally, meaning [isCancelled] + * returns `true`. According to [Future]'s contract, the exception can be of two types: + * - [CancellationException] if the coroutine was _cancelled normally_, meaning it threw [CancellationException]. + * In this case the exception is wrapped into [Result.Cancellation]. + * - [ExecutionException] with the original [Throwable] in its [cause][ExecutionException.cause] otherwise. + * This case is handled separately. See [JobListenableFuture.completeExceptionally]. + */ +private sealed class Result { + /** Returns cancellation _in the sense of [Future]_. This is _not_ equivalent to [Job.isCancelled] */ + open val isCancelled get() = false + + class Success(val value: T) : Result() + class Cancellation(val exception: CancellationException) : Result() { + override val isCancelled get() = true + } } diff --git a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt index a9a7f7ba9d..a023111c9e 100644 --- a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt +++ b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt @@ -7,6 +7,7 @@ package kotlinx.coroutines.guava import com.google.common.util.concurrent.* import kotlinx.coroutines.* import org.junit.* +import org.junit.Ignore import org.junit.Test import java.util.concurrent.* import java.util.concurrent.CancellationException @@ -315,6 +316,28 @@ class ListenableFutureTest : TestBase() { finish(4) } + @Test + @Ignore // TODO: propagate cancellation before running listeners. + fun testAsListenableFuturePropagatesCancellationBeforeRunningListeners() = runTest { + expect(1) + val deferred = async(context = Dispatchers.Unconfined) { + try { + delay(Long.MAX_VALUE) + } finally { + expect(3) // Cancelled. + } + } + val asFuture = deferred.asListenableFuture() + asFuture.addListener(Runnable { expect(4) }, MoreExecutors.directExecutor()) + assertFalse(asFuture.isDone) + expect(2) + asFuture.cancel(false) + assertTrue(asFuture.isDone) + assertTrue(asFuture.isCancelled) + assertFailsWith { deferred.await() } + finish(5) + } + @Test fun testFutureCancellation() = runTest { val future = awaitFutureWithCancel(true) @@ -333,15 +356,18 @@ class ListenableFutureTest : TestBase() { val outputCancellationException = assertFailsWith { asFuture.get() } - assertEquals(outputCancellationException.message, "Foobar") - assertTrue(outputCancellationException.cause is OutOfMemoryError) - assertEquals(outputCancellationException.cause?.message, "Foobaz") + val cause = outputCancellationException.cause + assertNotNull(cause) + assertEquals(cause.message, "Foobar") + assertTrue(cause.cause is OutOfMemoryError) + assertEquals(cause.cause?.message, "Foobaz") } @Test fun testNoFutureCancellation() = runTest { val future = awaitFutureWithCancel(false) assertFalse(future.isCancelled) + @Suppress("BlockingMethodInNonBlockingContext") assertEquals(42, future.get()) finish(4) } @@ -354,7 +380,7 @@ class ListenableFutureTest : TestBase() { assertTrue(asDeferredAsFuture.isCancelled) assertFailsWith { - val value: Int = asDeferredAsFuture.await() + asDeferredAsFuture.await() } } @@ -379,7 +405,7 @@ class ListenableFutureTest : TestBase() { assertTrue(asDeferred.isCancelled) assertFailsWith { - val value: Int = asDeferred.await() + asDeferred.await() } } @@ -433,7 +459,10 @@ class ListenableFutureTest : TestBase() { @Test fun testFutureCompletedWithNullFastPathAsDeferred() = runTest { val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool()) - val future = executor.submit(Callable { null }).also { it.get() } + val future = executor.submit(Callable { null }).also { + @Suppress("BlockingMethodInNonBlockingContext") + it.get() + } assertNull(future.asDeferred().await()) } @@ -494,8 +523,10 @@ class ListenableFutureTest : TestBase() { val future = future(Dispatchers.Unconfined) { try { delay(Long.MAX_VALUE) - } finally { + expectUnreached() + } catch (e: CancellationException) { expect(2) + throw e } } @@ -507,17 +538,19 @@ class ListenableFutureTest : TestBase() { @Test fun testExceptionOnExternalCancellation() = runTest(expected = {it is TestException}) { - expect(1) val result = future(Dispatchers.Unconfined) { try { + expect(1) delay(Long.MAX_VALUE) - } finally { - expect(2) + expectUnreached() + } catch (e: CancellationException) { + expect(3) throw TestException() } } + expect(2) result.cancel(true) - finish(3) + finish(4) } @Test @@ -540,12 +573,103 @@ class ListenableFutureTest : TestBase() { finish(3) } + @Test + fun testCancellingFutureContextJobCancelsFuture() = runTest { + expect(1) + val supervisorJob = SupervisorJob() + val future = future(context = supervisorJob) { + expect(2) + try { + delay(Long.MAX_VALUE) + expectUnreached() + } catch (e: CancellationException) { + expect(4) + throw e + } + } + yield() + expect(3) + supervisorJob.cancel(CancellationException("Parent cancelled", TestException())) + supervisorJob.join() + assertTrue(future.isDone) + assertTrue(future.isCancelled) + val thrown = assertFailsWith { future.get() } + val cause = thrown.cause + assertNotNull(cause) + assertTrue(cause is CancellationException) + assertEquals("Parent cancelled", cause.message) + assertTrue(cause.cause is TestException) + finish(5) + } + + @Test + fun testFutureChildException() = runTest { + val future = future(context = NonCancellable + Dispatchers.Unconfined) { + val foo = async { delay(Long.MAX_VALUE); 42 } + val bar = async { throw TestException() } + foo.await() + bar.await() + } + future.checkFutureException() + } + + @Test + fun testFutureIsDoneAfterChildrenCompleted() = runTest { + expect(1) + val testException = TestException() + // Don't propagate exception to the test and use different dispatchers as we are going to block test thread. + val future = future(context = NonCancellable + Dispatchers.Default) { + val foo = async { + try { + delay(Long.MAX_VALUE) + 42 + } finally { + withContext(NonCancellable) { + delay(200) + } + } + } + foo.invokeOnCompletion { + expect(3) + } + val bar = async { throw testException } + foo.await() + bar.await() + } + yield() + expect(2) + // Blocking get should succeed after internal coroutine completes. + val thrown = assertFailsWith { future.get() } + expect(4) + assertEquals(testException, thrown.cause) + finish(5) + } + + @Test + @Ignore // TODO: propagate cancellation before running listeners. + fun testFuturePropagatesCancellationBeforeRunningListeners() = runTest { + expect(1) + val future = future(context = Dispatchers.Unconfined) { + try { + delay(Long.MAX_VALUE) + } finally { + expect(3) // Cancelled. + } + } + future.addListener(Runnable { expect(4) }, MoreExecutors.directExecutor()) + assertFalse(future.isDone) + expect(2) + future.cancel(false) + assertTrue(future.isDone) + assertTrue(future.isCancelled) + finish(5) + } + private inline fun ListenableFuture<*>.checkFutureException() { val e = assertFailsWith { get() } val cause = e.cause!! assertTrue(cause is T) } + @Suppress("SuspendFunctionOnCoroutineScope") private suspend fun CoroutineScope.awaitFutureWithCancel(cancellable: Boolean): ListenableFuture { val latch = CountDownLatch(1) val executor = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool()) From aa0cba6c190064a80bb93f3b6c08701125df8f52 Mon Sep 17 00:00:00 2001 From: Vadim Semenov Date: Wed, 25 Nov 2020 01:10:52 +0000 Subject: [PATCH 2/3] Address comments from @qwwdfsad review. * Replace `Result` with `Cancelled` to save an allocation on successful completion. * Add a test to show that `CancellationException` is never passed to `CoroutineExceptionHandler`. * Explicitly specify return types. * Use @JvmField on "package-private" property to not generate an accessor. --- .../src/ListenableFuture.kt | 65 +++++++++---------- .../test/ListenableFutureTest.kt | 17 +++++ 2 files changed, 46 insertions(+), 36 deletions(-) diff --git a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt index 99aae7cf1b..c93c197b59 100644 --- a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt +++ b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt @@ -207,7 +207,7 @@ public fun Deferred.asListenableFuture(): ListenableFuture { if (throwable == null) { listenableFuture.complete(getCompleted()) } else { - listenableFuture.completeExceptionally(throwable) + listenableFuture.completeExceptionallyOrCancel(throwable) } } return listenableFuture @@ -302,14 +302,14 @@ private class ListenableFutureCoroutine( ) : AbstractCoroutine(context) { // JobListenableFuture propagates external cancellation to `this` coroutine. See JobListenableFuture. - val future = JobListenableFuture(this) + @JvmField val future = JobListenableFuture(this) override fun onCompleted(value: T) { future.complete(value) } override fun onCancelled(cause: Throwable, handled: Boolean) { - if (!future.completeExceptionally(cause) && !handled) { + if (!future.completeExceptionallyOrCancel(cause) && !handled) { // prevents loss of exception that was not handled by parent & could not be set to JobListenableFuture handleCoroutineException(context, cause) } @@ -342,8 +342,10 @@ private class JobListenableFuture(private val jobToCancel: Job): ListenableFu * [AbstractFuture] has a highly-correct atomic implementation of `Future`'s completion and * cancellation semantics. By using that type, the [JobListenableFuture] can delegate its semantics to * `auxFuture.get()` the result in such a way that the `Deferred` is always complete when returned. + * + * To preserve Coroutine's [CancellationException], this future points to either `T` or [Cancelled]. */ - private val auxFuture = SettableFuture.create>() + private val auxFuture = SettableFuture.create() /** * When the attached coroutine [isCompleted][Job.isCompleted] successfully @@ -351,7 +353,7 @@ private class JobListenableFuture(private val jobToCancel: Job): ListenableFu * * This should succeed barring a race with external cancellation. */ - fun complete(result: T) = auxFuture.set(Result.Success(result)) + fun complete(result: T): Boolean = auxFuture.set(result) /** * When the attached coroutine [isCompleted][Job.isCompleted] [exceptionally][Job.isCancelled] @@ -361,10 +363,10 @@ private class JobListenableFuture(private val jobToCancel: Job): ListenableFu * * This should succeed barring a race with external cancellation. */ - // CancellationException is wrapped into `Result.Cancellation` to preserve original cause and message. + // CancellationException is wrapped into `Cancelled` to preserve original cause and message. // All the other exceptions are delegated to SettableFuture.setException. - fun completeExceptionally(t: Throwable) = - if (t is CancellationException) auxFuture.set(Result.Cancellation(t)) else auxFuture.setException(t) + fun completeExceptionallyOrCancel(t: Throwable): Boolean = + if (t is CancellationException) auxFuture.set(Cancelled(t)) else auxFuture.setException(t) /** * Returns cancellation _in the sense of [Future]_. This is _not_ equivalent to @@ -383,16 +385,16 @@ private class JobListenableFuture(private val jobToCancel: Job): ListenableFu // this Future hasn't itself been successfully cancelled, the Future will return // isCancelled() == false. This is the only discovered way to reconcile the two different // cancellation contracts. - return auxFuture.isCancelled || (isDone && Uninterruptibles.getUninterruptibly(auxFuture).isCancelled) + return auxFuture.isCancelled || (isDone && Uninterruptibles.getUninterruptibly(auxFuture) is Cancelled) } /** - * Waits for [auxFuture] to complete by blocking, then uses its [result][Result] - * to get the `T` value `this` [ListenableFuture] is pointing to. This establishes - * happens-after ordering for completion of the entangled coroutine. + * Waits for [auxFuture] to complete by blocking, then uses its `result` + * to get the `T` value `this` [ListenableFuture] is pointing to or throw a [CancellationException]. + * This establishes happens-after ordering for completion of the entangled coroutine. * * [SettableFuture.get] can only throw [CancellationException] if it was cancelled externally. - * Otherwise it returns [Result] that encapsulates outcome of the entangled coroutine. + * Otherwise it returns [Cancelled] that encapsulates outcome of the entangled coroutine. * * [auxFuture] _must be complete_ in order for the [isDone] and [isCancelled] happens-after * contract of [Future] to be correctly followed. @@ -407,9 +409,12 @@ private class JobListenableFuture(private val jobToCancel: Job): ListenableFu } /** See [get()]. */ - private fun getInternal(result: Result): T = when(result) { - is Result.Success -> result.value - is Result.Cancellation -> throw CancellationException().initCause(result.exception) + private fun getInternal(result: Any): T = if (result is Cancelled) { + throw CancellationException().initCause(result.exception) + } else { + // We know that `auxFuture` can contain either `T` or `Cancelled`. + @Suppress("UNCHECKED_CAST") + result as T } override fun addListener(listener: Runnable, executor: Executor) { @@ -449,8 +454,8 @@ private class JobListenableFuture(private val jobToCancel: Job): ListenableFu if (isDone) { try { when (val result = Uninterruptibles.getUninterruptibly(auxFuture)) { - is Result.Success -> append("SUCCESS, result=[${result.value}") - is Result.Cancellation -> append("CANCELLED, cause=[${result.exception}]") + is Cancelled -> append("CANCELLED, cause=[${result.exception}]") + else -> append("SUCCESS, result=[$result") } } catch (e: CancellationException) { // `this` future was cancelled by `Future.cancel`. In this case there's no cause or message. @@ -468,22 +473,10 @@ private class JobListenableFuture(private val jobToCancel: Job): ListenableFu } /** - * Represents the result of a `Coroutine` in the sense of [Future]. - * - * - When coroutine [isCompleted][Job.isCompleted] successfully, its result is wrapped into [Success]. - * - When coroutine [isCompleted][Job.isCompleted] exceptionally, meaning [isCancelled] - * returns `true`. According to [Future]'s contract, the exception can be of two types: - * - [CancellationException] if the coroutine was _cancelled normally_, meaning it threw [CancellationException]. - * In this case the exception is wrapped into [Result.Cancellation]. - * - [ExecutionException] with the original [Throwable] in its [cause][ExecutionException.cause] otherwise. - * This case is handled separately. See [JobListenableFuture.completeExceptionally]. + * A wrapper for `Coroutine`'s [CancellationException]. + * + * If the coroutine is _cancelled normally_, we want to show the reason of cancellation to the user. Unfortunately, + * [SettableFuture] can't store the reason of cancellation. To mitigate this, we wrap cancellation exception into this + * class and pass it into [SettableFuture.complete]. See implementation of [JobListenableFuture]. */ -private sealed class Result { - /** Returns cancellation _in the sense of [Future]_. This is _not_ equivalent to [Job.isCancelled] */ - open val isCancelled get() = false - - class Success(val value: T) : Result() - class Cancellation(val exception: CancellationException) : Result() { - override val isCancelled get() = true - } -} +private class Cancelled(val exception: CancellationException) diff --git a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt index a023111c9e..dc2d99d7f7 100644 --- a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt +++ b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt @@ -573,6 +573,23 @@ class ListenableFutureTest : TestBase() { finish(3) } + /** This test ensures that we never pass [CancellationException] to [CoroutineExceptionHandler]. */ + @Test + fun testCancellationExceptionOnExternalCancellation() = runTest { + expect(1) + // No parent here (NonCancellable), so nowhere to propagate exception + val result = future(NonCancellable + Dispatchers.Unconfined) { + try { + delay(Long.MAX_VALUE) + } finally { + expect(2) + throw TestCancellationException() // this exception cannot be handled + } + } + assertTrue(result.cancel(true)) + finish(3) + } + @Test fun testCancellingFutureContextJobCancelsFuture() = runTest { expect(1) From 225884c08076e096c1785e53375a6751929fbb5e Mon Sep 17 00:00:00 2001 From: Vadim Semenov Date: Wed, 25 Nov 2020 19:53:52 +0000 Subject: [PATCH 3/3] Use @JvmField on "package-private" property to not generate an accessor. Also fix indentation. --- integration/kotlinx-coroutines-guava/src/ListenableFuture.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt index c93c197b59..6d1fab3d69 100644 --- a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt +++ b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt @@ -167,7 +167,7 @@ public fun ListenableFuture.asDeferred(): Deferred { * state - a serious fundamental bug. */ private fun ExecutionException.nonNullCause(): Throwable { - return this.cause!! + return this.cause!! } /** @@ -479,4 +479,4 @@ private class JobListenableFuture(private val jobToCancel: Job): ListenableFu * [SettableFuture] can't store the reason of cancellation. To mitigate this, we wrap cancellation exception into this * class and pass it into [SettableFuture.complete]. See implementation of [JobListenableFuture]. */ -private class Cancelled(val exception: CancellationException) +private class Cancelled(@JvmField val exception: CancellationException)