diff --git a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt index 9590dc0724..68ed523f95 100644 --- a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt +++ b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt @@ -335,17 +335,15 @@ private class ListenableFutureCoroutine( * could probably be compressed into one subclass of [AbstractFuture] to save an allocation, at the * cost of the implementation's readability. */ -private class JobListenableFuture(private val jobToCancel: Job): ListenableFuture { +private class JobListenableFuture(private val job: Job): ListenableFuture { /** * Serves as a state machine for [Future] cancellation. * * [AbstractFuture] has a highly-correct atomic implementation of `Future`'s completion and * cancellation semantics. By using that type, the [JobListenableFuture] can delegate its semantics to * `auxFuture.get()` the result in such a way that the `Deferred` is always complete when returned. - * - * To preserve Coroutine's [CancellationException], this future points to either `T` or [Cancelled]. */ - private val auxFuture = SettableFuture.create() + private val auxFuture = SettableFuture.create() /** * When the attached coroutine [isCompleted][Job.isCompleted] successfully @@ -363,10 +361,8 @@ private class JobListenableFuture(private val jobToCancel: Job): ListenableFu * * This should succeed barring a race with external cancellation. */ - // CancellationException is wrapped into `Cancelled` to preserve original cause and message. - // All the other exceptions are delegated to SettableFuture.setException. fun completeExceptionallyOrCancel(t: Throwable): Boolean = - if (t is CancellationException) auxFuture.set(Cancelled(t)) else auxFuture.setException(t) + if (t is CancellationException) auxFuture.cancel(false) else auxFuture.setException(t) /** * Returns cancellation _in the sense of [Future]_. This is _not_ equivalent to @@ -385,51 +381,55 @@ private class JobListenableFuture(private val jobToCancel: Job): ListenableFu // this Future hasn't itself been successfully cancelled, the Future will return // isCancelled() == false. This is the only discovered way to reconcile the two different // cancellation contracts. - return auxFuture.isCancelled || auxFuture.completedWithCancellation + return auxFuture.isCancelled } - /** - * Helper for [isCancelled] that takes into account that - * our auxiliary future can complete with [Cancelled] instance. - */ - private val SettableFuture<*>.completedWithCancellation: Boolean - get() = isDone && try { - Uninterruptibles.getUninterruptibly(this) is Cancelled - } catch (e: CancellationException) { - true - } catch (t: Throwable) { - // In theory appart from CancellationException, getUninterruptibly can only - // throw ExecutionException, but to be safe we catch Throwable here. - false - } - /** * Waits for [auxFuture] to complete by blocking, then uses its `result` * to get the `T` value `this` [ListenableFuture] is pointing to or throw a [CancellationException]. * This establishes happens-after ordering for completion of the entangled coroutine. * * [SettableFuture.get] can only throw [CancellationException] if it was cancelled externally. - * Otherwise it returns [Cancelled] that encapsulates outcome of the entangled coroutine. + * Otherwise we do the best effort to restore original [CancellationException] from the [job]. * * [auxFuture] _must be complete_ in order for the [isDone] and [isCancelled] happens-after * contract of [Future] to be correctly followed. */ override fun get(): T { - return getInternal(auxFuture.get()) + return getImpl { auxFuture.get() } } /** See [get()]. */ override fun get(timeout: Long, unit: TimeUnit): T { - return getInternal(auxFuture.get(timeout, unit)) + return getImpl { auxFuture.get(timeout, unit) } } - /** See [get()]. */ - private fun getInternal(result: Any): T = if (result is Cancelled) { - throw CancellationException().initCause(result.exception) - } else { - // We know that `auxFuture` can contain either `T` or `Cancelled`. - @Suppress("UNCHECKED_CAST") - result as T + private inline fun getImpl(blockingGet: () -> T): T { + return try { + blockingGet() + } catch (e: CancellationException) { + // Set cause to original exception if possible. + val original = originalCancellationException() + if (original != null) { + throw CancellationException().initCause(original) + } + // Rethrow CancellationException without cause and message. + throw e + } + } + + /** + * Distinguishes [FutureExternalCancellationException] from all the other [CancellationException]s. + */ + private fun originalCancellationException(): CancellationException? { + if (!job.isActive && job.isCancelled) { + // TODO(vadimsemenov): ensure that it's correct usage of this API. + val cancellationException = job.getCancellationException() + if (cancellationException !is FutureExternalCancellationException) { + return cancellationException + } + } + return null } override fun addListener(listener: Runnable, executor: Executor) { @@ -441,13 +441,13 @@ private class JobListenableFuture(private val jobToCancel: Job): ListenableFu } /** - * Tries to cancel [jobToCancel] if `this` future was cancelled. This is fundamentally racy. + * Tries to cancel [job] if `this` future was cancelled. This is fundamentally racy. * * The call to `cancel()` will try to cancel [auxFuture]: if and only if cancellation of [auxFuture] - * succeeds, [jobToCancel] will have its [Job.cancel] called. + * succeeds, [job] will have its [Job.cancel] called. * - * This arrangement means that [jobToCancel] _might not successfully cancel_, if the race resolves - * in a particular way. [jobToCancel] may also be in its "cancelling" state while this + * This arrangement means that [job] _might not successfully cancel_, if the race resolves + * in a particular way. [job] may also be in its "cancelling" state while this * ListenableFuture is complete and cancelled. */ override fun cancel(mayInterruptIfRunning: Boolean): Boolean { @@ -456,7 +456,7 @@ private class JobListenableFuture(private val jobToCancel: Job): ListenableFu // `jobToCancel` until after auxFuture's listeners have already run. // Consider moving `jobToCancel.cancel()` into [AbstractFuture.afterDone] when the API is finalized. return if (auxFuture.cancel(mayInterruptIfRunning)) { - jobToCancel.cancel() + job.cancel(FutureExternalCancellationException()) true } else { false @@ -468,13 +468,15 @@ private class JobListenableFuture(private val jobToCancel: Job): ListenableFu append("[status=") if (isDone) { try { - when (val result = Uninterruptibles.getUninterruptibly(auxFuture)) { - is Cancelled -> append("CANCELLED, cause=[${result.exception}]") - else -> append("SUCCESS, result=[$result") - } + val result = Uninterruptibles.getUninterruptibly(auxFuture) + append("SUCCESS, result=[$result]") } catch (e: CancellationException) { // `this` future was cancelled by `Future.cancel`. In this case there's no cause or message. append("CANCELLED") + val originalCancellationException = originalCancellationException() + if (originalCancellationException != null) { + append(", cause=[$originalCancellationException]") + } } catch (e: ExecutionException) { append("FAILURE, cause=[${e.cause}]") } catch (t: Throwable) { @@ -484,9 +486,13 @@ private class JobListenableFuture(private val jobToCancel: Job): ListenableFu } else { append("PENDING, delegate=[$auxFuture]") } + append("]") } } +/** Internal type that is used to cancel [Job] if returned [ListenableFuture] was cancelled first. */ +private class FutureExternalCancellationException : CancellationException("Resulting ListenableFuture was cancelled") + /** * A wrapper for `Coroutine`'s [CancellationException]. *