Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prototype different cancellation approach for JobListenableFuture. #2

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 48 additions & 42 deletions integration/kotlinx-coroutines-guava/src/ListenableFuture.kt
Original file line number Diff line number Diff line change
Expand Up @@ -335,17 +335,15 @@ private class ListenableFutureCoroutine<T>(
* could probably be compressed into one subclass of [AbstractFuture] to save an allocation, at the
* cost of the implementation's readability.
*/
private class JobListenableFuture<T>(private val jobToCancel: Job): ListenableFuture<T> {
private class JobListenableFuture<T>(private val job: Job): ListenableFuture<T> {
/**
* Serves as a state machine for [Future] cancellation.
*
* [AbstractFuture] has a highly-correct atomic implementation of `Future`'s completion and
* cancellation semantics. By using that type, the [JobListenableFuture] can delegate its semantics to
* `auxFuture.get()` the result in such a way that the `Deferred` is always complete when returned.
*
* To preserve Coroutine's [CancellationException], this future points to either `T` or [Cancelled].
*/
private val auxFuture = SettableFuture.create<Any>()
private val auxFuture = SettableFuture.create<T>()

/**
* When the attached coroutine [isCompleted][Job.isCompleted] successfully
Expand All @@ -363,10 +361,8 @@ private class JobListenableFuture<T>(private val jobToCancel: Job): ListenableFu
*
* This should succeed barring a race with external cancellation.
*/
// CancellationException is wrapped into `Cancelled` to preserve original cause and message.
// All the other exceptions are delegated to SettableFuture.setException.
fun completeExceptionallyOrCancel(t: Throwable): Boolean =
if (t is CancellationException) auxFuture.set(Cancelled(t)) else auxFuture.setException(t)
if (t is CancellationException) auxFuture.cancel(false) else auxFuture.setException(t)

/**
* Returns cancellation _in the sense of [Future]_. This is _not_ equivalent to
Expand All @@ -385,51 +381,55 @@ private class JobListenableFuture<T>(private val jobToCancel: Job): ListenableFu
// this Future hasn't itself been successfully cancelled, the Future will return
// isCancelled() == false. This is the only discovered way to reconcile the two different
// cancellation contracts.
return auxFuture.isCancelled || auxFuture.completedWithCancellation
return auxFuture.isCancelled
}

/**
* Helper for [isCancelled] that takes into account that
* our auxiliary future can complete with [Cancelled] instance.
*/
private val SettableFuture<*>.completedWithCancellation: Boolean
get() = isDone && try {
Uninterruptibles.getUninterruptibly(this) is Cancelled
} catch (e: CancellationException) {
true
} catch (t: Throwable) {
// In theory appart from CancellationException, getUninterruptibly can only
// throw ExecutionException, but to be safe we catch Throwable here.
false
}

/**
* Waits for [auxFuture] to complete by blocking, then uses its `result`
* to get the `T` value `this` [ListenableFuture] is pointing to or throw a [CancellationException].
* This establishes happens-after ordering for completion of the entangled coroutine.
*
* [SettableFuture.get] can only throw [CancellationException] if it was cancelled externally.
* Otherwise it returns [Cancelled] that encapsulates outcome of the entangled coroutine.
* Otherwise we do the best effort to restore original [CancellationException] from the [job].
*
* [auxFuture] _must be complete_ in order for the [isDone] and [isCancelled] happens-after
* contract of [Future] to be correctly followed.
*/
override fun get(): T {
return getInternal(auxFuture.get())
return getImpl { auxFuture.get() }
}

/** See [get()]. */
override fun get(timeout: Long, unit: TimeUnit): T {
return getInternal(auxFuture.get(timeout, unit))
return getImpl { auxFuture.get(timeout, unit) }
}

/** See [get()]. */
private fun getInternal(result: Any): T = if (result is Cancelled) {
throw CancellationException().initCause(result.exception)
} else {
// We know that `auxFuture` can contain either `T` or `Cancelled`.
@Suppress("UNCHECKED_CAST")
result as T
private inline fun getImpl(blockingGet: () -> T): T {
return try {
blockingGet()
} catch (e: CancellationException) {
// Set cause to original exception if possible.
val original = originalCancellationException()
if (original != null) {
throw CancellationException().initCause(original)
}
// Rethrow CancellationException without cause and message.
throw e
}
}

/**
* Distinguishes [FutureExternalCancellationException] from all the other [CancellationException]s.
*/
private fun originalCancellationException(): CancellationException? {
if (!job.isActive && job.isCancelled) {
// TODO(vadimsemenov): ensure that it's correct usage of this API.
val cancellationException = job.getCancellationException()
if (cancellationException !is FutureExternalCancellationException) {
return cancellationException
}
}
return null
}

override fun addListener(listener: Runnable, executor: Executor) {
Expand All @@ -441,13 +441,13 @@ private class JobListenableFuture<T>(private val jobToCancel: Job): ListenableFu
}

/**
* Tries to cancel [jobToCancel] if `this` future was cancelled. This is fundamentally racy.
* Tries to cancel [job] if `this` future was cancelled. This is fundamentally racy.
*
* The call to `cancel()` will try to cancel [auxFuture]: if and only if cancellation of [auxFuture]
* succeeds, [jobToCancel] will have its [Job.cancel] called.
* succeeds, [job] will have its [Job.cancel] called.
*
* This arrangement means that [jobToCancel] _might not successfully cancel_, if the race resolves
* in a particular way. [jobToCancel] may also be in its "cancelling" state while this
* This arrangement means that [job] _might not successfully cancel_, if the race resolves
* in a particular way. [job] may also be in its "cancelling" state while this
* ListenableFuture is complete and cancelled.
*/
override fun cancel(mayInterruptIfRunning: Boolean): Boolean {
Expand All @@ -456,7 +456,7 @@ private class JobListenableFuture<T>(private val jobToCancel: Job): ListenableFu
// `jobToCancel` until after auxFuture's listeners have already run.
// Consider moving `jobToCancel.cancel()` into [AbstractFuture.afterDone] when the API is finalized.
return if (auxFuture.cancel(mayInterruptIfRunning)) {
jobToCancel.cancel()
job.cancel(FutureExternalCancellationException())
true
} else {
false
Expand All @@ -468,13 +468,15 @@ private class JobListenableFuture<T>(private val jobToCancel: Job): ListenableFu
append("[status=")
if (isDone) {
try {
when (val result = Uninterruptibles.getUninterruptibly(auxFuture)) {
is Cancelled -> append("CANCELLED, cause=[${result.exception}]")
else -> append("SUCCESS, result=[$result")
}
val result = Uninterruptibles.getUninterruptibly(auxFuture)
append("SUCCESS, result=[$result]")
} catch (e: CancellationException) {
// `this` future was cancelled by `Future.cancel`. In this case there's no cause or message.
append("CANCELLED")
val originalCancellationException = originalCancellationException()
if (originalCancellationException != null) {
append(", cause=[$originalCancellationException]")
}
} catch (e: ExecutionException) {
append("FAILURE, cause=[${e.cause}]")
} catch (t: Throwable) {
Expand All @@ -484,9 +486,13 @@ private class JobListenableFuture<T>(private val jobToCancel: Job): ListenableFu
} else {
append("PENDING, delegate=[$auxFuture]")
}
append("]")
}
}

/** Internal type that is used to cancel [Job] if returned [ListenableFuture] was cancelled first. */
private class FutureExternalCancellationException : CancellationException("Resulting ListenableFuture was cancelled")

/**
* A wrapper for `Coroutine`'s [CancellationException].
*
Expand Down