Skip to content

Commit

Permalink
Do not report exceptions raised in CoroutineDispatcher.dispatch as in…
Browse files Browse the repository at this point in the history
…ternal errors (#4181)

Co-authored-by: Dmitry Khalanskiy <52952525+dkhalanskyjb@users.noreply.github.com>
Co-authored-by: Vsevolod Tolstopyatov <qwwdfsad@gmail.com>

Fixes #4091
  • Loading branch information
zuevmaxim authored Oct 21, 2024
1 parent 1500c83 commit 46f9ccc
Show file tree
Hide file tree
Showing 11 changed files with 158 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class ListAllCoroutineThrowableSubclassesTest {
"kotlinx.coroutines.internal.DiagnosticCoroutineContextException",
"kotlinx.coroutines.internal.ExceptionSuccessfullyProcessed",
"kotlinx.coroutines.CoroutinesInternalError",
"kotlinx.coroutines.DispatchException",
"kotlinx.coroutines.channels.ClosedSendChannelException",
"kotlinx.coroutines.channels.ClosedReceiveChannelException",
"kotlinx.coroutines.flow.internal.ChildCancelledException",
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public abstract class CoroutineDispatcher :
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block)
public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = safeDispatch(context, block)

/**
* Returns a continuation that wraps the provided [continuation], thus intercepting all resumptions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,19 @@ import kotlin.coroutines.*
*/
@InternalCoroutinesApi
public fun handleCoroutineException(context: CoroutineContext, exception: Throwable) {
val reportException = if (exception is DispatchException) exception.cause else exception
// Invoke an exception handler from the context if present
try {
context[CoroutineExceptionHandler]?.let {
it.handleException(context, exception)
it.handleException(context, reportException)
return
}
} catch (t: Throwable) {
handleUncaughtCoroutineException(context, handlerException(exception, t))
handleUncaughtCoroutineException(context, handlerException(reportException, t))
return
}
// If a handler is not present in the context or an exception was thrown, fallback to the global handler
handleUncaughtCoroutineException(context, exception)
handleUncaughtCoroutineException(context, reportException)
}

internal fun handlerException(originalException: Throwable, thrownException: Throwable): Throwable {
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/Yield.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { u
val context = uCont.context
context.ensureActive()
val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit
if (cont.dispatcher.isDispatchNeeded(context)) {
if (cont.dispatcher.safeIsDispatchNeeded(context)) {
// this is a regular dispatcher -- do simple dispatchYield
cont.dispatchYield(context, Unit)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,10 @@ internal class DispatchedContinuation<in T>(

override fun resumeWith(result: Result<T>) {
val state = result.toState()
if (dispatcher.isDispatchNeeded(context)) {
if (dispatcher.safeIsDispatchNeeded(context)) {
_state = state
resumeMode = MODE_ATOMIC
dispatcher.dispatch(context, this)
dispatcher.safeDispatch(context, this)
} else {
executeUnconfined(state, MODE_ATOMIC) {
withCoroutineContext(context, countOrElement) {
Expand All @@ -205,10 +205,10 @@ internal class DispatchedContinuation<in T>(
@Suppress("NOTHING_TO_INLINE")
internal inline fun resumeCancellableWith(result: Result<T>) {
val state = result.toState()
if (dispatcher.isDispatchNeeded(context)) {
if (dispatcher.safeIsDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
dispatcher.safeDispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
Expand Down Expand Up @@ -249,6 +249,22 @@ internal class DispatchedContinuation<in T>(
"DispatchedContinuation[$dispatcher, ${continuation.toDebugString()}]"
}

internal fun CoroutineDispatcher.safeDispatch(context: CoroutineContext, runnable: Runnable) {
try {
dispatch(context, runnable)
} catch (e: Throwable) {
throw DispatchException(e, this, context)
}
}

internal fun CoroutineDispatcher.safeIsDispatchNeeded(context: CoroutineContext): Boolean {
try {
return isDispatchNeeded(context)
} catch (e: Throwable) {
throw DispatchException(e, this, context)
}
}

/**
* It is not inline to save bytecode (it is pretty big and used in many places)
* and we leave it public so that its name is not mangled in use stack traces if it shows there.
Expand Down
26 changes: 19 additions & 7 deletions kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ internal abstract class DispatchedTask<in T> internal constructor(

final override fun run() {
assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
var fatalException: Throwable? = null
try {
val delegate = delegate as DispatchedContinuation<T>
val continuation = delegate.continuation
Expand All @@ -102,11 +101,10 @@ internal abstract class DispatchedTask<in T> internal constructor(
}
}
}
} catch (e: DispatchException) {
handleCoroutineException(delegate.context, e.cause)
} catch (e: Throwable) {
// This instead of runCatching to have nicer stacktrace and debug experience
fatalException = e
} finally {
fatalException?.let { handleFatalException(it) }
handleFatalException(e)
}
}

Expand Down Expand Up @@ -143,8 +141,8 @@ internal fun <T> DispatchedTask<T>.dispatch(mode: Int) {
// dispatch directly using this instance's Runnable implementation
val dispatcher = delegate.dispatcher
val context = delegate.context
if (dispatcher.isDispatchNeeded(context)) {
dispatcher.dispatch(context, this)
if (dispatcher.safeIsDispatchNeeded(context)) {
dispatcher.safeDispatch(context, this)
} else {
resumeUnconfined()
}
Expand Down Expand Up @@ -205,3 +203,17 @@ internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
resumeWith(Result.failure(recoverStackTrace(exception, this)))
}

/**
* This exception holds an exception raised in [CoroutineDispatcher.dispatch] method.
* When dispatcher methods fail unexpectedly, it is likely a user-induced programmatic bug,
* such as calling `executor.close()` prematurely. To avoid reporting such exceptions as fatal errors,
* we handle them with a separate code path. See also #4091.
*
* @see safeDispatch
*/
internal class DispatchException(
override val cause: Throwable,
dispatcher: CoroutineDispatcher,
context: CoroutineContext,
) : Exception("Coroutine dispatcher $dispatcher threw an exception, context = $context", cause)
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ internal class LimitedDispatcher(

override fun dispatch(context: CoroutineContext, block: Runnable) {
dispatchInternal(block) { worker ->
dispatcher.dispatch(this, worker)
dispatcher.safeDispatch(this, worker)
}
}

Expand Down Expand Up @@ -116,10 +116,10 @@ internal class LimitedDispatcher(
}
currentTask = obtainTaskOrDeallocateWorker() ?: return
// 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well
if (++fairnessCounter >= 16 && dispatcher.isDispatchNeeded(this@LimitedDispatcher)) {
if (++fairnessCounter >= 16 && dispatcher.safeIsDispatchNeeded(this@LimitedDispatcher)) {
// Do "yield" to let other views execute their runnable as well
// Note that we do not decrement 'runningWorkers' as we are still committed to our part of work
dispatcher.dispatch(this@LimitedDispatcher, this)
dispatcher.safeDispatch(this@LimitedDispatcher, this)
return
}
}
Expand Down
5 changes: 3 additions & 2 deletions kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ private fun dispatcherFailure(completion: Continuation<*>, e: Throwable) {
* 2) Rethrow the exception immediately, so it will crash the caller (e.g. when the coroutine had
* no parent or it was async/produce over MainScope).
*/
completion.resumeWith(Result.failure(e))
throw e
val reportException = if (e is DispatchException) e.cause else e
completion.resumeWith(Result.failure(reportException))
throw reportException
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ internal fun <R, T> (suspend (R) -> T).startCoroutineUndispatched(receiver: R, c
startCoroutineUninterceptedOrReturn(receiver, actualCompletion)
}
} catch (e: Throwable) {
actualCompletion.resumeWithException(e)
val reportException = if (e is DispatchException) e.cause else e
actualCompletion.resumeWithException(reportException)
return
}
if (value !== COROUTINE_SUSPENDED) {
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/jvm/src/Executors.kt
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ public fun CoroutineDispatcher.asExecutor(): Executor =

private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher) : Executor {
override fun execute(block: Runnable) {
if (dispatcher.isDispatchNeeded(EmptyCoroutineContext)) {
dispatcher.dispatch(EmptyCoroutineContext, block)
if (dispatcher.safeIsDispatchNeeded(EmptyCoroutineContext)) {
dispatcher.safeDispatch(EmptyCoroutineContext, block)
} else {
block.run()
}
Expand Down
102 changes: 102 additions & 0 deletions kotlinx-coroutines-core/jvm/test/ExecutorsTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,106 @@ class ExecutorsTest : TestBase() {
dispatcher.close()
check(executorService.isShutdown)
}

@Test
fun testEarlyExecutorShutdown() {
runTestExceptionInDispatch(6, { it is RejectedExecutionException }) {
expect(1)
val dispatcher = newSingleThreadContext("Ctx")
launch(dispatcher) {
withContext(Dispatchers.Default) {
expect(2)
delay(100)
expect(4)
}
}

delay(50)
expect(3)

dispatcher.close()
}
}

@Test
fun testExceptionInDispatch() {
runTestExceptionInDispatch(5, { it is TestException }) {
val dispatcher = object : CoroutineDispatcher() {
private var closed = false
override fun dispatch(context: CoroutineContext, block: Runnable) {
if (closed) throw TestException()
Dispatchers.Default.dispatch(context, block)
}

fun close() {
closed = true
}
}
launch(dispatcher) {
withContext(Dispatchers.Default) {
expect(1)
delay(100)
expect(3)
}
}

delay(50)
expect(2)
dispatcher.close()
}
}

@Test
fun testExceptionInIsDispatchNeeded() {
val dispatcher = object : CoroutineDispatcher() {
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
expect(2)
throw TestException()
}
override fun dispatch(context: CoroutineContext, block: Runnable) = expectUnreached()
}
try {
runBlocking {
expect(1)
try {
launch(dispatcher) {
expectUnreached()
}
expectUnreached()
} catch (_: TestException) {
expect(3)
}

}
} catch (_: TestException) {
finish(4)
}
}

private fun runTestExceptionInDispatch(
totalSteps: Int,
isExpectedException: (Throwable) -> Boolean,
block: suspend CoroutineScope.() -> Unit,
) {
var mainThread: Thread? = null
val exceptionHandler = CoroutineExceptionHandler { _, e ->
if (isExpectedException(e)) {
expect(totalSteps - 1)
mainThread!!.run {
interrupt()
unpark(this)
}
} else {
expectUnreached()
}
}
try {
runBlocking(exceptionHandler) {
block()
mainThread = Thread.currentThread()
}
} catch (_: InterruptedException) {
finish(totalSteps)
}
}
}

0 comments on commit 46f9ccc

Please sign in to comment.