Skip to content

Commit

Permalink
Setup cancellation eagerly in suspendCancellableCoroutine to properly…
Browse files Browse the repository at this point in the history
… integrate with APIs that may block the current thread, but react on cancellation

Fixes #1671
  • Loading branch information
qwwdfsad committed Nov 28, 2019
1 parent 7e895fc commit d680cba
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/
public fun resumeUndispatched (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Object;)V
public fun resumeUndispatchedWithException (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Throwable;)V
public fun resumeWith (Ljava/lang/Object;)V
public final fun setupCancellation ()V
public fun toString ()Ljava/lang/String;
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
public fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object;
Expand Down
16 changes: 11 additions & 5 deletions kotlinx-coroutines-core/common/src/CancellableContinuation.kt
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public interface CancellableContinuation<in T> : Continuation<T> {
* It can be invoked concurrently with the surrounding code.
* There is no guarantee on the execution context of its invocation.
*/
@ExperimentalCoroutinesApi // since 1.2.0, tentatively graduates in 1.3.0
@ExperimentalCoroutinesApi // since 1.2.0
public fun resume(value: T, onCancellation: (cause: Throwable) -> Unit)
}

Expand All @@ -199,6 +199,12 @@ public suspend inline fun <T> suspendCancellableCoroutine(
// NOTE: Before version 1.1.0 the following invocation was inlined here, so invocation of this
// method indicates that the code was compiled by kotlinx.coroutines < 1.1.0
// cancellable.initCancellability()
/*
* For non-atomic cancellation we setup parent-child relationship immediately
* in case when `block` blocks the current thread (e.g. Rx2 with trampoline scheduler), but
* properly supports cancellation.
*/
cancellable.setupCancellation()
block(cancellable)
cancellable.getResult()
}
Expand Down Expand Up @@ -229,10 +235,10 @@ public suspend inline fun <T> suspendAtomicCancellableCoroutine(
internal suspend inline fun <T> suspendAtomicCancellableCoroutineReusable(
crossinline block: (CancellableContinuation<T>) -> Unit
): T = suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted())
block(cancellable)
cancellable.getResult()
}
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted())
block(cancellable)
cancellable.getResult()
}

internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>): CancellableContinuationImpl<T> {
// If used outside of our dispatcher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ internal open class CancellableContinuationImpl<in T>(
/*
* Implementation notes
*
* AbstractContinuation is a subset of Job with following limitations:
* 1) It can have only cancellation listeners
* CancellableContinuationImpl is a subset of Job with following limitations:
* 1) It can have only cancellation listener (no "on cancelling")
* 2) It always invokes cancellation listener if it's cancelled (no 'invokeImmediately')
* 3) It can have at most one cancellation listener
* 4) Its cancellation listeners cannot be deregistered
Expand Down Expand Up @@ -105,9 +105,11 @@ internal open class CancellableContinuationImpl<in T>(

/**
* Setups parent cancellation and checks for postponed cancellation in the case of reusable continuations.
* It is only invoked from an internal [getResult] function.
* It is only invoked from an internal [getResult] function for reusable continuations
* and from [suspendCancellableCoroutine] to establish a cancellation before registering CC anywhere.
*/
private fun setupCancellation() {
@PublishedApi
internal fun setupCancellation() {
if (checkCompleted()) return
if (parentHandle !== null) return // fast path 2 -- was already initialized
val parent = delegate.context[Job] ?: return // fast path 3 -- don't do anything without parent
Expand Down Expand Up @@ -451,4 +453,3 @@ private class CompletedWithCancellation(
) {
override fun toString(): String = "CompletedWithCancellation[$result]"
}

21 changes: 20 additions & 1 deletion kotlinx-coroutines-core/common/test/CancellableResumeTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class CancellableResumeTest : TestBase() {
expected = { it is TestException }
) {
expect(1)
val ok = suspendCancellableCoroutine<String> { cont ->
suspendCancellableCoroutine<String> { cont ->
expect(2)
cont.invokeOnCancellation { expect(3) }
cont.cancel(TestException("FAIL"))
Expand All @@ -44,6 +44,25 @@ class CancellableResumeTest : TestBase() {
expectUnreached()
}

@Test
fun testResumeImmediateAfterIndirectCancel() = runTest(
expected = { it is CancellationException }
) {
expect(1)
val ctx = coroutineContext
suspendCancellableCoroutine<String> { cont ->
expect(2)
cont.invokeOnCancellation { expect(3) }
ctx.cancel()
expect(4)
cont.resume("OK") { cause ->
expect(5)
}
finish(6)
}
expectUnreached()
}

@Test
fun testResumeLaterNormally() = runTest {
expect(1)
Expand Down
59 changes: 59 additions & 0 deletions kotlinx-coroutines-core/jvm/test/CancellableContinuationJvmTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,63 @@ class CancellableContinuationJvmTest : TestBase() {
}
suspend {}() // Eliminate tail-call optimization
}

@Test
fun testExceptionIsNotReported() = runTest({ it is CancellationException }) {
val ctx = coroutineContext
suspendCancellableCoroutine<Unit> {
ctx.cancel()
it.resumeWith(Result.failure(TestException()))
}
}

@Test
fun testBlockingIntegration() = runTest {
val source = BlockingSource()
val job = launch(Dispatchers.Default) {
source.await()
}
source.cancelAndJoin(job)
}

@Test
fun testBlockingIntegrationAlreadyCancelled() = runTest {
val source = BlockingSource()
val job = launch(Dispatchers.Default) {
cancel()
source.await()
}
source.cancelAndJoin(job)
}

private suspend fun BlockingSource.cancelAndJoin(job: Job) {
while (!hasSubscriber) {
Thread.sleep(10)
}
job.cancelAndJoin()
}

private suspend fun BlockingSource.await() = suspendCancellableCoroutine<Unit> {
it.invokeOnCancellation { this.cancel() }
subscribe()
}

private class BlockingSource {
@Volatile
private var isCancelled = false

@Volatile
public var hasSubscriber = false

public fun subscribe() {
hasSubscriber = true
while (!isCancelled) {
Thread.sleep(10)
}
}

public fun cancel() {
isCancelled = true
}
}
}

0 comments on commit d680cba

Please sign in to comment.