Skip to content

Commit

Permalink
Deprecate SendChannel.offer and ReceiveChannel.poll, replace their us…
Browse files Browse the repository at this point in the history
…ages along the codebase (Kotlin#2644)

* Deprecate SendChannel.offer and replace its usages along the codebase
* Deprecate ReceiveChannel.poll and replace its usages along the codebase

Co-authored-by: Roman Elizarov <elizarov@gmail.com>

Addresses Kotlin#974
  • Loading branch information
qwwdfsad authored Apr 13, 2021
1 parent 7b1f3b3 commit 3c83c0c
Show file tree
Hide file tree
Showing 52 changed files with 289 additions and 310 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ internal abstract class AbstractSendChannel<E>(
}

override fun offer(element: E): Boolean {
// Temporary migration for offer users who rely on onUndeliveredElement
try {
return super.offer(element)
} catch (e: Throwable) {
Expand Down
191 changes: 106 additions & 85 deletions kotlinx-coroutines-core/common/src/channels/Channel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import kotlin.jvm.*
public interface SendChannel<in E> {
/**
* Returns `true` if this channel was closed by an invocation of [close]. This means that
* calling [send] or [offer] will result in an exception.
* calling [send] will result in an exception.
*
* **Note: This is an experimental api.** This property may change its semantics and/or name in the future.
*/
Expand All @@ -51,7 +51,7 @@ public interface SendChannel<in E> {
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
* This function can be used in [select] invocations with the [onSend] clause.
* Use [offer] to try sending to this channel without waiting.
* Use [trySend] to try sending to this channel without waiting.
*/
public suspend fun send(element: E)

Expand All @@ -64,23 +64,6 @@ public interface SendChannel<in E> {
*/
public val onSend: SelectClause2<E, SendChannel<E>>

/**
* Immediately adds the specified [element] to this channel, if this doesn't violate its capacity restrictions,
* and returns `true`. Otherwise, just returns `false`. This is a synchronous variant of [send] which backs off
* in situations when `send` suspends.
*
* Throws an exception if the channel [is closed for `send`][isClosedForSend] (see [close] for details).
*
* When `offer` call returns `false` it guarantees that the element was not delivered to the consumer and it
* it does not call `onUndeliveredElement` that was installed for this channel. If the channel was closed,
* then it calls `onUndeliveredElement` before throwing an exception.
* See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
*/
public fun offer(element: E): Boolean {
val result = trySend(element)
if (result.isSuccess) return true
throw recoverStackTrace(result.exceptionOrNull() ?: return false)
}

/**
* Immediately adds the specified [element] to this channel, if this doesn't violate its capacity restrictions,
Expand All @@ -103,7 +86,7 @@ public interface SendChannel<in E> {
* on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements
* are received.
*
* A channel that was closed without a [cause] throws a [ClosedSendChannelException] on attempts to [send] or [offer]
* A channel that was closed without a [cause] throws a [ClosedSendChannelException] on attempts to [send]
* and [ClosedReceiveChannelException] on attempts to [receive][ReceiveChannel.receive].
* A channel that was closed with non-null [cause] is called a _failed_ channel. Attempts to send or
* receive on a failed channel throw the specified [cause] exception.
Expand All @@ -122,10 +105,11 @@ public interface SendChannel<in E> {
* * the cause of `close` or `cancel` otherwise.
*
* Example of usage (exception handling is omitted):
*
* ```
* val events = Channel(UNLIMITED)
* callbackBasedApi.registerCallback { event ->
* events.offer(event)
* events.trySend(event)
* }
*
* val uiUpdater = launch(Dispatchers.Main, parent = UILifecycle) {
Expand All @@ -134,7 +118,6 @@ public interface SendChannel<in E> {
* }
*
* events.invokeOnClose { callbackBasedApi.stop() }
*
* ```
*
* **Note: This is an experimental api.** This function may change its semantics, parameters or return type in the future.
Expand All @@ -146,6 +129,33 @@ public interface SendChannel<in E> {
*/
@ExperimentalCoroutinesApi
public fun invokeOnClose(handler: (cause: Throwable?) -> Unit)

/**
* **Deprecated** offer method.
*
* This method was deprecated in the favour of [trySend].
* It has proven itself as the most error-prone method in Channel API:
*
* * `Boolean` return type creates the false sense of security, implying that `false`
* is returned instead of throwing an exception.
* * It was used mostly from non-suspending APIs where CancellationException triggered
* internal failures in the application (the most common source of bugs).
* * Due to signature and explicit `if (ch.offer(...))` checks it was easy to
* oversee such error during code review.
* * Its name was not aligned with the rest of the API and tried to mimic Java's queue instead.
*
* See https://github.com/Kotlin/kotlinx.coroutines/issues/974 for more context.
*/
@Deprecated(
level = DeprecationLevel.WARNING,
message = "Deprecated in the favour of 'trySend' method",
replaceWith = ReplaceWith("trySend(element).isSuccess")
) // Since 1.5.0
public fun offer(element: E): Boolean {
val result = trySend(element)
if (result.isSuccess) return true
throw recoverStackTrace(result.exceptionOrNull() ?: return false)
}
}

/**
Expand Down Expand Up @@ -188,7 +198,7 @@ public interface ReceiveChannel<out E> {
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
* This function can be used in [select] invocations with the [onReceive] clause.
* Use [poll] to try receiving from this channel without waiting.
* Use [tryReceive] to try receiving from this channel without waiting.
*/
public suspend fun receive(): E

Expand All @@ -200,51 +210,6 @@ public interface ReceiveChannel<out E> {
*/
public val onReceive: SelectClause1<E>

/**
* This function was deprecated since 1.3.0 and is no longer recommended to use
* or to implement in subclasses.
*
* It had the following pitfalls:
* - Didn't allow to distinguish 'null' as "closed channel" from "null as a value"
* - Was throwing if the channel has failed even though its signature may suggest it returns 'null'
* - It didn't really belong to core channel API and can be exposed as an extension instead.
*
* @suppress doc
*/
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
@LowPriorityInOverloadResolution
@Deprecated(
message = "Deprecated in favor of receiveCatching",
level = DeprecationLevel.ERROR,
replaceWith = ReplaceWith("receiveCatching().getOrNull()")
) // Warning since 1.3.0, error in 1.5.0, will be hidden in 1.6.0
public suspend fun receiveOrNull(): E? = receiveCatching().getOrNull()

/**
* This function was deprecated since 1.3.0 and is no longer recommended to use
* or to implement in subclasses.
* See [receiveOrNull] documentation.
*
* @suppress **Deprecated**: in favor of onReceiveCatching extension.
*/
@Deprecated(
message = "Deprecated in favor of onReceiveCatching extension",
level = DeprecationLevel.ERROR,
replaceWith = ReplaceWith("onReceiveCatching")
) // Warning since 1.3.0, error in 1.5.0, will be hidden or removed in 1.6.0
public val onReceiveOrNull: SelectClause1<E?>
get() {
return object : SelectClause1<E?> {
@InternalCoroutinesApi
override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E?) -> R) {
onReceiveCatching.registerSelectClause1(select) {
it.exceptionOrNull()?.let { throw it }
block(it.getOrNull())
}
}
}
}

/**
* Retrieves and removes an element from this channel if it's not empty, or suspends the caller while this channel is empty.
* This method returns [ChannelResult] with the value of an element successfully retrieved from the channel
Expand All @@ -262,7 +227,7 @@ public interface ReceiveChannel<out E> {
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
* This function can be used in [select] invocations with the [onReceiveCatching] clause.
* Use [poll] to try receiving from this channel without waiting.
* Use [tryReceive] to try receiving from this channel without waiting.
*/
public suspend fun receiveCatching(): ChannelResult<E>

Expand All @@ -273,17 +238,6 @@ public interface ReceiveChannel<out E> {
*/
public val onReceiveCatching: SelectClause1<ChannelResult<E>>

/**
* Retrieves and removes an element from this channel if it's not empty or returns `null` if the channel is empty
* or is [is closed for `receive`][isClosedForReceive] without a cause.
* It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
*/
public fun poll(): E? {
val result = tryReceive()
if (result.isSuccess) return result.getOrThrow()
throw recoverStackTrace(result.exceptionOrNull() ?: return null)
}

/**
* Retrieves and removes an element from this channel if it's not empty, returning a [successful][ChannelResult.success]
* result, returns [failed][ChannelResult.failed] result if the channel is empty, and [closed][ChannelResult.closed]
Expand Down Expand Up @@ -325,6 +279,75 @@ public interface ReceiveChannel<out E> {
*/
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
public fun cancel(cause: Throwable? = null): Boolean

/**
* **Deprecated** poll method.
*
* This method was deprecated in the favour of [tryReceive].
* It has proven itself as error-prone method in Channel API:
*
* * Nullable return type creates the false sense of security, implying that `null`
* is returned instead of throwing an exception.
* * It was used mostly from non-suspending APIs where CancellationException triggered
* internal failures in the application (the most common source of bugs).
* * Its name was not aligned with the rest of the API and tried to mimic Java's queue instead.
*
* See https://github.com/Kotlin/kotlinx.coroutines/issues/974 for more context.
*/
@Deprecated(level = DeprecationLevel.WARNING,
message = "Deprecated in the favour of 'tryReceive'",
replaceWith = ReplaceWith("tryReceive().getOrNull()")
) // Since 1.5.0
public fun poll(): E? {
val result = tryReceive()
if (result.isSuccess) return result.getOrThrow()
throw recoverStackTrace(result.exceptionOrNull() ?: return null)
}

/**
* This function was deprecated since 1.3.0 and is no longer recommended to use
* or to implement in subclasses.
*
* It had the following pitfalls:
* - Didn't allow to distinguish 'null' as "closed channel" from "null as a value"
* - Was throwing if the channel has failed even though its signature may suggest it returns 'null'
* - It didn't really belong to core channel API and can be exposed as an extension instead.
*
* @suppress doc
*/
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
@LowPriorityInOverloadResolution
@Deprecated(
message = "Deprecated in favor of receiveCatching",
level = DeprecationLevel.ERROR,
replaceWith = ReplaceWith("receiveCatching().getOrNull()")
) // Warning since 1.3.0, error in 1.5.0, will be hidden in 1.6.0
public suspend fun receiveOrNull(): E? = receiveCatching().getOrNull()

/**
* This function was deprecated since 1.3.0 and is no longer recommended to use
* or to implement in subclasses.
* See [receiveOrNull] documentation.
*
* @suppress **Deprecated**: in favor of onReceiveCatching extension.
*/
@Deprecated(
message = "Deprecated in favor of onReceiveCatching extension",
level = DeprecationLevel.ERROR,
replaceWith = ReplaceWith("onReceiveCatching")
) // Warning since 1.3.0, error in 1.5.0, will be hidden or removed in 1.6.0
public val onReceiveOrNull: SelectClause1<E?>
get() {
return object : SelectClause1<E?> {
@InternalCoroutinesApi
override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E?) -> R) {
onReceiveCatching.registerSelectClause1(select) {
it.exceptionOrNull()?.let { throw it }
block(it.getOrNull())
}
}
}
}
}

/**
Expand Down Expand Up @@ -544,14 +567,14 @@ public interface ChannelIterator<out E> {
*
* * When `capacity` is [Channel.UNLIMITED] &mdash; it creates a channel with effectively unlimited buffer.
* This channel has a linked-list buffer of unlimited capacity (limited only by available memory).
* [Sending][send] to this channel never suspends, and [offer] always returns `true`.
* [Sending][send] to this channel never suspends, and [trySend] always succeeds.
*
* * When `capacity` is [Channel.CONFLATED] &mdash; it creates a _conflated_ channel
* This channel buffers at most one element and conflates all subsequent `send` and `offer` invocations,
* This channel buffers at most one element and conflates all subsequent `send` and `trySend` invocations,
* so that the receiver always gets the last element sent.
* Back-to-back sent elements are conflated &mdash; only the last sent element is received,
* while previously sent elements **are lost**.
* [Sending][send] to this channel never suspends, and [offer] always returns `true`.
* [Sending][send] to this channel never suspends, and [trySend] always succeeds.
*
* * When `capacity` is positive but less than [UNLIMITED] &mdash; it creates an array-based channel with the specified capacity.
* This channel has an array buffer of a fixed `capacity`.
Expand Down Expand Up @@ -598,8 +621,6 @@ public interface ChannelIterator<out E> {
*
* * When [send][SendChannel.send] operation throws an exception because it was cancelled before it had a chance to actually
* send the element or because the channel was [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel].
* * When [offer][SendChannel.offer] operation throws an exception when
* the channel was [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel].
* * When [receive][ReceiveChannel.receive], [receiveOrNull][ReceiveChannel.receiveOrNull], or [hasNext][ChannelIterator.hasNext]
* operation throws an exception when it had retrieved the element from the
* channel but was cancelled before the code following the receive call resumed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import kotlin.jvm.*
* Back-to-send sent elements are _conflated_ -- only the the most recently sent value is received,
* while previously sent elements **are lost**.
* Every subscriber immediately receives the most recently sent element.
* Sender to this broadcast channel never suspends and [offer] always returns `true`.
* Sender to this broadcast channel never suspends and [trySend] always succeeds.
*
* A secondary constructor can be used to create an instance of this class that already holds a value.
* This channel is also created by `BroadcastChannel(Channel.CONFLATED)` factory function invocation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*

/**
* Channel that buffers at most one element and conflates all subsequent `send` and `offer` invocations,
* Channel that buffers at most one element and conflates all subsequent `send` and `trySend` invocations,
* so that the receiver always gets the most recently sent element.
* Back-to-send sent elements are _conflated_ -- only the most recently sent element is received,
* while previously sent elements **are lost**.
* Sender to this channel never suspends and [offer] always returns `true`.
* Sender to this channel never suspends and [trySend] always succeeds.
*
* This channel is created by `Channel(Channel.CONFLATED)` factory function invocation.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import kotlinx.coroutines.selects.*

/**
* Channel with linked-list buffer of a unlimited capacity (limited only by available memory).
* Sender to this channel never suspends and [offer] always returns `true`.
* Sender to this channel never suspends and [trySend] always succeeds.
*
* This channel is created by `Channel(Channel.UNLIMITED)` factory function invocation.
*
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ import kotlin.native.concurrent.*
*
* To migrate [BroadcastChannel] usage to [SharedFlow], start by replacing usages of the `BroadcastChannel(capacity)`
* constructor with `MutableSharedFlow(0, extraBufferCapacity=capacity)` (broadcast channel does not replay
* values to new subscribers). Replace [send][BroadcastChannel.send] and [offer][BroadcastChannel.offer] calls
* values to new subscribers). Replace [send][BroadcastChannel.send] and [trySend][BroadcastChannel.trySend] calls
* with [emit][MutableStateFlow.emit] and [tryEmit][MutableStateFlow.tryEmit], and convert subscribers' code to flow operators.
*
* ### Concurrency
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/flow/StateFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ import kotlin.native.concurrent.*
*
* To migrate [ConflatedBroadcastChannel] usage to [StateFlow], start by replacing usages of the `ConflatedBroadcastChannel()`
* constructor with `MutableStateFlow(initialValue)`, using `null` as an initial value if you don't have one.
* Replace [send][ConflatedBroadcastChannel.send] and [offer][ConflatedBroadcastChannel.offer] calls
* Replace [send][ConflatedBroadcastChannel.send] and [trySend][ConflatedBroadcastChannel.trySend] calls
* with updates to the state flow's [MutableStateFlow.value], and convert subscribers' code to flow operators.
* You can use the [filterNotNull] operator to mimic behavior of a `ConflatedBroadcastChannel` without initial value.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ internal suspend fun <R, T> FlowCollector<R>.combineInternal(
// Received the second value from the same flow in the same epoch -- bail out
if (lastReceivedEpoch[index] == currentEpoch) break
lastReceivedEpoch[index] = currentEpoch
element = resultChannel.poll() ?: break
element = resultChannel.tryReceive().getOrNull() ?: break
}

// Process batch result if there is enough data
Expand Down
Loading

0 comments on commit 3c83c0c

Please sign in to comment.