Skip to content

Commit

Permalink
Cancel dispatched coroutine on Dispatchers.IO when the underlying Han… (
Browse files Browse the repository at this point in the history
#2784)

Cancel dispatched coroutine on Dispatchers.IO when the underlying Handler is closed in Handler.asCoroutineDispatcher()

Fixes #2778
  • Loading branch information
qwwdfsad authored Jun 28, 2021
1 parent a327dfb commit 6cd1883
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 16 deletions.
38 changes: 27 additions & 11 deletions ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
package kotlinx.coroutines.android

import android.os.*
import androidx.annotation.*
import android.view.*
import androidx.annotation.*
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import java.lang.reflect.*
Expand Down Expand Up @@ -54,15 +54,22 @@ internal class AndroidDispatcherFactory : MainDispatcherFactory {
override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
HandlerContext(Looper.getMainLooper().asHandler(async = true))

override fun hintOnError(): String? = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"
override fun hintOnError(): String = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"

override val loadPriority: Int
get() = Int.MAX_VALUE / 2
}

/**
* Represents an arbitrary [Handler] as a implementation of [CoroutineDispatcher]
* Represents an arbitrary [Handler] as an implementation of [CoroutineDispatcher]
* with an optional [name] for nicer debugging
*
* ## Rejected execution
*
* If the underlying handler is closed and its message-scheduling methods start to return `false` on
* an attempt to submit a continuation task to the resulting dispatcher,
* then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the
* [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete.
*/
@JvmName("from") // this is for a nice Java API, see issue #255
@JvmOverloads
Expand Down Expand Up @@ -129,24 +136,33 @@ internal class HandlerContext private constructor(
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
handler.post(block)
if (!handler.post(block)) {
cancelOnRejection(context, block)
}
}

override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val block = Runnable {
with(continuation) { resumeUndispatched(Unit) }
}
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
if (handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))) {
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
} else {
cancelOnRejection(continuation.context, block)
}
}

override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
return object : DisposableHandle {
override fun dispose() {
handler.removeCallbacks(block)
}
if (handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))) {
return DisposableHandle { handler.removeCallbacks(block) }
}
cancelOnRejection(context, block)
return NonDisposableHandle
}

private fun cancelOnRejection(context: CoroutineContext, block: Runnable) {
context.cancel(CancellationException("The task was rejected, the handler underlying the dispatcher '${toString()}' was closed"))
Dispatchers.IO.dispatch(context, block)
}

override fun toString(): String = toStringInternalImpl() ?: run {
Expand Down
64 changes: 64 additions & 0 deletions ui/kotlinx-coroutines-android/test/DisabledHandlerTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.android

import android.os.*
import kotlinx.coroutines.*
import org.junit.*
import org.junit.runner.*
import org.robolectric.*
import org.robolectric.annotation.*

@RunWith(RobolectricTestRunner::class)
@Config(manifest = Config.NONE, sdk = [28])
class DisabledHandlerTest : TestBase() {

private var delegateToSuper = false
private val disabledDispatcher = object : Handler() {
override fun sendMessageAtTime(msg: Message?, uptimeMillis: Long): Boolean {
if (delegateToSuper) return super.sendMessageAtTime(msg, uptimeMillis)
return false
}
}.asCoroutineDispatcher()

@Test
fun testRunBlocking() {
expect(1)
try {
runBlocking(disabledDispatcher) {
expectUnreached()
}
expectUnreached()
} catch (e: CancellationException) {
finish(2)
}
}

@Test
fun testInvokeOnCancellation() = runTest {
val job = launch(disabledDispatcher, start = CoroutineStart.LAZY) { expectUnreached() }
job.invokeOnCompletion { if (it != null) expect(2) }
yield()
expect(1)
job.join()
finish(3)
}

@Test
fun testWithTimeout() = runTest {
delegateToSuper = true
try {
withContext(disabledDispatcher) {
expect(1)
delegateToSuper = false
delay(Long.MAX_VALUE - 1)
expectUnreached()
}
expectUnreached()
} catch (e: CancellationException) {
finish(2)
}
}
}
10 changes: 5 additions & 5 deletions ui/kotlinx-coroutines-android/test/HandlerDispatcherTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class HandlerDispatcherTest : TestBase() {
fun mainIsAsync() = runTest {
ReflectionHelpers.setStaticField(Build.VERSION::class.java, "SDK_INT", 28)

val mainLooper = ShadowLooper.getShadowMainLooper()
val mainLooper = shadowOf(Looper.getMainLooper())
mainLooper.pause()
val mainMessageQueue = shadowOf(Looper.getMainLooper().queue)

Expand All @@ -48,7 +48,7 @@ class HandlerDispatcherTest : TestBase() {

val main = Looper.getMainLooper().asHandler(async = true).asCoroutineDispatcher()

val mainLooper = ShadowLooper.getShadowMainLooper()
val mainLooper = shadowOf(Looper.getMainLooper())
mainLooper.pause()
val mainMessageQueue = shadowOf(Looper.getMainLooper().queue)

Expand All @@ -67,7 +67,7 @@ class HandlerDispatcherTest : TestBase() {

val main = Looper.getMainLooper().asHandler(async = true).asCoroutineDispatcher()

val mainLooper = ShadowLooper.getShadowMainLooper()
val mainLooper = shadowOf(Looper.getMainLooper())
mainLooper.pause()
val mainMessageQueue = shadowOf(Looper.getMainLooper().queue)

Expand All @@ -86,7 +86,7 @@ class HandlerDispatcherTest : TestBase() {

val main = Looper.getMainLooper().asHandler(async = true).asCoroutineDispatcher()

val mainLooper = ShadowLooper.getShadowMainLooper()
val mainLooper = shadowOf(Looper.getMainLooper())
mainLooper.pause()
val mainMessageQueue = shadowOf(Looper.getMainLooper().queue)

Expand All @@ -105,7 +105,7 @@ class HandlerDispatcherTest : TestBase() {

val main = Looper.getMainLooper().asHandler(async = false).asCoroutineDispatcher()

val mainLooper = ShadowLooper.getShadowMainLooper()
val mainLooper = shadowOf(Looper.getMainLooper())
mainLooper.pause()
val mainMessageQueue = shadowOf(Looper.getMainLooper().queue)

Expand Down

0 comments on commit 6cd1883

Please sign in to comment.