Skip to content

Commit

Permalink
Document and tweak the contract of Executor.asCoroutineDispatcher and…
Browse files Browse the repository at this point in the history
… ExecutorService.asCoroutineDispatcher

    * Document it properly
    * Make it more robust to signature changes and/or delegation (e.g. see the implementation of java.util.concurrent.Executors.newScheduledThreadPool)
    * Give a public way to reduce the memory pressure via ScheduledFuture.cancel

Fixes #2601
  • Loading branch information
qwwdfsad committed May 24, 2021
1 parent 96e603a commit 5ba249f
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 18 deletions.
68 changes: 50 additions & 18 deletions kotlinx-coroutines-core/jvm/src/Executors.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package kotlinx.coroutines

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.internal.*
import java.io.*
import java.util.concurrent.*
Expand Down Expand Up @@ -39,6 +40,22 @@ public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closea
/**
* Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher].
*
* ## Interaction with [delay] and time-based coroutines.
*
* If the given [Executor] is an instance of [ScheduledExecutorService], then all time-related
* coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled
* on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding
* coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future.
*
* If the given [Executor] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling,
* remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order
* to reduce the memory pressure of cancelled coroutines.
*
* If the executor is neither of this types, the separate internal thread will be used to
* _track_ the delay and time-related executions, but the coroutine itself will still be executed
* on top of the given executor.
*
* ## Rejected execution
* If the underlying executor throws [RejectedExecutionException] on
* attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
* resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
Expand All @@ -52,6 +69,23 @@ public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher
/**
* Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
*
* ## Interaction with [delay] and time-based coroutines.
*
* If the given [Executor] is an instance of [ScheduledExecutorService], then all time-related
* coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled
* on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding
* coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future.
*
* If the given [Executor] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling,
* remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order
* to reduce the memory pressure of cancelled coroutines.
*
* If the executor is neither of this types, the separate internal thread will be used to
* _track_ the delay and time-related executions, but the coroutine itself will still be executed
* on top of the given executor.
*
* ## Rejected execution
*
* If the underlying executor throws [RejectedExecutionException] on
* attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
* resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
Expand All @@ -77,7 +111,14 @@ private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher)

internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcher(), Delay {

private var removesFutureOnCancellation: Boolean = removeFutureOnCancel(executor)
/*
* Attempts to reflectively (to be Java 6 compatible) invoke
* ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy in order to cleanup
* internal scheduler queue on cancellation.
*/
init {
removeFutureOnCancel(executor)
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
try {
Expand All @@ -89,17 +130,12 @@ internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor)
}
}

/*
* removesFutureOnCancellation is required to avoid memory leak.
* On Java 7+ we reflectively invoke ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true) and we're fine.
* On Java 6 we're scheduling time-based coroutines to our own thread safe heap which supports cancellation.
*/
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val future = if (removesFutureOnCancellation) {
scheduleBlock(ResumeUndispatchedRunnable(this, continuation), continuation.context, timeMillis)
} else {
null
}
val future = (executor as? ScheduledExecutorService)?.scheduleBlock(
ResumeUndispatchedRunnable(this, continuation),
continuation.context,
timeMillis
)
// If everything went fine and the scheduling attempt was not rejected -- use it
if (future != null) {
continuation.cancelFutureOnCancellation(future)
Expand All @@ -110,20 +146,16 @@ internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor)
}

override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
val future = if (removesFutureOnCancellation) {
scheduleBlock(block, context, timeMillis)
} else {
null
}
val future = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeMillis)
return when {
future != null -> DisposableFutureHandle(future)
else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context)
}
}

private fun scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? {
private fun ScheduledExecutorService.scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? {
return try {
(executor as? ScheduledExecutorService)?.schedule(block, timeMillis, TimeUnit.MILLISECONDS)
schedule(block, timeMillis, TimeUnit.MILLISECONDS)
} catch (e: RejectedExecutionException) {
cancelJobOnRejection(context, e)
null
Expand Down
44 changes: 44 additions & 0 deletions kotlinx-coroutines-core/jvm/test/DelayAsCoroutineDispatcherTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import org.junit.Test
import java.lang.Runnable
import java.util.concurrent.*
import kotlin.test.*

class DelayAsCoroutineDispatcherTest : TestBase() {

private var callsToSchedule = 0

private inner class STPE : ScheduledThreadPoolExecutor(1) {
override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> {
if (delay != 0L) ++callsToSchedule
return super.schedule(command, delay, unit)
}
}

private inner class SES : ScheduledExecutorService by STPE()

@Test
fun testScheduledThreadPool() = runTest {
val executor = STPE()
withContext(executor.asCoroutineDispatcher()) {
delay(100)
}
executor.shutdown()
assertEquals(1, callsToSchedule)
}

@Test
fun testScheduledExecutorService() = runTest {
val executor = SES()
withContext(executor.asCoroutineDispatcher()) {
delay(100)
}
executor.shutdown()
assertEquals(1, callsToSchedule)
}
}

0 comments on commit 5ba249f

Please sign in to comment.