Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve docs & code style in runInterruptibe #1994

Merged
merged 1 commit into from
May 7, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 38 additions & 45 deletions kotlinx-coroutines-core/jvm/src/Interruptible.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,22 @@ import kotlin.coroutines.*
* Calls the specified [block] with a given coroutine context in a interruptible manner.
* The blocking code block will be interrupted and this function will throw [CancellationException]
* if the coroutine is cancelled.
* The specified [coroutineContext] directly translates into [withContext] argument.
*
* Example:
*
* ```
* val blockingJob = launch {
* // This function will throw CancellationException
* runInterruptible(Dispatchers.IO) {
* // This blocking procedure will be interrupted when this coroutine is canceled
* doSomethingElseUsefulInterruptible()
* withTimeout(500L) { // Cancels coroutine on timeout
* runInterruptible { // Throws CancellationException if interrupted
* doSomethingBlocking() // Interrupted on coroutines cancellation
* }
* }
*
* delay(500L)
* blockingJob.cancel() // Interrupt blocking call
* }
* ```
*
* There is also an optional context parameter to this function to enable single-call conversion of
* interruptible Java methods into suspending functions like this:
* There is an optional [context] parameter to this function working just like [withContext].
* It enables single-call conversion of interruptible Java methods into suspending functions.
* With one call here we are moving the call to [Dispatchers.IO] and supporting interruption:
*
* ```
* // With one call here we are moving the call to Dispatchers.IO and supporting interruption.
* suspend fun <T> BlockingQueue<T>.awaitTake(): T =
* runInterruptible(Dispatchers.IO) { queue.take() }
* ```
Expand All @@ -40,14 +35,14 @@ public suspend fun <T> runInterruptible(
context: CoroutineContext = EmptyCoroutineContext,
block: () -> T
): T = withContext(context) {
runInterruptibleInExpectedContext(block)
runInterruptibleInExpectedContext(coroutineContext, block)
}

private suspend fun <T> runInterruptibleInExpectedContext(block: () -> T): T {
private fun <T> runInterruptibleInExpectedContext(coroutineContext: CoroutineContext, block: () -> T): T {
try {
// No job -> no cancellation
val job = coroutineContext[Job] ?: return block()
val job = coroutineContext[Job]!! // withContext always creates a job
val threadState = ThreadState(job)
threadState.setup()
try {
return block()
} finally {
Expand All @@ -63,7 +58,7 @@ private const val FINISHED = 1
private const val INTERRUPTING = 2
private const val INTERRUPTED = 3

private class ThreadState : CompletionHandler {
private class ThreadState(private val job: Job) : CompletionHandler {
/*
=== States ===

Expand All @@ -90,28 +85,25 @@ private class ThreadState : CompletionHandler {
| |
V V
+---------------+ +-------------------------+
| INTERRUPTED | | FINISHED |
| INTERRUPTED | | FINISHED |
+---------------+ +-------------------------+
*/
private val state: AtomicRef<State> = atomic(State(WORKING, null))
private val _state = atomic(WORKING)
private val targetThread = Thread.currentThread()

private data class State(@JvmField val state: Int, @JvmField val cancelHandle: DisposableHandle?)
// Registered cancellation handler
private var cancelHandle: DisposableHandle? = null

// We're using a non-primary constructor instead of init block of a primary constructor here, because
// we need to `return`.
constructor(job: Job) {
// Register cancellation handler
val cancelHandle =
job.invokeOnCompletion(onCancelling = true, invokeImmediately = true, handler = this)
fun setup() {
cancelHandle = job.invokeOnCompletion(onCancelling = true, invokeImmediately = true, handler = this)
// Either we successfully stored it or it was immediately cancelled
state.loop { s ->
when (s.state) {
_state.loop { state ->
when (state) {
// Happy-path, move forward
WORKING -> if (state.compareAndSet(s, State(WORKING, cancelHandle))) return
WORKING -> if (_state.compareAndSet(state, WORKING)) return
// Immediately cancelled, just continue
INTERRUPTING, INTERRUPTED -> return
else -> throw IllegalStateException("Illegal state $s")
else -> invalidState(state)
}
}
}
Expand All @@ -120,10 +112,10 @@ private class ThreadState : CompletionHandler {
/*
* Do not allow to untriggered interrupt to leak
*/
state.loop { s ->
when (s.state) {
WORKING -> if (state.compareAndSet(s, State(FINISHED, null))) {
s.cancelHandle?.dispose()
_state.loop { state ->
when (state) {
WORKING -> if (_state.compareAndSet(state, FINISHED)) {
cancelHandle?.dispose()
return
}
INTERRUPTING -> {
Expand All @@ -134,31 +126,32 @@ private class ThreadState : CompletionHandler {
}
INTERRUPTED -> {
// Clear it and bail out
Thread.interrupted();
Thread.interrupted()
return
}
else -> error("Illegal state: $s")
else -> invalidState(state)
}
}
}

// Cancellation handler
override fun invoke(cause: Throwable?) {
state.loop { s ->
when (s.state) {
_state.loop { state ->
when (state) {
// Working -> try to transite state and interrupt the thread
WORKING -> {
if (state.compareAndSet(s, State(INTERRUPTING, null))) {
if (_state.compareAndSet(state, INTERRUPTING)) {
targetThread.interrupt()
state.value = State(INTERRUPTED, null)
_state.value = INTERRUPTED
return
}
}
// Finished -- runInterruptible is already complete
FINISHED -> return
INTERRUPTING, INTERRUPTED -> return
else -> error("Illegal state: $s")
// Finished -- runInterruptible is already complete, INTERRUPTING - ignore
FINISHED, INTERRUPTING, INTERRUPTED -> return
else -> invalidState(state)
}
}
}

private fun invalidState(state: Int): Nothing = error("Illegal state $state")
}