diff --git a/kotlinx-coroutines-core/jvm/src/Interruptible.kt b/kotlinx-coroutines-core/jvm/src/Interruptible.kt index a4144d87d6..f50e0936d5 100644 --- a/kotlinx-coroutines-core/jvm/src/Interruptible.kt +++ b/kotlinx-coroutines-core/jvm/src/Interruptible.kt @@ -11,27 +11,22 @@ import kotlin.coroutines.* * Calls the specified [block] with a given coroutine context in a interruptible manner. * The blocking code block will be interrupted and this function will throw [CancellationException] * if the coroutine is cancelled. - * The specified [coroutineContext] directly translates into [withContext] argument. * * Example: + * * ``` - * val blockingJob = launch { - * // This function will throw CancellationException - * runInterruptible(Dispatchers.IO) { - * // This blocking procedure will be interrupted when this coroutine is canceled - * doSomethingElseUsefulInterruptible() + * withTimeout(500L) { // Cancels coroutine on timeout + * runInterruptible { // Throws CancellationException if interrupted + * doSomethingBlocking() // Interrupted on coroutines cancellation * } * } - * - * delay(500L) - * blockingJob.cancel() // Interrupt blocking call - * } * ``` * - * There is also an optional context parameter to this function to enable single-call conversion of - * interruptible Java methods into suspending functions like this: + * There is an optional [context] parameter to this function working just like [withContext]. + * It enables single-call conversion of interruptible Java methods into suspending functions. + * With one call here we are moving the call to [Dispatchers.IO] and supporting interruption: + * * ``` - * // With one call here we are moving the call to Dispatchers.IO and supporting interruption. * suspend fun BlockingQueue.awaitTake(): T = * runInterruptible(Dispatchers.IO) { queue.take() } * ``` @@ -40,14 +35,14 @@ public suspend fun runInterruptible( context: CoroutineContext = EmptyCoroutineContext, block: () -> T ): T = withContext(context) { - runInterruptibleInExpectedContext(block) + runInterruptibleInExpectedContext(coroutineContext, block) } -private suspend fun runInterruptibleInExpectedContext(block: () -> T): T { +private fun runInterruptibleInExpectedContext(coroutineContext: CoroutineContext, block: () -> T): T { try { - // No job -> no cancellation - val job = coroutineContext[Job] ?: return block() + val job = coroutineContext[Job]!! // withContext always creates a job val threadState = ThreadState(job) + threadState.setup() try { return block() } finally { @@ -63,7 +58,7 @@ private const val FINISHED = 1 private const val INTERRUPTING = 2 private const val INTERRUPTED = 3 -private class ThreadState : CompletionHandler { +private class ThreadState(private val job: Job) : CompletionHandler { /* === States === @@ -90,28 +85,25 @@ private class ThreadState : CompletionHandler { | | V V +---------------+ +-------------------------+ - | INTERRUPTED | | FINISHED | + | INTERRUPTED | | FINISHED | +---------------+ +-------------------------+ */ - private val state: AtomicRef = atomic(State(WORKING, null)) + private val _state = atomic(WORKING) private val targetThread = Thread.currentThread() - private data class State(@JvmField val state: Int, @JvmField val cancelHandle: DisposableHandle?) + // Registered cancellation handler + private var cancelHandle: DisposableHandle? = null - // We're using a non-primary constructor instead of init block of a primary constructor here, because - // we need to `return`. - constructor(job: Job) { - // Register cancellation handler - val cancelHandle = - job.invokeOnCompletion(onCancelling = true, invokeImmediately = true, handler = this) + fun setup() { + cancelHandle = job.invokeOnCompletion(onCancelling = true, invokeImmediately = true, handler = this) // Either we successfully stored it or it was immediately cancelled - state.loop { s -> - when (s.state) { + _state.loop { state -> + when (state) { // Happy-path, move forward - WORKING -> if (state.compareAndSet(s, State(WORKING, cancelHandle))) return + WORKING -> if (_state.compareAndSet(state, WORKING)) return // Immediately cancelled, just continue INTERRUPTING, INTERRUPTED -> return - else -> throw IllegalStateException("Illegal state $s") + else -> invalidState(state) } } } @@ -120,10 +112,10 @@ private class ThreadState : CompletionHandler { /* * Do not allow to untriggered interrupt to leak */ - state.loop { s -> - when (s.state) { - WORKING -> if (state.compareAndSet(s, State(FINISHED, null))) { - s.cancelHandle?.dispose() + _state.loop { state -> + when (state) { + WORKING -> if (_state.compareAndSet(state, FINISHED)) { + cancelHandle?.dispose() return } INTERRUPTING -> { @@ -134,31 +126,32 @@ private class ThreadState : CompletionHandler { } INTERRUPTED -> { // Clear it and bail out - Thread.interrupted(); + Thread.interrupted() return } - else -> error("Illegal state: $s") + else -> invalidState(state) } } } // Cancellation handler override fun invoke(cause: Throwable?) { - state.loop { s -> - when (s.state) { + _state.loop { state -> + when (state) { // Working -> try to transite state and interrupt the thread WORKING -> { - if (state.compareAndSet(s, State(INTERRUPTING, null))) { + if (_state.compareAndSet(state, INTERRUPTING)) { targetThread.interrupt() - state.value = State(INTERRUPTED, null) + _state.value = INTERRUPTED return } } - // Finished -- runInterruptible is already complete - FINISHED -> return - INTERRUPTING, INTERRUPTED -> return - else -> error("Illegal state: $s") + // Finished -- runInterruptible is already complete, INTERRUPTING - ignore + FINISHED, INTERRUPTING, INTERRUPTED -> return + else -> invalidState(state) } } } + + private fun invalidState(state: Int): Nothing = error("Illegal state $state") }