Skip to content

Commit

Permalink
Get rid of BeforeResumeCancelHandler (#3744)
Browse files Browse the repository at this point in the history
* Remove effectively duplicating code
* BufferedChannelIterator is properly registered for cancellation as a corresponding segment 
* Undelivered element handlers are registered via extended segment's protocol
    - Index now encodes information whether segment corresponds to a sender or to a receiver

Fixes #3646
  • Loading branch information
qwwdfsad authored May 11, 2023
1 parent 8bc4f35 commit 435844a
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 146 deletions.
24 changes: 7 additions & 17 deletions kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ internal open class CancellableContinuationImpl<in T>(
private fun callSegmentOnCancellation(segment: Segment<*>, cause: Throwable?) {
val index = _decisionAndIndex.value.index
check(index != NO_INDEX) { "The index for Segment.onCancellation(..) is broken" }
callCancelHandlerSafely { segment.onCancellation(index, cause) }
callCancelHandlerSafely { segment.onCancellation(index, cause, context) }
}

fun callOnCancellation(onCancellation: (cause: Throwable) -> Unit, cause: Throwable) {
Expand Down Expand Up @@ -376,8 +376,7 @@ internal open class CancellableContinuationImpl<in T>(
* [segment] and [index] in this [CancellableContinuationImpl].
*
* The only difference is that `segment.onCancellation(..)` is never
* called if this continuation is already completed; thus,
* the semantics is similar to [BeforeResumeCancelHandler].
* called if this continuation is already completed;
*
* ```
* invokeOnCancellation { cause ->
Expand Down Expand Up @@ -436,9 +435,8 @@ internal open class CancellableContinuationImpl<in T>(
* Continuation was already completed, and might already have cancel handler.
*/
if (state.cancelHandler != null) multipleHandlersError(handler, state)
// BeforeResumeCancelHandler and Segment.invokeOnCancellation(..)
// do NOT need to be called on completed continuation.
if (handler is BeforeResumeCancelHandler || handler is Segment<*>) return
// Segment.invokeOnCancellation(..) does NOT need to be called on completed continuation.
if (handler is Segment<*>) return
handler as CancelHandler
if (state.cancelled) {
// Was already cancelled while being dispatched -- invoke the handler directly
Expand All @@ -451,10 +449,10 @@ internal open class CancellableContinuationImpl<in T>(
else -> {
/*
* Continuation was already completed normally, but might get cancelled while being dispatched.
* Change its state to CompletedContinuation, unless we have BeforeResumeCancelHandler which
* Change its state to CompletedContinuation, unless we have Segment which
* does not need to be called in this case.
*/
if (handler is BeforeResumeCancelHandler || handler is Segment<*>) return
if (handler is Segment<*>) return
handler as CancelHandler
val update = CompletedContinuation(state, cancelHandler = handler)
if (_state.compareAndSet(state, update)) return // quit on cas success
Expand Down Expand Up @@ -489,7 +487,7 @@ internal open class CancellableContinuationImpl<in T>(
proposedUpdate
}
!resumeMode.isCancellableMode && idempotent == null -> proposedUpdate // cannot be cancelled in process, all is fine
onCancellation != null || (state is CancelHandler && state !is BeforeResumeCancelHandler) || idempotent != null ->
onCancellation != null || state is CancelHandler || idempotent != null ->
// mark as CompletedContinuation if special cases are present:
// Cancellation handlers that shall be called after resume or idempotent resume
CompletedContinuation(proposedUpdate, state as? CancelHandler, onCancellation, idempotent)
Expand Down Expand Up @@ -636,14 +634,6 @@ private object Active : NotCompleted {
*/
internal abstract class CancelHandler : CancelHandlerBase(), NotCompleted

/**
* Base class for all [CancellableContinuation.invokeOnCancellation] handlers that don't need to be invoked
* if continuation is cancelled after resumption, during dispatch, because the corresponding resources
* were already released before calling `resume`. This cancel handler is called only before `resume`.
* It avoids allocation of [CompletedContinuation] instance during resume on JVM.
*/
internal abstract class BeforeResumeCancelHandler : CancelHandler()

// Wrapper for lambdas, for the performance sake CancelHandler can be subclassed directly
private class InvokeOnCancel( // Clashes with InvokeOnCancellation
private val handler: CompletionHandler
Expand Down
151 changes: 42 additions & 109 deletions kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -172,34 +172,10 @@ internal open class BufferedChannel<E>(
segment: ChannelSegment<E>,
index: Int
) {
if (onUndeliveredElement == null) {
invokeOnCancellation(segment, index)
} else {
when (this) {
is CancellableContinuation<*> -> {
invokeOnCancellation(SenderWithOnUndeliveredElementCancellationHandler(segment, index, context).asHandler)
}
is SelectInstance<*> -> {
disposeOnCompletion(SenderWithOnUndeliveredElementCancellationHandler(segment, index, context))
}
is SendBroadcast -> {
cont.invokeOnCancellation(SenderWithOnUndeliveredElementCancellationHandler(segment, index, cont.context).asHandler)
}
else -> error("unexpected sender: $this")
}
}
}

private inner class SenderWithOnUndeliveredElementCancellationHandler(
private val segment: ChannelSegment<E>,
private val index: Int,
private val context: CoroutineContext
) : BeforeResumeCancelHandler(), DisposableHandle {
override fun dispose() {
segment.onSenderCancellationWithOnUndeliveredElement(index, context)
}

override fun invoke(cause: Throwable?) = dispose()
// To distinguish cancelled senders and receivers,
// senders equip the index value with an additional marker,
// adding `SEGMENT_SIZE` to the value.
invokeOnCancellation(segment, index + SEGMENT_SIZE)
}

private fun onClosedSendOnNoWaiterSuspend(element: E, cont: CancellableContinuation<Unit>) {
Expand Down Expand Up @@ -1594,7 +1570,7 @@ internal open class BufferedChannel<E>(
* and [SelectInstance.trySelect]. When the channel becomes closed,
* [tryResumeHasNextOnClosedChannel] should be used instead.
*/
private inner class BufferedChannelIterator : ChannelIterator<E>, BeforeResumeCancelHandler(), Waiter {
private inner class BufferedChannelIterator : ChannelIterator<E>, Waiter {
/**
* Stores the element retrieved by [hasNext] or
* a special [CHANNEL_CLOSED] token if this channel is closed.
Expand All @@ -1607,20 +1583,7 @@ internal open class BufferedChannel<E>(
* continuation. The [tryResumeHasNext] and [tryResumeHasNextOnClosedChannel]
* function resume this continuation when the [hasNext] invocation should complete.
*/
private var continuation: CancellableContinuation<Boolean>? = null

// When `hasNext()` suspends, the location where the continuation
// is stored is specified via the segment and the index in it.
// We need this information in the cancellation handler below.
private var segment: Segment<*>? = null
private var index = -1

/**
* Invoked on cancellation, [BeforeResumeCancelHandler] implementation.
*/
override fun invoke(cause: Throwable?) {
segment?.onCancellation(index, null)
}
private var continuation: CancellableContinuationImpl<Boolean>? = null

// `hasNext()` is just a special receive operation.
override suspend fun hasNext(): Boolean =
Expand Down Expand Up @@ -1680,11 +1643,7 @@ internal open class BufferedChannel<E>(
}

override fun invokeOnCancellation(segment: Segment<*>, index: Int) {
this.segment = segment
this.index = index
// It is possible that this `hasNext()` invocation is already
// resumed, and the `continuation` field is already updated to `null`.
this.continuation?.invokeOnCancellation(this.asHandler)
this.continuation?.invokeOnCancellation(segment, index)
}

private fun onClosedHasNextNoWaiterSuspend() {
Expand Down Expand Up @@ -2826,67 +2785,51 @@ internal class ChannelSegment<E>(id: Long, prev: ChannelSegment<E>?, channel: Bu
// # Cancellation Support #
// ########################

override fun onCancellation(index: Int, cause: Throwable?) {
onCancellation(index)
}

fun onSenderCancellationWithOnUndeliveredElement(index: Int, context: CoroutineContext) {
// Read the element first. If the operation has not been successfully resumed
// (this cancellation may be caused by prompt cancellation during dispatching),
// it is guaranteed that the element is presented.
override fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext) {
// To distinguish cancelled senders and receivers, senders equip the index value with
// an additional marker, adding `SEGMENT_SIZE` to the value.
val isSender = index >= SEGMENT_SIZE
// Unwrap the index.
@Suppress("NAME_SHADOWING") val index = if (isSender) index - SEGMENT_SIZE else index
// Read the element, which may be needed further to call `onUndeliveredElement`.
val element = getElement(index)
// Perform the cancellation; `onCancellationImpl(..)` return `true` if the
// cancelled operation had not been resumed. In this case, the `onUndeliveredElement`
// lambda should be called.
if (onCancellation(index)) {
channel.onUndeliveredElement!!.callUndeliveredElement(element, context)
}
}

/**
* Returns `true` if the request is successfully cancelled,
* and no rendezvous has happened. We need this knowledge
* to keep [BufferedChannel.onUndeliveredElement] correct.
*/
@Suppress("ConvertTwoComparisonsToRangeCheck")
fun onCancellation(index: Int): Boolean {
// Count the global index of this cell and read
// the current counters of send and receive operations.
val globalIndex = id * SEGMENT_SIZE + index
val s = channel.sendersCounter
val r = channel.receiversCounter
// Update the cell state trying to distinguish whether
// the cancelled coroutine is sender or receiver.
var isSender: Boolean
var isReceiver: Boolean
while (true) { // CAS-loop
// Update the cell state.
while (true) {
// CAS-loop
// Read the current state of the cell.
val cur = data[index * 2 + 1].value
val cur = getState(index)
when {
// The cell stores a waiter.
cur is Waiter || cur is WaiterEB -> {
// Is the cancelled request send for sure?
isSender = globalIndex < s && globalIndex >= r
// Is the cancelled request receiver for sure?
isReceiver = globalIndex < r && globalIndex >= s
// If the cancelled coroutine neither sender
// nor receiver, clean the element slot and finish.
// An opposite operation will resume this request
// and update the cell state eventually.
if (!isSender && !isReceiver) {
cleanElement(index)
return true
}
// The cancelled request is either send or receive.
// Update the cell state correspondingly.
val update = if (isSender) INTERRUPTED_SEND else INTERRUPTED_RCV
if (data[index * 2 + 1].compareAndSet(cur, update)) break
if (casState(index, cur, update)) {
// The waiter has been successfully cancelled.
// Clean the element slot and invoke `onSlotCleaned()`,
// which may cause deleting the whole segment from the linked list.
// In case the cancelled request is receiver, it is critical to ensure
// that the `expandBuffer()` attempt that processes this cell is completed,
// so `onCancelledRequest(..)` waits for its completion before invoking `onSlotCleaned()`.
cleanElement(index)
onCancelledRequest(index, !isSender)
// Call `onUndeliveredElement` if needed.
if (isSender) {
channel.onUndeliveredElement?.callUndeliveredElement(element, context)
}
return
}
}
// The cell already indicates that the operation is cancelled.
cur === INTERRUPTED_SEND || cur === INTERRUPTED_RCV -> {
// Clean the element slot to avoid memory leaks and finish.
// Clean the element slot to avoid memory leaks,
// invoke `onUndeliveredElement` if needed, and finish
cleanElement(index)
return true
// Call `onUndeliveredElement` if needed.
if (isSender) {
channel.onUndeliveredElement?.callUndeliveredElement(element, context)
}
return
}
// An opposite operation is resuming this request;
// wait until the cell state updates.
Expand All @@ -2897,23 +2840,13 @@ internal class ChannelSegment<E>(id: Long, prev: ChannelSegment<E>?, channel: Bu
cur === RESUMING_BY_EB || cur === RESUMING_BY_RCV -> continue
// This request was successfully resumed, so this cancellation
// is caused by the prompt cancellation feature and should be ignored.
cur === DONE_RCV || cur === BUFFERED -> return false
cur === DONE_RCV || cur === BUFFERED -> return
// The cell state indicates that the channel is closed;
// this cancellation should be ignored.
cur === CHANNEL_CLOSED -> {
return false
}
cur === CHANNEL_CLOSED -> return
else -> error("unexpected state: $cur")
}
}
// Clean the element slot and invoke `onSlotCleaned()`,
// which may cause deleting the whole segment from the linked list.
// In case the cancelled request is receiver, it is critical to ensure
// that the `expandBuffer()` attempt that processes this cell is completed,
// so `onCancelledRequest(..)` waits for its completion before invoking `onSlotCleaned()`.
cleanElement(index)
onCancelledRequest(index, isReceiver)
return true
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package kotlinx.coroutines.internal

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlin.coroutines.*
import kotlin.jvm.*

/**
Expand Down Expand Up @@ -230,8 +231,14 @@ internal abstract class Segment<S : Segment<S>>(
* This function is invoked on continuation cancellation when this segment
* with the specified [index] are installed as cancellation handler via
* `SegmentDisposable.disposeOnCancellation(Segment, Int)`.
*
* @param index the index under which the sement registered itself in the continuation.
* Indicies are opaque and arithmetics or numeric intepretation is not allowed on them,
* as they may encode additional metadata.
* @param cause the cause of the cancellation, with the same semantics as [CancellableContinuation.invokeOnCancellation]
* @param context the context of the cancellable continuation the segment was registered in
*/
abstract fun onCancellation(index: Int, cause: Throwable?)
abstract fun onCancellation(index: Int, cause: Throwable?, context: CoroutineContext)

/**
* Invoked on each slot clean-up; should not be invoked twice for the same slot.
Expand Down
Loading

0 comments on commit 435844a

Please sign in to comment.