From 435844a16a9305f18967a36d0a6685d7ce427fdf Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 11 May 2023 10:53:13 +0200 Subject: [PATCH] Get rid of BeforeResumeCancelHandler (#3744) * Remove effectively duplicating code * BufferedChannelIterator is properly registered for cancellation as a corresponding segment * Undelivered element handlers are registered via extended segment's protocol - Index now encodes information whether segment corresponds to a sender or to a receiver Fixes #3646 --- .../common/src/CancellableContinuationImpl.kt | 24 +-- .../common/src/channels/BufferedChannel.kt | 151 +++++------------- .../src/internal/ConcurrentLinkedList.kt | 9 +- .../common/src/selects/Select.kt | 32 ++-- .../common/src/selects/SelectUnbiased.kt | 2 +- .../common/src/sync/Semaphore.kt | 3 +- .../CancellableContinuationHandlersTest.kt | 2 +- 7 files changed, 77 insertions(+), 146 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt index 098369e5ab..ed2d9f2026 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt @@ -251,7 +251,7 @@ internal open class CancellableContinuationImpl( private fun callSegmentOnCancellation(segment: Segment<*>, cause: Throwable?) { val index = _decisionAndIndex.value.index check(index != NO_INDEX) { "The index for Segment.onCancellation(..) is broken" } - callCancelHandlerSafely { segment.onCancellation(index, cause) } + callCancelHandlerSafely { segment.onCancellation(index, cause, context) } } fun callOnCancellation(onCancellation: (cause: Throwable) -> Unit, cause: Throwable) { @@ -376,8 +376,7 @@ internal open class CancellableContinuationImpl( * [segment] and [index] in this [CancellableContinuationImpl]. * * The only difference is that `segment.onCancellation(..)` is never - * called if this continuation is already completed; thus, - * the semantics is similar to [BeforeResumeCancelHandler]. + * called if this continuation is already completed; * * ``` * invokeOnCancellation { cause -> @@ -436,9 +435,8 @@ internal open class CancellableContinuationImpl( * Continuation was already completed, and might already have cancel handler. */ if (state.cancelHandler != null) multipleHandlersError(handler, state) - // BeforeResumeCancelHandler and Segment.invokeOnCancellation(..) - // do NOT need to be called on completed continuation. - if (handler is BeforeResumeCancelHandler || handler is Segment<*>) return + // Segment.invokeOnCancellation(..) does NOT need to be called on completed continuation. + if (handler is Segment<*>) return handler as CancelHandler if (state.cancelled) { // Was already cancelled while being dispatched -- invoke the handler directly @@ -451,10 +449,10 @@ internal open class CancellableContinuationImpl( else -> { /* * Continuation was already completed normally, but might get cancelled while being dispatched. - * Change its state to CompletedContinuation, unless we have BeforeResumeCancelHandler which + * Change its state to CompletedContinuation, unless we have Segment which * does not need to be called in this case. */ - if (handler is BeforeResumeCancelHandler || handler is Segment<*>) return + if (handler is Segment<*>) return handler as CancelHandler val update = CompletedContinuation(state, cancelHandler = handler) if (_state.compareAndSet(state, update)) return // quit on cas success @@ -489,7 +487,7 @@ internal open class CancellableContinuationImpl( proposedUpdate } !resumeMode.isCancellableMode && idempotent == null -> proposedUpdate // cannot be cancelled in process, all is fine - onCancellation != null || (state is CancelHandler && state !is BeforeResumeCancelHandler) || idempotent != null -> + onCancellation != null || state is CancelHandler || idempotent != null -> // mark as CompletedContinuation if special cases are present: // Cancellation handlers that shall be called after resume or idempotent resume CompletedContinuation(proposedUpdate, state as? CancelHandler, onCancellation, idempotent) @@ -636,14 +634,6 @@ private object Active : NotCompleted { */ internal abstract class CancelHandler : CancelHandlerBase(), NotCompleted -/** - * Base class for all [CancellableContinuation.invokeOnCancellation] handlers that don't need to be invoked - * if continuation is cancelled after resumption, during dispatch, because the corresponding resources - * were already released before calling `resume`. This cancel handler is called only before `resume`. - * It avoids allocation of [CompletedContinuation] instance during resume on JVM. - */ -internal abstract class BeforeResumeCancelHandler : CancelHandler() - // Wrapper for lambdas, for the performance sake CancelHandler can be subclassed directly private class InvokeOnCancel( // Clashes with InvokeOnCancellation private val handler: CompletionHandler diff --git a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt index b749ee63f8..4fc7d4384d 100644 --- a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt @@ -172,34 +172,10 @@ internal open class BufferedChannel( segment: ChannelSegment, index: Int ) { - if (onUndeliveredElement == null) { - invokeOnCancellation(segment, index) - } else { - when (this) { - is CancellableContinuation<*> -> { - invokeOnCancellation(SenderWithOnUndeliveredElementCancellationHandler(segment, index, context).asHandler) - } - is SelectInstance<*> -> { - disposeOnCompletion(SenderWithOnUndeliveredElementCancellationHandler(segment, index, context)) - } - is SendBroadcast -> { - cont.invokeOnCancellation(SenderWithOnUndeliveredElementCancellationHandler(segment, index, cont.context).asHandler) - } - else -> error("unexpected sender: $this") - } - } - } - - private inner class SenderWithOnUndeliveredElementCancellationHandler( - private val segment: ChannelSegment, - private val index: Int, - private val context: CoroutineContext - ) : BeforeResumeCancelHandler(), DisposableHandle { - override fun dispose() { - segment.onSenderCancellationWithOnUndeliveredElement(index, context) - } - - override fun invoke(cause: Throwable?) = dispose() + // To distinguish cancelled senders and receivers, + // senders equip the index value with an additional marker, + // adding `SEGMENT_SIZE` to the value. + invokeOnCancellation(segment, index + SEGMENT_SIZE) } private fun onClosedSendOnNoWaiterSuspend(element: E, cont: CancellableContinuation) { @@ -1594,7 +1570,7 @@ internal open class BufferedChannel( * and [SelectInstance.trySelect]. When the channel becomes closed, * [tryResumeHasNextOnClosedChannel] should be used instead. */ - private inner class BufferedChannelIterator : ChannelIterator, BeforeResumeCancelHandler(), Waiter { + private inner class BufferedChannelIterator : ChannelIterator, Waiter { /** * Stores the element retrieved by [hasNext] or * a special [CHANNEL_CLOSED] token if this channel is closed. @@ -1607,20 +1583,7 @@ internal open class BufferedChannel( * continuation. The [tryResumeHasNext] and [tryResumeHasNextOnClosedChannel] * function resume this continuation when the [hasNext] invocation should complete. */ - private var continuation: CancellableContinuation? = null - - // When `hasNext()` suspends, the location where the continuation - // is stored is specified via the segment and the index in it. - // We need this information in the cancellation handler below. - private var segment: Segment<*>? = null - private var index = -1 - - /** - * Invoked on cancellation, [BeforeResumeCancelHandler] implementation. - */ - override fun invoke(cause: Throwable?) { - segment?.onCancellation(index, null) - } + private var continuation: CancellableContinuationImpl? = null // `hasNext()` is just a special receive operation. override suspend fun hasNext(): Boolean = @@ -1680,11 +1643,7 @@ internal open class BufferedChannel( } override fun invokeOnCancellation(segment: Segment<*>, index: Int) { - this.segment = segment - this.index = index - // It is possible that this `hasNext()` invocation is already - // resumed, and the `continuation` field is already updated to `null`. - this.continuation?.invokeOnCancellation(this.asHandler) + this.continuation?.invokeOnCancellation(segment, index) } private fun onClosedHasNextNoWaiterSuspend() { @@ -2826,67 +2785,51 @@ internal class ChannelSegment(id: Long, prev: ChannelSegment?, channel: Bu // # Cancellation Support # // ######################## - override fun onCancellation(index: Int, cause: Throwable?) { - onCancellation(index) - } - - fun onSenderCancellationWithOnUndeliveredElement(index: Int, context: CoroutineContext) { - // Read the element first. If the operation has not been successfully resumed - // (this cancellation may be caused by prompt cancellation during dispatching), - // it is guaranteed that the element is presented. + override fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext) { + // To distinguish cancelled senders and receivers, senders equip the index value with + // an additional marker, adding `SEGMENT_SIZE` to the value. + val isSender = index >= SEGMENT_SIZE + // Unwrap the index. + @Suppress("NAME_SHADOWING") val index = if (isSender) index - SEGMENT_SIZE else index + // Read the element, which may be needed further to call `onUndeliveredElement`. val element = getElement(index) - // Perform the cancellation; `onCancellationImpl(..)` return `true` if the - // cancelled operation had not been resumed. In this case, the `onUndeliveredElement` - // lambda should be called. - if (onCancellation(index)) { - channel.onUndeliveredElement!!.callUndeliveredElement(element, context) - } - } - - /** - * Returns `true` if the request is successfully cancelled, - * and no rendezvous has happened. We need this knowledge - * to keep [BufferedChannel.onUndeliveredElement] correct. - */ - @Suppress("ConvertTwoComparisonsToRangeCheck") - fun onCancellation(index: Int): Boolean { - // Count the global index of this cell and read - // the current counters of send and receive operations. - val globalIndex = id * SEGMENT_SIZE + index - val s = channel.sendersCounter - val r = channel.receiversCounter - // Update the cell state trying to distinguish whether - // the cancelled coroutine is sender or receiver. - var isSender: Boolean - var isReceiver: Boolean - while (true) { // CAS-loop + // Update the cell state. + while (true) { + // CAS-loop // Read the current state of the cell. - val cur = data[index * 2 + 1].value + val cur = getState(index) when { // The cell stores a waiter. cur is Waiter || cur is WaiterEB -> { - // Is the cancelled request send for sure? - isSender = globalIndex < s && globalIndex >= r - // Is the cancelled request receiver for sure? - isReceiver = globalIndex < r && globalIndex >= s - // If the cancelled coroutine neither sender - // nor receiver, clean the element slot and finish. - // An opposite operation will resume this request - // and update the cell state eventually. - if (!isSender && !isReceiver) { - cleanElement(index) - return true - } // The cancelled request is either send or receive. // Update the cell state correspondingly. val update = if (isSender) INTERRUPTED_SEND else INTERRUPTED_RCV - if (data[index * 2 + 1].compareAndSet(cur, update)) break + if (casState(index, cur, update)) { + // The waiter has been successfully cancelled. + // Clean the element slot and invoke `onSlotCleaned()`, + // which may cause deleting the whole segment from the linked list. + // In case the cancelled request is receiver, it is critical to ensure + // that the `expandBuffer()` attempt that processes this cell is completed, + // so `onCancelledRequest(..)` waits for its completion before invoking `onSlotCleaned()`. + cleanElement(index) + onCancelledRequest(index, !isSender) + // Call `onUndeliveredElement` if needed. + if (isSender) { + channel.onUndeliveredElement?.callUndeliveredElement(element, context) + } + return + } } // The cell already indicates that the operation is cancelled. cur === INTERRUPTED_SEND || cur === INTERRUPTED_RCV -> { - // Clean the element slot to avoid memory leaks and finish. + // Clean the element slot to avoid memory leaks, + // invoke `onUndeliveredElement` if needed, and finish cleanElement(index) - return true + // Call `onUndeliveredElement` if needed. + if (isSender) { + channel.onUndeliveredElement?.callUndeliveredElement(element, context) + } + return } // An opposite operation is resuming this request; // wait until the cell state updates. @@ -2897,23 +2840,13 @@ internal class ChannelSegment(id: Long, prev: ChannelSegment?, channel: Bu cur === RESUMING_BY_EB || cur === RESUMING_BY_RCV -> continue // This request was successfully resumed, so this cancellation // is caused by the prompt cancellation feature and should be ignored. - cur === DONE_RCV || cur === BUFFERED -> return false + cur === DONE_RCV || cur === BUFFERED -> return // The cell state indicates that the channel is closed; // this cancellation should be ignored. - cur === CHANNEL_CLOSED -> { - return false - } + cur === CHANNEL_CLOSED -> return else -> error("unexpected state: $cur") } } - // Clean the element slot and invoke `onSlotCleaned()`, - // which may cause deleting the whole segment from the linked list. - // In case the cancelled request is receiver, it is critical to ensure - // that the `expandBuffer()` attempt that processes this cell is completed, - // so `onCancelledRequest(..)` waits for its completion before invoking `onSlotCleaned()`. - cleanElement(index) - onCancelledRequest(index, isReceiver) - return true } /** diff --git a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt index f848e37881..ecfafcf439 100644 --- a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt +++ b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt @@ -6,6 +6,7 @@ package kotlinx.coroutines.internal import kotlinx.atomicfu.* import kotlinx.coroutines.* +import kotlin.coroutines.* import kotlin.jvm.* /** @@ -230,8 +231,14 @@ internal abstract class Segment>( * This function is invoked on continuation cancellation when this segment * with the specified [index] are installed as cancellation handler via * `SegmentDisposable.disposeOnCancellation(Segment, Int)`. + * + * @param index the index under which the sement registered itself in the continuation. + * Indicies are opaque and arithmetics or numeric intepretation is not allowed on them, + * as they may encode additional metadata. + * @param cause the cause of the cancellation, with the same semantics as [CancellableContinuation.invokeOnCancellation] + * @param context the context of the cancellable continuation the segment was registered in */ - abstract fun onCancellation(index: Int, cause: Throwable?) + abstract fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext) /** * Invoked on each slot clean-up; should not be invoked twice for the same slot. diff --git a/kotlinx-coroutines-core/common/src/selects/Select.kt b/kotlinx-coroutines-core/common/src/selects/Select.kt index 51ea522a97..3ac3cb6f27 100644 --- a/kotlinx-coroutines-core/common/src/selects/Select.kt +++ b/kotlinx-coroutines-core/common/src/selects/Select.kt @@ -241,7 +241,7 @@ public sealed interface SelectInstance { internal interface SelectInstanceInternal: SelectInstance, Waiter @PublishedApi -internal open class SelectImplementation constructor( +internal open class SelectImplementation( override val context: CoroutineContext ) : CancelHandler(), SelectBuilder, SelectInstanceInternal { @@ -363,7 +363,7 @@ internal open class SelectImplementation constructor( * thus, other parties are bound to fail when making a rendezvous with it. */ private val isSelected - get() = state.value is ClauseData<*> + get() = state.value is SelectImplementation<*>.ClauseData /** * Returns `true` if this `select` is cancelled. */ @@ -373,7 +373,7 @@ internal open class SelectImplementation constructor( /** * List of clauses waiting on this `select` instance. */ - private var clauses: MutableList>? = ArrayList(2) + private var clauses: MutableList? = ArrayList(2) /** * Stores the completion action provided through [disposeOnCompletion] or [invokeOnCancellation] @@ -439,11 +439,11 @@ internal open class SelectImplementation constructor( // ======================== override fun SelectClause0.invoke(block: suspend () -> R) = - ClauseData(clauseObject, regFunc, processResFunc, PARAM_CLAUSE_0, block, onCancellationConstructor).register() + ClauseData(clauseObject, regFunc, processResFunc, PARAM_CLAUSE_0, block, onCancellationConstructor).register() override fun SelectClause1.invoke(block: suspend (Q) -> R) = - ClauseData(clauseObject, regFunc, processResFunc, null, block, onCancellationConstructor).register() + ClauseData(clauseObject, regFunc, processResFunc, null, block, onCancellationConstructor).register() override fun SelectClause2.invoke(param: P, block: suspend (Q) -> R) = - ClauseData(clauseObject, regFunc, processResFunc, param, block, onCancellationConstructor).register() + ClauseData(clauseObject, regFunc, processResFunc, param, block, onCancellationConstructor).register() /** * Attempts to register this `select` clause. If another clause is already selected, @@ -461,10 +461,10 @@ internal open class SelectImplementation constructor( * updates the state to this clause reference. */ @JvmName("register") - internal fun ClauseData.register(reregister: Boolean = false) { + internal fun ClauseData.register(reregister: Boolean = false) { assert { state.value !== STATE_CANCELLED } // Is there already selected clause? - if (state.value.let { it is ClauseData<*> }) return + if (state.value.let { it is SelectImplementation<*>.ClauseData }) return // For new clauses, check that there does not exist // another clause with the same object. if (!reregister) checkClauseObject(clauseObject) @@ -569,7 +569,7 @@ internal open class SelectImplementation constructor( curState.forEach { reregisterClause(it) } } // This `select` operation became completed during clauses re-registration. - curState is ClauseData<*> -> { + curState is SelectImplementation<*>.ClauseData -> { cont.resume(Unit, curState.createOnCancellationAction(this, internalResult)) return@sc } @@ -628,7 +628,7 @@ internal open class SelectImplementation constructor( } } // Already selected. - STATE_COMPLETED, is ClauseData<*> -> return TRY_SELECT_ALREADY_SELECTED + STATE_COMPLETED, is SelectImplementation<*>.ClauseData -> return TRY_SELECT_ALREADY_SELECTED // Already cancelled. STATE_CANCELLED -> return TRY_SELECT_CANCELLED // This select is still in REGISTRATION phase, re-register the clause @@ -650,7 +650,7 @@ internal open class SelectImplementation constructor( * If the reference to the list of clauses is already cleared due to completion/cancellation, * this function returns `null` */ - private fun findClause(clauseObject: Any): ClauseData? { + private fun findClause(clauseObject: Any): ClauseData? { // Read the list of clauses. If the `clauses` field is already `null`, // the clean-up phase has already completed, and this function returns `null`. val clauses = this.clauses ?: return null @@ -678,7 +678,7 @@ internal open class SelectImplementation constructor( assert { isSelected } // Get the selected clause. @Suppress("UNCHECKED_CAST") - val selectedClause = state.value as ClauseData + val selectedClause = state.value as SelectImplementation.ClauseData // Perform the clean-up before the internal result processing and // the user-specified block invocation to guarantee the absence // of memory leaks. Collect the internal result before that. @@ -700,7 +700,7 @@ internal open class SelectImplementation constructor( } } - private suspend fun processResultAndInvokeBlockRecoveringException(clause: ClauseData, internalResult: Any?): R = + private suspend fun processResultAndInvokeBlockRecoveringException(clause: ClauseData, internalResult: Any?): R = try { val blockArgument = clause.processResult(internalResult) clause.invokeBlock(blockArgument) @@ -716,7 +716,7 @@ internal open class SelectImplementation constructor( * [SelectInstance.disposeOnCompletion] during * clause registrations. */ - private fun cleanup(selectedClause: ClauseData) { + private fun cleanup(selectedClause: ClauseData) { assert { state.value == selectedClause } // Read the list of clauses. If the `clauses` field is already `null`, // a concurrent clean-up procedure has already completed, and it is safe to finish. @@ -757,7 +757,7 @@ internal open class SelectImplementation constructor( /** * Each `select` clause is internally represented with a [ClauseData] instance. */ - internal class ClauseData( + internal inner class ClauseData( @JvmField val clauseObject: Any, // the object of this `select` clause: Channel, Mutex, Job, ... private val regFunc: RegistrationFunction, private val processResFunc: ProcessResultFunction, @@ -825,7 +825,7 @@ internal open class SelectImplementation constructor( fun dispose() { with(disposableHandleOrSegment) { if (this is Segment<*>) { - this.onCancellation(indexInSegment, null) + this.onCancellation(indexInSegment, null, context) } else { (this as? DisposableHandle)?.dispose() } diff --git a/kotlinx-coroutines-core/common/src/selects/SelectUnbiased.kt b/kotlinx-coroutines-core/common/src/selects/SelectUnbiased.kt index 5329a15a0f..a4b1e04341 100644 --- a/kotlinx-coroutines-core/common/src/selects/SelectUnbiased.kt +++ b/kotlinx-coroutines-core/common/src/selects/SelectUnbiased.kt @@ -38,7 +38,7 @@ public suspend inline fun selectUnbiased(crossinline builder: SelectBuilder< */ @PublishedApi internal open class UnbiasedSelectImplementation(context: CoroutineContext) : SelectImplementation(context) { - private val clausesToRegister: MutableList> = arrayListOf() + private val clausesToRegister: MutableList = arrayListOf() override fun SelectClause0.invoke(block: suspend () -> R) { clausesToRegister += ClauseData(clauseObject, regFunc, processResFunc, PARAM_CLAUSE_0, block, onCancellationConstructor) diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt index 8ef888d801..9f30721df5 100644 --- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt +++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt @@ -9,6 +9,7 @@ import kotlinx.coroutines.* import kotlinx.coroutines.internal.* import kotlinx.coroutines.selects.* import kotlin.contracts.* +import kotlin.coroutines.* import kotlin.js.* import kotlin.math.* @@ -378,7 +379,7 @@ private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?, pointers: Int) // Cleans the acquirer slot located by the specified index // and removes this segment physically if all slots are cleaned. - override fun onCancellation(index: Int, cause: Throwable?) { + override fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext) { // Clean the slot set(index, CANCELLED) // Remove this segment if needed diff --git a/kotlinx-coroutines-core/common/test/CancellableContinuationHandlersTest.kt b/kotlinx-coroutines-core/common/test/CancellableContinuationHandlersTest.kt index bd6a44fff8..54bc18c17b 100644 --- a/kotlinx-coroutines-core/common/test/CancellableContinuationHandlersTest.kt +++ b/kotlinx-coroutines-core/common/test/CancellableContinuationHandlersTest.kt @@ -167,7 +167,7 @@ class CancellableContinuationHandlersTest : TestBase() { override val numberOfSlots: Int get() = 0 var invokeOnCancellationCalled = false - override fun onCancellation(index: Int, cause: Throwable?) { + override fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext) { invokeOnCancellationCalled = true } }