diff --git a/docs/topics/select-expression.md b/docs/topics/select-expression.md index 082a50d65b..3d20ff39d4 100644 --- a/docs/topics/select-expression.md +++ b/docs/topics/select-expression.md @@ -120,31 +120,32 @@ buzz -> 'Buzz!' ## Selecting on close The [onReceive][ReceiveChannel.onReceive] clause in `select` fails when the channel is closed causing the corresponding -`select` to throw an exception. We can use [onReceiveOrNull][onReceiveOrNull] clause to perform a +`select` to throw an exception. We can use [onReceiveCatching][ReceiveChannel.onReceiveCatching] clause to perform a specific action when the channel is closed. The following example also shows that `select` is an expression that returns the result of its selected clause: ```kotlin suspend fun selectAorB(a: ReceiveChannel, b: ReceiveChannel): String = select { - a.onReceiveOrNull { value -> - if (value == null) - "Channel 'a' is closed" - else + a.onReceiveCatching { it -> + val value = it.getOrNull() + if (value != null) { "a -> '$value'" + } else { + "Channel 'a' is closed" + } } - b.onReceiveOrNull { value -> - if (value == null) - "Channel 'b' is closed" - else + b.onReceiveCatching { it -> + val value = it.getOrNull() + if (value != null) { "b -> '$value'" + } else { + "Channel 'b' is closed" + } } } ``` -Note that [onReceiveOrNull][onReceiveOrNull] is an extension function defined only -for channels with non-nullable elements so that there is no accidental confusion between a closed channel -and a null value. Let's use it with channel `a` that produces "Hello" string four times and channel `b` that produces "World" four times: @@ -158,17 +159,21 @@ import kotlinx.coroutines.selects.* suspend fun selectAorB(a: ReceiveChannel, b: ReceiveChannel): String = select { - a.onReceiveOrNull { value -> - if (value == null) - "Channel 'a' is closed" - else + a.onReceiveCatching { it -> + val value = it.getOrNull() + if (value != null) { "a -> '$value'" + } else { + "Channel 'a' is closed" + } } - b.onReceiveOrNull { value -> - if (value == null) - "Channel 'b' is closed" - else + b.onReceiveCatching { it -> + val value = it.getOrNull() + if (value != null) { "b -> '$value'" + } else { + "Channel 'b' is closed" + } } } @@ -215,7 +220,7 @@ the first one among them gets selected. Here, both channels are constantly produ being the first clause in select, wins. However, because we are using unbuffered channel, the `a` gets suspended from time to time on its [send][SendChannel.send] invocation and gives a chance for `b` to send, too. -The second observation, is that [onReceiveOrNull][onReceiveOrNull] gets immediately selected when the +The second observation, is that [onReceiveCatching][ReceiveChannel.onReceiveCatching] gets immediately selected when the channel is already closed. ## Selecting to send @@ -375,19 +380,19 @@ Deferred 4 produced answer 'Waited for 128 ms' Let us write a channel producer function that consumes a channel of deferred string values, waits for each received deferred value, but only until the next deferred value comes over or the channel is closed. This example puts together -[onReceiveOrNull][onReceiveOrNull] and [onAwait][Deferred.onAwait] clauses in the same `select`: +[onReceiveCatching][ReceiveChannel.onReceiveCatching] and [onAwait][Deferred.onAwait] clauses in the same `select`: ```kotlin fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel>) = produce { var current = input.receive() // start with first received deferred value while (isActive) { // loop while not cancelled/closed val next = select?> { // return next deferred value from this select or null - input.onReceiveOrNull { update -> - update // replaces next value to wait + input.onReceiveCatching { update -> + update.getOrNull() } - current.onAwait { value -> + current.onAwait { value -> send(value) // send value that current deferred has produced - input.receiveOrNull() // and use the next deferred from the input channel + input.receiveCatching().getOrNull() // and use the next deferred from the input channel } } if (next == null) { @@ -423,12 +428,12 @@ fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel>) = var current = input.receive() // start with first received deferred value while (isActive) { // loop while not cancelled/closed val next = select?> { // return next deferred value from this select or null - input.onReceiveOrNull { update -> - update // replaces next value to wait + input.onReceiveCatching { update -> + update.getOrNull() } - current.onAwait { value -> + current.onAwait { value -> send(value) // send value that current deferred has produced - input.receiveOrNull() // and use the next deferred from the input channel + input.receiveCatching().getOrNull() // and use the next deferred from the input channel } } if (next == null) { @@ -491,7 +496,7 @@ Channel was closed [ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html [ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive.html -[onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/on-receive-or-null.html +[ReceiveChannel.onReceiveCatching]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive-catching.html [SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html [SendChannel.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/on-send.html @@ -499,4 +504,4 @@ Channel was closed [select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html - \ No newline at end of file + diff --git a/integration/kotlinx-coroutines-jdk8/test/time/FlowSampleTest.kt b/integration/kotlinx-coroutines-jdk8/test/time/FlowSampleTest.kt index 11ceb1a831..d35ee72de0 100644 --- a/integration/kotlinx-coroutines-jdk8/test/time/FlowSampleTest.kt +++ b/integration/kotlinx-coroutines-jdk8/test/time/FlowSampleTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.time @@ -12,8 +12,7 @@ import org.junit.Test import java.time.Duration import kotlin.test.assertEquals - -class SampleTest : TestBase() { +class FlowSampleTest : TestBase() { @Test public fun testBasic() = withVirtualTime { expect(1) diff --git a/kotlinx-coroutines-core/README.md b/kotlinx-coroutines-core/README.md index bc5587623a..9fdf418233 100644 --- a/kotlinx-coroutines-core/README.md +++ b/kotlinx-coroutines-core/README.md @@ -54,9 +54,9 @@ helper function. [NonCancellable] job object is provided to suppress cancellatio | ---------------- | --------------------------------------------- | ------------------------------------------------ | -------------------------- | [Job] | [join][Job.join] | [onJoin][Job.onJoin] | [isCompleted][Job.isCompleted] | [Deferred] | [await][Deferred.await] | [onAwait][Deferred.onAwait] | [isCompleted][Job.isCompleted] -| [SendChannel][kotlinx.coroutines.channels.SendChannel] | [send][kotlinx.coroutines.channels.SendChannel.send] | [onSend][kotlinx.coroutines.channels.SendChannel.onSend] | [offer][kotlinx.coroutines.channels.SendChannel.offer] -| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receive][kotlinx.coroutines.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.channels.ReceiveChannel.onReceive] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll] -| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveOrNull][kotlinx.coroutines.channels.receiveOrNull] | [onReceiveOrNull][kotlinx.coroutines.channels.onReceiveOrNull] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll] +| [SendChannel][kotlinx.coroutines.channels.SendChannel] | [send][kotlinx.coroutines.channels.SendChannel.send] | [onSend][kotlinx.coroutines.channels.SendChannel.onSend] | [trySend][kotlinx.coroutines.channels.SendChannel.trySend] +| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receive][kotlinx.coroutines.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.channels.ReceiveChannel.onReceive] | [tryReceive][kotlinx.coroutines.channels.ReceiveChannel.tryReceive] +| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveCatching][kotlinx.coroutines.channels.ReceiveChannel.receiveCatching] | [onReceiveCatching][kotlinx.coroutines.channels.ReceiveChannel.onReceiveCatching] | [tryReceive][kotlinx.coroutines.channels.ReceiveChannel.tryReceive] | [Mutex][kotlinx.coroutines.sync.Mutex] | [lock][kotlinx.coroutines.sync.Mutex.lock] | [onLock][kotlinx.coroutines.sync.Mutex.onLock] | [tryLock][kotlinx.coroutines.sync.Mutex.tryLock] | none | [delay] | [onTimeout][kotlinx.coroutines.selects.SelectBuilder.onTimeout] | none @@ -133,11 +133,11 @@ Obsolete and deprecated module to test coroutines. Replaced with `kotlinx-corout [kotlinx.coroutines.channels.ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html [kotlinx.coroutines.channels.SendChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/index.html [kotlinx.coroutines.channels.SendChannel.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/on-send.html -[kotlinx.coroutines.channels.SendChannel.offer]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/offer.html +[kotlinx.coroutines.channels.SendChannel.trySend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/try-send.html [kotlinx.coroutines.channels.ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive.html -[kotlinx.coroutines.channels.ReceiveChannel.poll]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/poll.html -[kotlinx.coroutines.channels.receiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/receive-or-null.html -[kotlinx.coroutines.channels.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/on-receive-or-null.html +[kotlinx.coroutines.channels.ReceiveChannel.tryReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/try-receive.html +[kotlinx.coroutines.channels.ReceiveChannel.receiveCatching]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive-catching.html +[kotlinx.coroutines.channels.ReceiveChannel.onReceiveCatching]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive-catching.html diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 062e466e04..2f2b8d54ab 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -555,7 +555,9 @@ public abstract interface class kotlinx/coroutines/channels/ActorScope : kotlinx public final class kotlinx/coroutines/channels/ActorScope$DefaultImpls { public static synthetic fun cancel (Lkotlinx/coroutines/channels/ActorScope;)V + public static fun getOnReceiveOrNull (Lkotlinx/coroutines/channels/ActorScope;)Lkotlinx/coroutines/selects/SelectClause1; public static fun poll (Lkotlinx/coroutines/channels/ActorScope;)Ljava/lang/Object; + public static fun receiveOrNull (Lkotlinx/coroutines/channels/ActorScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } public abstract interface class kotlinx/coroutines/channels/BroadcastChannel : kotlinx/coroutines/channels/SendChannel { @@ -600,8 +602,10 @@ public abstract interface class kotlinx/coroutines/channels/Channel : kotlinx/co public final class kotlinx/coroutines/channels/Channel$DefaultImpls { public static synthetic fun cancel (Lkotlinx/coroutines/channels/Channel;)V + public static fun getOnReceiveOrNull (Lkotlinx/coroutines/channels/Channel;)Lkotlinx/coroutines/selects/SelectClause1; public static fun offer (Lkotlinx/coroutines/channels/Channel;Ljava/lang/Object;)Z public static fun poll (Lkotlinx/coroutines/channels/Channel;)Ljava/lang/Object; + public static fun receiveOrNull (Lkotlinx/coroutines/channels/Channel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } public final class kotlinx/coroutines/channels/Channel$Factory { @@ -627,6 +631,9 @@ public final class kotlinx/coroutines/channels/ChannelKt { public static final fun Channel (ILkotlinx/coroutines/channels/BufferOverflow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/channels/Channel; public static synthetic fun Channel$default (IILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel; public static synthetic fun Channel$default (ILkotlinx/coroutines/channels/BufferOverflow;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel; + public static final fun getOrElse-WpGqRn0 (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object; + public static final fun onFailure-WpGqRn0 (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object; + public static final fun onSuccess-WpGqRn0 (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object; } public final class kotlinx/coroutines/channels/ChannelResult { @@ -840,7 +847,9 @@ public final class kotlinx/coroutines/channels/ReceiveChannel$DefaultImpls { public static synthetic fun cancel (Lkotlinx/coroutines/channels/ReceiveChannel;)V public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/lang/Throwable;ILjava/lang/Object;)Z public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/concurrent/CancellationException;ILjava/lang/Object;)V + public static fun getOnReceiveOrNull (Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlinx/coroutines/selects/SelectClause1; public static fun poll (Lkotlinx/coroutines/channels/ReceiveChannel;)Ljava/lang/Object; + public static fun receiveOrNull (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } public abstract interface class kotlinx/coroutines/channels/SendChannel { diff --git a/kotlinx-coroutines-core/common/README.md b/kotlinx-coroutines-core/common/README.md index 6712648ae8..e8503d0d16 100644 --- a/kotlinx-coroutines-core/common/README.md +++ b/kotlinx-coroutines-core/common/README.md @@ -59,7 +59,7 @@ helper function. [NonCancellable] job object is provided to suppress cancellatio | [Deferred] | [await][Deferred.await] | [onAwait][Deferred.onAwait] | [isCompleted][Job.isCompleted] | [SendChannel][kotlinx.coroutines.channels.SendChannel] | [send][kotlinx.coroutines.channels.SendChannel.send] | [onSend][kotlinx.coroutines.channels.SendChannel.onSend] | [offer][kotlinx.coroutines.channels.SendChannel.offer] | [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receive][kotlinx.coroutines.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.channels.ReceiveChannel.onReceive] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll] -| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveOrNull][kotlinx.coroutines.channels.receiveOrNull] | [onReceiveOrNull][kotlinx.coroutines.channels.onReceiveOrNull] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll] +| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveCatching][kotlinx.coroutines.channels.ReceiveChannel.receiveCatching] | [onReceiveCatching][kotlinx.coroutines.channels.ReceiveChannel.onReceiveCatching] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll] | [Mutex][kotlinx.coroutines.sync.Mutex] | [lock][kotlinx.coroutines.sync.Mutex.lock] | [onLock][kotlinx.coroutines.sync.Mutex.onLock] | [tryLock][kotlinx.coroutines.sync.Mutex.tryLock] | none | [delay] | [onTimeout][kotlinx.coroutines.selects.SelectBuilder.onTimeout] | none @@ -149,8 +149,8 @@ Low-level primitives for finer-grained control of coroutines. [kotlinx.coroutines.channels.SendChannel.offer]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/offer.html [kotlinx.coroutines.channels.ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive.html [kotlinx.coroutines.channels.ReceiveChannel.poll]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/poll.html -[kotlinx.coroutines.channels.receiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/receive-or-null.html -[kotlinx.coroutines.channels.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/on-receive-or-null.html +[kotlinx.coroutines.channels.ReceiveChannel.receiveCatching]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive-catching.html +[kotlinx.coroutines.channels.ReceiveChannel.onReceiveCatching]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive-catching.html diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt index 82143b03a1..04a9d1a64a 100644 --- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt @@ -623,24 +623,6 @@ internal abstract class AbstractChannel( if (result) onReceiveEnqueued() } - public final override suspend fun receiveOrNull(): E? { - // fast path -- try poll non-blocking - val result = pollInternal() - @Suppress("UNCHECKED_CAST") - if (result !== POLL_FAILED && result !is Closed<*>) return result as E - // slow-path does suspend - return receiveSuspend(RECEIVE_NULL_ON_CLOSE) - } - - @Suppress("UNCHECKED_CAST") - private fun receiveOrNullResult(result: Any?): E? { - if (result is Closed<*>) { - if (result.closeCause != null) throw recoverStackTrace(result.closeCause) - return null - } - return result as E - } - @Suppress("UNCHECKED_CAST") public final override suspend fun receiveCatching(): ChannelResult { // fast path -- try poll non-blocking @@ -755,14 +737,6 @@ internal abstract class AbstractChannel( } } - final override val onReceiveOrNull: SelectClause1 - get() = object : SelectClause1 { - @Suppress("UNCHECKED_CAST") - override fun registerSelectClause1(select: SelectInstance, block: suspend (E?) -> R) { - registerSelectReceiveMode(select, RECEIVE_NULL_ON_CLOSE, block as suspend (Any?) -> R) - } - } - final override val onReceiveCatching: SelectClause1> get() = object : SelectClause1> { @Suppress("UNCHECKED_CAST") @@ -799,14 +773,6 @@ internal abstract class AbstractChannel( if (!select.trySelect()) return startCoroutineUnintercepted(ChannelResult.closed(value.closeCause), select.completion) } - RECEIVE_NULL_ON_CLOSE -> { - if (value.closeCause == null) { - if (!select.trySelect()) return - startCoroutineUnintercepted(null, select.completion) - } else { - throw recoverStackTrace(value.receiveException) - } - } } } else -> { @@ -942,7 +908,6 @@ internal abstract class AbstractChannel( override fun resumeReceiveClosed(closed: Closed<*>) { when { - receiveMode == RECEIVE_NULL_ON_CLOSE && closed.closeCause == null -> cont.resume(null) receiveMode == RECEIVE_RESULT -> cont.resume(closed.toResult()) else -> cont.resumeWithException(closed.receiveException) } @@ -1022,11 +987,6 @@ internal abstract class AbstractChannel( when (receiveMode) { RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectWithException(closed.receiveException) RECEIVE_RESULT -> block.startCoroutineCancellable(ChannelResult.closed(closed.closeCause), select.completion) - RECEIVE_NULL_ON_CLOSE -> if (closed.closeCause == null) { - block.startCoroutineCancellable(null, select.completion) - } else { - select.resumeSelectWithException(closed.receiveException) - } } } @@ -1044,8 +1004,7 @@ internal abstract class AbstractChannel( // receiveMode values internal const val RECEIVE_THROWS_ON_CLOSE = 0 -internal const val RECEIVE_NULL_ON_CLOSE = 1 -internal const val RECEIVE_RESULT = 2 +internal const val RECEIVE_RESULT = 1 @JvmField @SharedImmutable diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt index 61d06d83d2..8aad264d2c 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channel.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt @@ -14,6 +14,7 @@ import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED import kotlinx.coroutines.internal.* import kotlinx.coroutines.selects.* +import kotlin.contracts.* import kotlin.internal.* import kotlin.jvm.* @@ -210,52 +211,49 @@ public interface ReceiveChannel { public val onReceive: SelectClause1 /** - * Retrieves and removes an element from this channel if it's not empty, or suspends the caller while the channel is empty, - * or returns `null` if the channel is [closed for `receive`][isClosedForReceive] without cause, - * or throws the original [close][SendChannel.close] cause exception if the channel has _failed_. - * - * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this - * function is suspended, this function immediately resumes with a [CancellationException]. - * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was - * suspended, it will not resume successfully. The `receiveOrNull` call can retrieve the element from the channel, - * but then throw [CancellationException], thus failing to deliver the element. - * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements. - * - * Note that this function does not check for cancellation when it is not suspended. - * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. + * This function was deprecated since 1.3.0 and is no longer recommended to use + * or to implement in subclasses. * - * This function can be used in [select] invocations with the [onReceiveOrNull] clause. - * Use [poll] to try receiving from this channel without waiting. + * It had the following pitfalls: + * - Didn't allow to distinguish 'null' as "closed channel" from "null as a value" + * - Was throwing if the channel has failed even though its signature may suggest it returns 'null' + * - It didn't really belong to core channel API and can be exposed as an extension instead. * - * @suppress **Deprecated**: in favor of receiveOrClosed and receiveOrNull extension. + * @suppress doc */ - @ObsoleteCoroutinesApi @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") @LowPriorityInOverloadResolution @Deprecated( - message = "Deprecated in favor of receiveCatching and receiveOrNull extension", - level = DeprecationLevel.WARNING, - replaceWith = ReplaceWith("receiveOrNull", "kotlinx.coroutines.channels.receiveOrNull") - ) - public suspend fun receiveOrNull(): E? + message = "Deprecated in favor of receiveCatching", + level = DeprecationLevel.ERROR, + replaceWith = ReplaceWith("receiveCatching().getOrNull()") + ) // Warning since 1.3.0, error in 1.5.0, will be hidden in 1.6.0 + public suspend fun receiveOrNull(): E? = receiveCatching().getOrNull() /** - * Clause for the [select] expression of the [receiveOrNull] suspending function that selects with the element - * received from the channel or `null` if the channel is - * [closed for `receive`][isClosedForReceive] without a cause. The [select] invocation fails with - * the original [close][SendChannel.close] cause exception if the channel has _failed_. + * This function was deprecated since 1.3.0 and is no longer recommended to use + * or to implement in subclasses. + * See [receiveOrNull] documentation. * - * @suppress **Deprecated**: in favor of receiveCatching and onReceiveOrNull extension. + * @suppress **Deprecated**: in favor of onReceiveCatching extension. */ - @ObsoleteCoroutinesApi - @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") - @LowPriorityInOverloadResolution @Deprecated( - message = "Deprecated in favor of receiveCatching and onReceiveOrNull extension", - level = DeprecationLevel.WARNING, - replaceWith = ReplaceWith("onReceiveOrNull", "kotlinx.coroutines.channels.onReceiveOrNull") - ) + message = "Deprecated in favor of onReceiveCatching extension", + level = DeprecationLevel.ERROR, + replaceWith = ReplaceWith("onReceiveCatching") + ) // Warning since 1.3.0, error in 1.5.0, will be hidden or removed in 1.6.0 public val onReceiveOrNull: SelectClause1 + get() { + return object : SelectClause1 { + @InternalCoroutinesApi + override fun registerSelectClause1(select: SelectInstance, block: suspend (E?) -> R) { + onReceiveCatching.registerSelectClause1(select) { + it.exceptionOrNull()?.let { throw it } + block(it.getOrNull()) + } + } + } + } /** * Retrieves and removes an element from this channel if it's not empty, or suspends the caller while this channel is empty. @@ -354,7 +352,7 @@ public interface ReceiveChannel { */ @Suppress("UNCHECKED_CAST") public inline class ChannelResult -@PublishedApi internal constructor(private val holder: Any?) { +@PublishedApi internal constructor(@PublishedApi internal val holder: Any?) { /** * Returns `true` if this instance represents a successful * operation outcome. @@ -440,6 +438,50 @@ public inline class ChannelResult } } +/** + * Returns the encapsulated value if this instance represents [success][ChannelResult.isSuccess] or the + * result of [onFailure] function for the encapsulated [Throwable] exception if it is failed or closed + * result. + */ +@OptIn(ExperimentalContracts::class) +public inline fun ChannelResult.getOrElse(onFailure: (exception: Throwable?) -> T): T { + contract { + callsInPlace(onFailure, InvocationKind.AT_MOST_ONCE) + } + @Suppress("UNCHECKED_CAST") + return if (holder is ChannelResult.Failed) onFailure(exceptionOrNull()) else holder as T +} + +/** + * Performs the given [action] on the encapsulated value if this instance represents [success][ChannelResult.isSuccess]. + * Returns the original `ChannelResult` unchanged. + */ +@OptIn(ExperimentalContracts::class) +public inline fun ChannelResult.onSuccess(action: (value: T) -> Unit): ChannelResult { + contract { + callsInPlace(action, InvocationKind.AT_MOST_ONCE) + } + @Suppress("UNCHECKED_CAST") + if (holder !is ChannelResult.Failed) action(holder as T) + return this +} + +/** + * Performs the given [action] on the encapsulated [Throwable] exception if this instance represents [failure][ChannelResult.isFailure]. + * The result of [ChannelResult.exceptionOrNull] is passed to the [action] parameter. + * + * Returns the original `ChannelResult` unchanged. + */ +@OptIn(ExperimentalContracts::class) +public inline fun ChannelResult.onFailure(action: (exception: Throwable?) -> Unit): ChannelResult { + contract { + callsInPlace(action, InvocationKind.AT_MOST_ONCE) + } + @Suppress("UNCHECKED_CAST") + if (holder is ChannelResult.Failed) action(exceptionOrNull()) + return this +} + /** * Iterator for [ReceiveChannel]. Instances of this interface are *not thread-safe* and shall not be used * from concurrent coroutines. diff --git a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt index e3567e3107..6bf6f88123 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt @@ -37,40 +37,34 @@ public inline fun BroadcastChannel.consume(block: ReceiveChannel.() } /** - * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty] - * or returns `null` if the channel is [closed][Channel.isClosedForReceive]. + * This function is deprecated in the favour of [ReceiveChannel.receiveCatching]. * - * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this - * function is suspended, this function immediately resumes with [CancellationException]. - * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was - * suspended, it will not resume successfully. If the `receiveOrNull` call threw [CancellationException] there is no way - * to tell if some element was already received from the channel or not. See [Channel] documentation for details. + * This function is considered error-prone for the following reasons; + * * Is throwing if the channel has failed even though its signature may suggest it returns 'null' + * * It is easy to forget that exception handling still have to be explicit + * * During code reviews and code reading, intentions of the code are frequently unclear: + * are potential exceptions ignored deliberately or not? * - * Note, that this function does not check for cancellation when it is not suspended. - * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. - * - * This extension is defined only for channels on non-null types, so that generic functions defined using - * these extensions do not accidentally confuse `null` value and a normally closed channel, leading to hard - * to find bugs. + * @suppress doc */ +@Deprecated( + "Deprecated in the favour of 'receiveCatching'", + ReplaceWith("receiveCatching().getOrNull()"), + DeprecationLevel.WARNING +) // Warning since 1.5.0, ERROR in 1.6.0, HIDDEN in 1.7.0 @Suppress("EXTENSION_SHADOWED_BY_MEMBER") -@ExperimentalCoroutinesApi // since 1.3.0, tentatively stable in 1.4.x public suspend fun ReceiveChannel.receiveOrNull(): E? { @Suppress("DEPRECATION", "UNCHECKED_CAST") return (this as ReceiveChannel).receiveOrNull() } /** - * Clause for [select] expression of [receiveOrNull] suspending function that selects with the element that - * is received from the channel or selects with `null` if the channel - * [isClosedForReceive][ReceiveChannel.isClosedForReceive] without cause. The [select] invocation fails with - * the original [close][SendChannel.close] cause exception if the channel has _failed_. - * - * This extension is defined only for channels on non-null types, so that generic functions defined using - * these extensions do not accidentally confuse `null` value and a normally closed channel, leading to hard - * to find bugs. - **/ -@ExperimentalCoroutinesApi // since 1.3.0, tentatively stable in 1.4.x + * This function is deprecated in the favour of [ReceiveChannel.onReceiveCatching] + */ +@Deprecated( + "Deprecated in the favour of 'onReceiveCatching'", + level = DeprecationLevel.WARNING +) // Warning since 1.5.0, ERROR in 1.6.0, HIDDEN in 1.7.0 public fun ReceiveChannel.onReceiveOrNull(): SelectClause1 { @Suppress("DEPRECATION", "UNCHECKED_CAST") return (this as ReceiveChannel).onReceiveOrNull diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt index 7fde06362a..6e5f3f11aa 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/Combine.kt @@ -54,7 +54,7 @@ internal suspend fun FlowCollector.combineInternal( ++currentEpoch // Start batch // The very first receive in epoch should be suspending - var element = resultChannel.receiveOrNull() ?: break // Channel is closed, nothing to do here + var element = resultChannel.receiveCatching().getOrNull() ?: break // Channel is closed, nothing to do here while (true) { val index = element.index // Update values @@ -129,7 +129,9 @@ internal fun zipImpl(flow: Flow, flow2: Flow, transform: sus withContextUndispatched(coroutineContext + collectJob, Unit) { flow.collect { value -> withContextUndispatched(scopeContext, Unit, cnt) { - val otherValue = second.receiveOrNull() ?: throw AbortFlowException(this@unsafeFlow) + val otherValue = second.receiveCatching().getOrElse { + throw it ?:AbortFlowException(this@unsafeFlow) + } emit(transform(value, NULL.unbox(otherValue))) } } diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt index 0f6ee3aca8..fed5962bd5 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt @@ -209,8 +209,7 @@ public fun Flow.debounce(timeout: (T) -> Duration): Flow = private fun Flow.debounceInternal(timeoutMillisSelector: (T) -> Long) : Flow = scopedFlow { downstream -> // Produce the values using the default (rendezvous) channel - // Note: the actual type is Any, KT-30796 - val values = produce { + val values = produce { collect { value -> send(value ?: NULL) } } // Now consume the values @@ -237,14 +236,15 @@ private fun Flow.debounceInternal(timeoutMillisSelector: (T) -> Long) : F lastValue = null // Consume the value } } - // Should be receiveCatching when boxing issues are fixed - values.onReceiveOrNull { value -> - if (value == null) { - if (lastValue != null) downstream.emit(NULL.unbox(lastValue)) - lastValue = DONE - } else { - lastValue = value - } + values.onReceiveCatching { value -> + value + .onSuccess { lastValue = it } + .onFailure { + it?.let { throw it } + // If closed normally, emit the latest value + if (lastValue != null) downstream.emit(NULL.unbox(lastValue)) + lastValue = DONE + } } } } @@ -278,21 +278,21 @@ private fun Flow.debounceInternal(timeoutMillisSelector: (T) -> Long) : F public fun Flow.sample(periodMillis: Long): Flow { require(periodMillis > 0) { "Sample period should be positive" } return scopedFlow { downstream -> - val values = produce(capacity = Channel.CONFLATED) { - // Actually Any, KT-30796 + val values = produce(capacity = Channel.CONFLATED) { collect { value -> send(value ?: NULL) } } var lastValue: Any? = null val ticker = fixedPeriodTicker(periodMillis) while (lastValue !== DONE) { select { - values.onReceiveOrNull { - if (it == null) { - ticker.cancel(ChildCancelledException()) - lastValue = DONE - } else { - lastValue = it - } + values.onReceiveCatching { result -> + result + .onSuccess { lastValue = it } + .onFailure { + it?.let { throw it } + ticker.cancel(ChildCancelledException()) + lastValue = DONE + } } // todo: shall be start sampling only when an element arrives or sample aways as here? diff --git a/kotlinx-coroutines-core/common/src/selects/Select.kt b/kotlinx-coroutines-core/common/src/selects/Select.kt index 0d97400717..a7172707e2 100644 --- a/kotlinx-coroutines-core/common/src/selects/Select.kt +++ b/kotlinx-coroutines-core/common/src/selects/Select.kt @@ -177,15 +177,17 @@ public interface SelectInstance { * corresponding non-suspending version that can be used with a regular `when` expression to select one * of the alternatives or to perform the default (`else`) action if none of them can be immediately selected. * - * | **Receiver** | **Suspending function** | **Select clause** | **Non-suspending version** - * | ---------------- | --------------------------------------------- | ------------------------------------------------ | -------------------------- - * | [Job] | [join][Job.join] | [onJoin][Job.onJoin] | [isCompleted][Job.isCompleted] - * | [Deferred] | [await][Deferred.await] | [onAwait][Deferred.onAwait] | [isCompleted][Job.isCompleted] - * | [SendChannel] | [send][SendChannel.send] | [onSend][SendChannel.onSend] | [offer][SendChannel.offer] - * | [ReceiveChannel] | [receive][ReceiveChannel.receive] | [onReceive][ReceiveChannel.onReceive] | [poll][ReceiveChannel.poll] - * | [ReceiveChannel] | [receiveOrNull][ReceiveChannel.receiveOrNull] | [onReceiveOrNull][ReceiveChannel.onReceiveOrNull]| [poll][ReceiveChannel.poll] - * | [Mutex] | [lock][Mutex.lock] | [onLock][Mutex.onLock] | [tryLock][Mutex.tryLock] - * | none | [delay] | [onTimeout][SelectBuilder.onTimeout] | none + * ### List of supported select methods + * + * | **Receiver** | **Suspending function** | **Select clause** + * | ---------------- | --------------------------------------------- | ----------------------------------------------------- + * | [Job] | [join][Job.join] | [onJoin][Job.onJoin] + * | [Deferred] | [await][Deferred.await] | [onAwait][Deferred.onAwait] + * | [SendChannel] | [send][SendChannel.send] | [onSend][SendChannel.onSend] + * | [ReceiveChannel] | [receive][ReceiveChannel.receive] | [onReceive][ReceiveChannel.onReceive] + * | [ReceiveChannel] | [receiveCatching][ReceiveChannel.receiveCatching] | [onReceiveCatching][ReceiveChannel.onReceiveCatching] + * | [Mutex] | [lock][Mutex.lock] | [onLock][Mutex.onLock] + * | none | [delay] | [onTimeout][SelectBuilder.onTimeout] * * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this * function is suspended, this function immediately resumes with [CancellationException]. diff --git a/kotlinx-coroutines-core/common/test/channels/ArrayBroadcastChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/ArrayBroadcastChannelTest.kt index a7084296bb..2d71cc94ed 100644 --- a/kotlinx-coroutines-core/common/test/channels/ArrayBroadcastChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ArrayBroadcastChannelTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -46,7 +46,7 @@ class ArrayBroadcastChannelTest : TestBase() { assertEquals(2, first.receive()) // suspends assertFalse(first.isClosedForReceive) expect(10) - assertNull(first.receiveOrNull()) // suspends + assertTrue(first.receiveCatching().isClosed) // suspends assertTrue(first.isClosedForReceive) expect(14) } @@ -62,7 +62,7 @@ class ArrayBroadcastChannelTest : TestBase() { assertEquals(2, second.receive()) // suspends assertFalse(second.isClosedForReceive) expect(11) - assertNull(second.receiveOrNull()) // suspends + assertNull(second.receiveCatching().getOrNull()) // suspends assertTrue(second.isClosedForReceive) expect(15) } @@ -116,9 +116,9 @@ class ArrayBroadcastChannelTest : TestBase() { expect(6) assertFalse(sub.isClosedForReceive) for (x in 1..3) - assertEquals(x, sub.receiveOrNull()) + assertEquals(x, sub.receiveCatching().getOrNull()) // and receive close signal - assertNull(sub.receiveOrNull()) + assertNull(sub.receiveCatching().getOrNull()) assertTrue(sub.isClosedForReceive) finish(7) } @@ -153,7 +153,7 @@ class ArrayBroadcastChannelTest : TestBase() { // make sure all of them are consumed check(!sub.isClosedForReceive) for (x in 1..5) check(sub.receive() == x) - check(sub.receiveOrNull() == null) + check(sub.receiveCatching().getOrNull() == null) check(sub.isClosedForReceive) } @@ -196,7 +196,7 @@ class ArrayBroadcastChannelTest : TestBase() { val channel = BroadcastChannel(1) val subscription = channel.openSubscription() subscription.cancel(TestCancellationException()) - subscription.receiveOrNull() + subscription.receive() } @Test @@ -208,6 +208,6 @@ class ArrayBroadcastChannelTest : TestBase() { channel.cancel() assertTrue(channel.isClosedForSend) assertTrue(sub.isClosedForReceive) - check(sub.receiveOrNull() == null) + check(sub.receiveCatching().getOrNull() == null) } } diff --git a/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt index a57b519f61..3900c2db90 100644 --- a/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -38,17 +38,17 @@ class ArrayChannelTest : TestBase() { } @Test - fun testClosedBufferedReceiveOrNull() = runTest { + fun testClosedBufferedReceiveCatching() = runTest { val q = Channel(1) check(q.isEmpty && !q.isClosedForSend && !q.isClosedForReceive) expect(1) launch { expect(5) check(!q.isEmpty && q.isClosedForSend && !q.isClosedForReceive) - assertEquals(42, q.receiveOrNull()) + assertEquals(42, q.receiveCatching().getOrNull()) expect(6) check(!q.isEmpty && q.isClosedForSend && q.isClosedForReceive) - assertNull(q.receiveOrNull()) + assertNull(q.receiveCatching().getOrNull()) expect(7) } expect(2) @@ -134,7 +134,7 @@ class ArrayChannelTest : TestBase() { q.cancel() check(q.isClosedForSend) check(q.isClosedForReceive) - assertFailsWith { q.receiveOrNull() } + assertFailsWith { q.receiveCatching().getOrThrow() } finish(12) } @@ -142,7 +142,7 @@ class ArrayChannelTest : TestBase() { fun testCancelWithCause() = runTest({ it is TestCancellationException }) { val channel = Channel(5) channel.cancel(TestCancellationException()) - channel.receiveOrNull() + channel.receive() } @Test @@ -160,7 +160,7 @@ class ArrayChannelTest : TestBase() { channel.offer(-1) } repeat(4) { - channel.receiveOrNull() + channel.receiveCatching().getOrNull() } checkBufferChannel(channel, capacity) } diff --git a/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt b/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt index f1658cfa84..a64284aec1 100644 --- a/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt @@ -24,19 +24,9 @@ class BasicOperationsTest : TestBase() { TestChannelKind.values().forEach { kind -> testSendAfterClose(kind) } } - @Test - fun testReceiveOrNullAfterClose() = runTest { - TestChannelKind.values().forEach { kind -> testReceiveOrNull(kind) } - } - - @Test - fun testReceiveOrNullAfterCloseWithException() = runTest { - TestChannelKind.values().forEach { kind -> testReceiveOrNullException(kind) } - } - @Test fun testReceiveCatching() = runTest { - TestChannelKind.values().forEach { kind -> testReceiveOrClosed(kind) } + TestChannelKind.values().forEach { kind -> testReceiveCatching(kind) } } @Test @@ -90,24 +80,7 @@ class BasicOperationsTest : TestBase() { } } - private suspend fun testReceiveOrNull(kind: TestChannelKind) = coroutineScope { - val channel = kind.create() - val d = async(NonCancellable) { - channel.receive() - } - - yield() - channel.close() - assertTrue(channel.isClosedForReceive) - - assertNull(channel.receiveOrNull()) - assertNull(channel.poll()) - - d.join() - assertTrue(d.getCancellationException().cause is ClosedReceiveChannelException) - } - - private suspend fun testReceiveOrNullException(kind: TestChannelKind) = coroutineScope { + private suspend fun testReceiveCatchingException(kind: TestChannelKind) = coroutineScope { val channel = kind.create() val d = async(NonCancellable) { channel.receive() @@ -119,8 +92,8 @@ class BasicOperationsTest : TestBase() { assertFailsWith { channel.poll() } try { - channel.receiveOrNull() - fail() + channel.receiveCatching().getOrThrow() + expectUnreached() } catch (e: TestException) { // Expected } @@ -130,7 +103,7 @@ class BasicOperationsTest : TestBase() { } @Suppress("ReplaceAssertBooleanWithAssertEquality") - private suspend fun testReceiveOrClosed(kind: TestChannelKind) = coroutineScope { + private suspend fun testReceiveCatching(kind: TestChannelKind) = coroutineScope { reset() val channel = kind.create() launch { diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementFailureTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementFailureTest.kt index 37db7e4526..ae05fb8d74 100644 --- a/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementFailureTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementFailureTest.kt @@ -70,33 +70,7 @@ class ChannelUndeliveredElementFailureTest : TestBase() { } @Test - fun testReceiveOrNullCancelledFail() = runTest(unhandled = shouldBeUnhandled) { - val channel = Channel(onUndeliveredElement = onCancelFail) - val job = launch(start = CoroutineStart.UNDISPATCHED) { - channel.receiveOrNull() - expectUnreached() // will be cancelled before it dispatches - } - channel.send(item) - job.cancel() - } - - @Test - fun testReceiveOrNullSelectCancelledFail() = runTest(unhandled = shouldBeUnhandled) { - val channel = Channel(onUndeliveredElement = onCancelFail) - val job = launch(start = CoroutineStart.UNDISPATCHED) { - select { - channel.onReceiveOrNull { - expectUnreached() - } - } - expectUnreached() // will be cancelled before it dispatches - } - channel.send(item) - job.cancel() - } - - @Test - fun testReceiveOrClosedCancelledFail() = runTest(unhandled = shouldBeUnhandled) { + fun testReceiveCatchingCancelledFail() = runTest(unhandled = shouldBeUnhandled) { val channel = Channel(onUndeliveredElement = onCancelFail) val job = launch(start = CoroutineStart.UNDISPATCHED) { channel.receiveCatching() diff --git a/kotlinx-coroutines-core/common/test/channels/ConflatedBroadcastChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/ConflatedBroadcastChannelTest.kt index 7dd232f2d7..856a66fbd2 100644 --- a/kotlinx-coroutines-core/common/test/channels/ConflatedBroadcastChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ConflatedBroadcastChannelTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -68,7 +68,7 @@ class ConflatedBroadcastChannelTest : TestBase() { expect(14) assertEquals("three", sub.receive()) // suspends expect(17) - assertNull(sub.receiveOrNull()) // suspends until closed + assertNull(sub.receiveCatching().getOrNull()) // suspends until closed expect(20) sub.cancel() expect(21) diff --git a/kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt index 18f2843868..87194f72f9 100644 --- a/kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -27,7 +27,7 @@ open class ConflatedChannelTest : TestBase() { val q = createConflatedChannel() q.send(1) q.send(2) // shall conflated previously sent - assertEquals(2, q.receiveOrNull()) + assertEquals(2, q.receiveCatching().getOrNull()) } @Test @@ -41,7 +41,7 @@ open class ConflatedChannelTest : TestBase() { // not it is closed for receive, too assertTrue(q.isClosedForSend) assertTrue(q.isClosedForReceive) - assertNull(q.receiveOrNull()) + assertNull(q.receiveCatching().getOrNull()) } @Test @@ -82,7 +82,7 @@ open class ConflatedChannelTest : TestBase() { q.cancel() check(q.isClosedForSend) check(q.isClosedForReceive) - assertFailsWith { q.receiveOrNull() } + assertFailsWith { q.receiveCatching().getOrThrow() } finish(2) } @@ -90,6 +90,6 @@ open class ConflatedChannelTest : TestBase() { fun testCancelWithCause() = runTest({ it is TestCancellationException }) { val channel = createConflatedChannel() channel.cancel(TestCancellationException()) - channel.receiveOrNull() + channel.receive() } } diff --git a/kotlinx-coroutines-core/common/test/channels/LinkedListChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/LinkedListChannelTest.kt index 4233a35084..cdec8a776e 100644 --- a/kotlinx-coroutines-core/common/test/channels/LinkedListChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/LinkedListChannelTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -18,8 +18,8 @@ class LinkedListChannelTest : TestBase() { check(!c.close()) assertEquals(1, c.receive()) assertEquals(2, c.poll()) - assertEquals(3, c.receiveOrNull()) - assertNull(c.receiveOrNull()) + assertEquals(3, c.receiveCatching().getOrNull()) + assertNull(c.receiveCatching().getOrNull()) } @Test @@ -31,13 +31,13 @@ class LinkedListChannelTest : TestBase() { q.cancel() check(q.isClosedForSend) check(q.isClosedForReceive) - assertFailsWith { q.receiveOrNull() } + assertFailsWith { q.receive() } } @Test fun testCancelWithCause() = runTest({ it is TestCancellationException }) { val channel = Channel(Channel.UNLIMITED) channel.cancel(TestCancellationException()) - channel.receiveOrNull() + channel.receive() } } diff --git a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt index 194504e713..61ef072622 100644 --- a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt @@ -24,7 +24,7 @@ class ProduceTest : TestBase() { expect(4) check(c.receive() == 2) expect(5) - check(c.receiveOrNull() == null) + assertNull(c.receiveCatching().getOrNull()) finish(7) } @@ -49,7 +49,7 @@ class ProduceTest : TestBase() { expect(4) c.cancel() expect(5) - assertFailsWith { c.receiveOrNull() } + assertFailsWith { c.receiveCatching().getOrThrow() } expect(6) yield() // to produce finish(8) @@ -76,7 +76,7 @@ class ProduceTest : TestBase() { expect(4) c.cancel(TestCancellationException()) try { - assertNull(c.receiveOrNull()) + c.receive() expectUnreached() } catch (e: TestCancellationException) { expect(5) diff --git a/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt index 4d20d71596..ab0292a831 100644 --- a/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -36,15 +36,15 @@ class RendezvousChannelTest : TestBase() { } @Test - fun testClosedReceiveOrNull() = runTest { + fun testClosedReceiveCatching() = runTest { val q = Channel(Channel.RENDEZVOUS) check(q.isEmpty && !q.isClosedForSend && !q.isClosedForReceive) expect(1) launch { expect(3) - assertEquals(42, q.receiveOrNull()) + assertEquals(42, q.receiveCatching().getOrNull()) expect(4) - assertNull(q.receiveOrNull()) + assertNull(q.receiveCatching().getOrNull()) expect(6) } expect(2) @@ -233,9 +233,9 @@ class RendezvousChannelTest : TestBase() { expect(7) yield() // try to resume sender (it will not resume despite the close!) expect(8) - assertEquals(42, q.receiveOrNull()) + assertEquals(42, q.receiveCatching().getOrNull()) expect(9) - assertNull(q.receiveOrNull()) + assertNull(q.receiveCatching().getOrNull()) expect(10) yield() // to sender, it was resumed! finish(12) @@ -266,7 +266,7 @@ class RendezvousChannelTest : TestBase() { q.cancel() check(q.isClosedForSend) check(q.isClosedForReceive) - assertFailsWith { q.receiveOrNull() } + assertFailsWith { q.receiveCatching().getOrThrow() } finish(12) } @@ -274,6 +274,6 @@ class RendezvousChannelTest : TestBase() { fun testCancelWithCause() = runTest({ it is TestCancellationException }) { val channel = Channel(Channel.RENDEZVOUS) channel.cancel(TestCancellationException()) - channel.receiveOrNull() + channel.receiveCatching().getOrThrow() } } diff --git a/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt b/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt index 3e70007a6e..f234e141fe 100644 --- a/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt +++ b/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt @@ -42,7 +42,6 @@ private class ChannelViaBroadcast( override val isEmpty: Boolean get() = sub.isEmpty override suspend fun receive(): E = sub.receive() - override suspend fun receiveOrNull(): E? = sub.receiveOrNull() override suspend fun receiveCatching(): ChannelResult = sub.receiveCatching() override fun iterator(): ChannelIterator = sub.iterator() override fun tryReceive(): ChannelResult = sub.tryReceive() @@ -55,8 +54,6 @@ private class ChannelViaBroadcast( override val onReceive: SelectClause1 get() = sub.onReceive - override val onReceiveOrNull: SelectClause1 - get() = sub.onReceiveOrNull override val onReceiveCatching: SelectClause1> get() = sub.onReceiveCatching } diff --git a/kotlinx-coroutines-core/common/test/selects/SelectLoopTest.kt b/kotlinx-coroutines-core/common/test/selects/SelectLoopTest.kt index e31ccfc16d..ba8f56ad4c 100644 --- a/kotlinx-coroutines-core/common/test/selects/SelectLoopTest.kt +++ b/kotlinx-coroutines-core/common/test/selects/SelectLoopTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ @file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913 @@ -27,7 +27,7 @@ class SelectLoopTest : TestBase() { try { while (true) { select { - channel.onReceiveOrNull { + channel.onReceiveCatching { expectUnreached() } job.onJoin { @@ -40,4 +40,4 @@ class SelectLoopTest : TestBase() { finish(4) } } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testReceiveOrNullFromClosedChannel.txt b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testReceiveOrNullFromClosedChannel.txt deleted file mode 100644 index ac8f5f4ee6..0000000000 --- a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testReceiveOrNullFromClosedChannel.txt +++ /dev/null @@ -1,8 +0,0 @@ -kotlinx.coroutines.RecoverableTestException - at kotlinx.coroutines.exceptions.StackTraceRecoveryChannelsTest$testReceiveOrNullFromClosedChannel$1.invokeSuspend(StackTraceRecoveryChannelsTest.kt:43) - (Coroutine boundary) - at kotlinx.coroutines.exceptions.StackTraceRecoveryChannelsTest.channelReceiveOrNull(StackTraceRecoveryChannelsTest.kt:70) - at kotlinx.coroutines.exceptions.StackTraceRecoveryChannelsTest$testReceiveOrNullFromClosedChannel$1.invokeSuspend(StackTraceRecoveryChannelsTest.kt:44) -Caused by: kotlinx.coroutines.RecoverableTestException - at kotlinx.coroutines.exceptions.StackTraceRecoveryChannelsTest$testReceiveOrNullFromClosedChannel$1.invokeSuspend(StackTraceRecoveryChannelsTest.kt:43) - at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt index 1d7613eded..5a2778d507 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt @@ -69,11 +69,11 @@ class ActorTest(private val capacity: Int) : TestBase() { @Test fun testCloseWithoutCause() = runTest { val actor = actor(capacity = capacity) { - val element = channel.receiveOrNull() + val element = channel.receive() expect(2) assertEquals(42, element) - val next = channel.receiveOrNull() - assertNull(next) + val next = channel.receiveCatching() + assertNull(next.exceptionOrNull()) expect(3) } @@ -88,11 +88,11 @@ class ActorTest(private val capacity: Int) : TestBase() { @Test fun testCloseWithCause() = runTest { val actor = actor(capacity = capacity) { - val element = channel.receiveOrNull() + val element = channel.receive() expect(2) - require(element!! == 42) + require(element == 42) try { - channel.receiveOrNull() + channel.receive() } catch (e: IOException) { expect(3) } @@ -111,7 +111,7 @@ class ActorTest(private val capacity: Int) : TestBase() { val job = async { actor(capacity = capacity) { expect(1) - channel.receiveOrNull() + channel.receive() expectUnreached() } } diff --git a/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt index 2e73b2432a..8c9777b4af 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -67,10 +67,10 @@ class BroadcastChannelMultiReceiveStressTest( val channel = broadcast.openSubscription() when (receiverIndex % 5) { 0 -> doReceive(channel, receiverIndex) - 1 -> doReceiveOrNull(channel, receiverIndex) + 1 -> doReceiveCatching(channel, receiverIndex) 2 -> doIterator(channel, receiverIndex) 3 -> doReceiveSelect(channel, receiverIndex) - 4 -> doReceiveSelectOrNull(channel, receiverIndex) + 4 -> doReceiveCatchingSelect(channel, receiverIndex) } channel.cancel() } @@ -124,9 +124,9 @@ class BroadcastChannelMultiReceiveStressTest( } } - private suspend fun doReceiveOrNull(channel: ReceiveChannel, receiverIndex: Int) { + private suspend fun doReceiveCatching(channel: ReceiveChannel, receiverIndex: Int) { while (true) { - val stop = doReceived(receiverIndex, channel.receiveOrNull() ?: break) + val stop = doReceived(receiverIndex, channel.receiveCatching().getOrNull() ?: break) if (stop) break } } @@ -148,11 +148,11 @@ class BroadcastChannelMultiReceiveStressTest( } } - private suspend fun doReceiveSelectOrNull(channel: ReceiveChannel, receiverIndex: Int) { + private suspend fun doReceiveCatchingSelect(channel: ReceiveChannel, receiverIndex: Int) { while (true) { - val event = select { channel.onReceiveOrNull { it } } ?: break + val event = select { channel.onReceiveCatching { it.getOrNull() } } ?: break val stop = doReceived(receiverIndex, event) if (stop) break } } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt index 9f7ce497fa..a6a5340389 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelCancelUndeliveredElementStressTest.kt @@ -89,7 +89,7 @@ class ChannelCancelUndeliveredElementStressTest : TestBase() { private suspend fun receiveOne(channel: Channel) { val received = when (Random.nextInt(3)) { 0 -> channel.receive() - 1 -> channel.receiveOrNull() ?: error("Cannot be closed yet") + 1 -> channel.receiveCatching().getOrElse { error("Cannot be closed yet") } 2 -> select { channel.onReceive { it } } diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt index f414c33338..a6345cc55b 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelSendReceiveStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -60,10 +60,10 @@ class ChannelSendReceiveStressTest( launch(pool + CoroutineName("receiver$receiverIndex")) { when (receiverIndex % 5) { 0 -> doReceive(receiverIndex) - 1 -> doReceiveOrNull(receiverIndex) + 1 -> doReceiveCatching(receiverIndex) 2 -> doIterator(receiverIndex) 3 -> doReceiveSelect(receiverIndex) - 4 -> doReceiveSelectOrNull(receiverIndex) + 4 -> doReceiveCatchingSelect(receiverIndex) } receiversCompleted.incrementAndGet() } @@ -152,9 +152,9 @@ class ChannelSendReceiveStressTest( } } - private suspend fun doReceiveOrNull(receiverIndex: Int) { + private suspend fun doReceiveCatching(receiverIndex: Int) { while (true) { - doReceived(receiverIndex, channel.receiveOrNull() ?: break) + doReceived(receiverIndex, channel.receiveCatching().getOrNull() ?: break) } } @@ -173,10 +173,10 @@ class ChannelSendReceiveStressTest( } } - private suspend fun doReceiveSelectOrNull(receiverIndex: Int) { + private suspend fun doReceiveCatchingSelect(receiverIndex: Int) { while (true) { - val event = select { channel.onReceiveOrNull { it } } ?: break + val event = select { channel.onReceiveCatching { it.getOrNull() } } ?: break doReceived(receiverIndex, event) } } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt index 3f502ba9fb..8f5224db79 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt @@ -188,8 +188,8 @@ class ChannelUndeliveredElementStressTest(private val kind: TestChannelKind) : T val receivedData = when (receiveMode) { 1 -> channel.receive() 2 -> select { channel.onReceive { it } } - 3 -> channel.receiveOrNull() ?: error("Should not be closed") - 4 -> select { channel.onReceiveOrNull { it ?: error("Should not be closed") } } + 3 -> channel.receiveCatching().getOrElse { error("Should not be closed") } + 4 -> select { channel.onReceiveCatching { it.getOrElse { error("Should not be closed") } } } 5 -> channel.receiveCatching().getOrThrow() 6 -> { val iterator = channel.iterator() diff --git a/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt index 316b378508..fd26144faf 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ConflatedChannelCloseStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -64,7 +64,9 @@ class ConflatedChannelCloseStressTest : TestBase() { } val receiver = async(pool + NonCancellable) { while (isActive) { - curChannel.get().receiveOrNull() + curChannel.get().receiveCatching().getOrElse { + it?.let { throw it } + } received.incrementAndGet() } } @@ -110,4 +112,4 @@ class ConflatedChannelCloseStressTest : TestBase() { } class StopException : Exception() -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt index f52f8b5bcf..555d99fc94 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/StackTraceRecoveryChannelsTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.exceptions @@ -37,13 +37,6 @@ class StackTraceRecoveryChannelsTest : TestBase() { channelReceive(channel) } - @Test - fun testReceiveOrNullFromClosedChannel() = runTest { - val channel = Channel() - channel.close(RecoverableTestException()) - channelReceiveOrNull(channel) - } - @Test fun testSendToClosedChannel() = runTest { val channel = Channel() @@ -67,7 +60,6 @@ class StackTraceRecoveryChannelsTest : TestBase() { } private suspend fun channelReceive(channel: Channel) = channelOp { channel.receive() } - private suspend fun channelReceiveOrNull(channel: Channel) = channelOp { channel.receiveOrNull() } private suspend inline fun channelOp(block: () -> Unit) { try { @@ -75,6 +67,7 @@ class StackTraceRecoveryChannelsTest : TestBase() { block() expectUnreached() } catch (e: RecoverableTestException) { + e.printStackTrace() verifyStackTrace("channels/${name.methodName}", e) } } @@ -177,4 +170,4 @@ class StackTraceRecoveryChannelsTest : TestBase() { private suspend fun Channel.sendFromScope() = coroutineScope { sendWithContext(wrapperDispatcher(coroutineContext)) } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt b/kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt index 57fe638297..22380d3af5 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-select-02.kt @@ -11,17 +11,21 @@ import kotlinx.coroutines.selects.* suspend fun selectAorB(a: ReceiveChannel, b: ReceiveChannel): String = select { - a.onReceiveOrNull { value -> - if (value == null) - "Channel 'a' is closed" - else + a.onReceiveCatching { it -> + val value = it.getOrNull() + if (value != null) { "a -> '$value'" + } else { + "Channel 'a' is closed" + } } - b.onReceiveOrNull { value -> - if (value == null) - "Channel 'b' is closed" - else + b.onReceiveCatching { it -> + val value = it.getOrNull() + if (value != null) { "b -> '$value'" + } else { + "Channel 'b' is closed" + } } } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt b/kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt index 464e9b20f3..68b4456454 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-select-05.kt @@ -13,12 +13,12 @@ fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel>) = var current = input.receive() // start with first received deferred value while (isActive) { // loop while not cancelled/closed val next = select?> { // return next deferred value from this select or null - input.onReceiveOrNull { update -> - update // replaces next value to wait + input.onReceiveCatching { update -> + update.getOrNull() } - current.onAwait { value -> + current.onAwait { value -> send(value) // send value that current deferred has produced - input.receiveOrNull() // and use the next deferred from the input channel + input.receiveCatching().getOrNull() // and use the next deferred from the input channel } } if (next == null) { diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt index 1f04ea538c..cb8de7a636 100644 --- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt +++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt @@ -112,7 +112,7 @@ private class PublisherAsFlow( collectImpl(scope.coroutineContext, SendingCollector(scope.channel)) } -@Suppress("SubscriberImplementation") +@Suppress("ReactiveStreamsSubscriberImplementation") private class ReactiveSubscriber( capacity: Int, onBufferOverflow: BufferOverflow, @@ -124,7 +124,11 @@ private class ReactiveSubscriber( // be reliable with rendezvous channel, so a rendezvous channel is replaced with buffer=1 channel private val channel = Channel(if (capacity == Channel.RENDEZVOUS) 1 else capacity, onBufferOverflow) - suspend fun takeNextOrNull(): T? = channel.receiveOrNull() + suspend fun takeNextOrNull(): T? { + val result = channel.receiveCatching() + result.exceptionOrNull()?.let { throw it } + return result.getOrElse { null } // Closed channel + } override fun onNext(value: T) { // Controlled by requestSize @@ -247,7 +251,7 @@ public class FlowSubscription( if (old <= 0L) { assert(old == 0L) // Emitter is not started yet or has suspended -- spin on race with suspendCancellableCoroutine - while(true) { + while (true) { val producer = producer.getAndSet(null) ?: continue // spin if not set yet producer.resume(Unit) break diff --git a/reactive/kotlinx-coroutines-reactive/test/CancelledParentAttachTest.kt b/reactive/kotlinx-coroutines-reactive/test/CancelledParentAttachTest.kt index f846882ab2..1db10b278d 100644 --- a/reactive/kotlinx-coroutines-reactive/test/CancelledParentAttachTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/CancelledParentAttachTest.kt @@ -9,7 +9,7 @@ import kotlinx.coroutines.flow.* import org.junit.* -class CancelledParentAttachTest : TestBase() { +class CancelledParentAttachTest : TestBase() {; @Test fun testFlow() = runTest { @@ -17,4 +17,5 @@ class CancelledParentAttachTest : TestBase() { val j = Job().also { it.cancel() } f.asPublisher(j).asFlow().collect() } + } diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt index 04833e9814..e2c86c97ff 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublisherAsFlowTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.reactive @@ -263,4 +263,14 @@ class PublisherAsFlowTest : TestBase() { } assertEquals(expected, list) } + + @Test + fun testException() = runTest { + expect(1) + val p = publish { throw TestException() }.asFlow() + p.catch { + assertTrue { it is TestException } + finish(2) + }.collect() + } } diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherSubscriptionSelectTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherSubscriptionSelectTest.kt index 110718ac55..790cf7ec7c 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublisherSubscriptionSelectTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublisherSubscriptionSelectTest.kt @@ -1,10 +1,11 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.reactive import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* import kotlinx.coroutines.selects.* import org.junit.Test import org.junit.runner.* @@ -31,23 +32,23 @@ class PublisherSubscriptionSelectTest(private val request: Int) : TestBase() { val channelB = source.openSubscription(request) loop@ while (true) { val done: Int = select { - channelA.onReceiveOrNull { - if (it != null) assertEquals(a++, it) - if (it == null) 0 else 1 + channelA.onReceiveCatching { result -> + result.onSuccess { assertEquals(a++, it) } + if (result.isSuccess) 1 else 0 } - channelB.onReceiveOrNull { - if (it != null) assertEquals(b++, it) - if (it == null) 0 else 2 + channelB.onReceiveCatching { result -> + result.onSuccess { assertEquals(b++, it) } + if (result.isSuccess) 2 else 0 } } when (done) { 0 -> break@loop 1 -> { - val r = channelB.receiveOrNull() + val r = channelB.receiveCatching().getOrNull() if (r != null) assertEquals(b++, r) } 2 -> { - val r = channelA.receiveOrNull() + val r = channelA.receiveCatching().getOrNull() if (r != null) assertEquals(a++, r) } } @@ -58,4 +59,4 @@ class PublisherSubscriptionSelectTest(private val request: Int) : TestBase() { // should receive one of them fully assertTrue(a == n || b == n) } -} \ No newline at end of file +} diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableSubscriptionSelectTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableSubscriptionSelectTest.kt index 3cd3bbffff..396d19d159 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableSubscriptionSelectTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableSubscriptionSelectTest.kt @@ -1,12 +1,14 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.rx2 import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* import kotlinx.coroutines.selects.* import org.junit.Test +import kotlin.onSuccess import kotlin.test.* class ObservableSubscriptionSelectTest : TestBase() { @@ -22,23 +24,23 @@ class ObservableSubscriptionSelectTest : TestBase() { val channelB = source.openSubscription() loop@ while (true) { val done: Int = select { - channelA.onReceiveOrNull { - if (it != null) assertEquals(a++, it) - if (it == null) 0 else 1 + channelA.onReceiveCatching { result -> + result.onSuccess { assertEquals(a++, it) } + if (result.isSuccess) 1 else 0 } - channelB.onReceiveOrNull { - if (it != null) assertEquals(b++, it) - if (it == null) 0 else 2 + channelB.onReceiveCatching { result -> + result.onSuccess { assertEquals(b++, it) } + if (result.isSuccess) 2 else 0 } } when (done) { 0 -> break@loop 1 -> { - val r = channelB.receiveOrNull() + val r = channelB.receiveCatching().getOrNull() if (r != null) assertEquals(b++, r) } 2 -> { - val r = channelA.receiveOrNull() + val r = channelA.receiveCatching().getOrNull() if (r != null) assertEquals(a++, r) } } @@ -48,4 +50,4 @@ class ObservableSubscriptionSelectTest : TestBase() { // should receive one of them fully assertTrue(a == n || b == n) } -} \ No newline at end of file +} diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableSubscriptionSelectTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableSubscriptionSelectTest.kt index 2f04316159..58a54616f6 100644 --- a/reactive/kotlinx-coroutines-rx3/test/ObservableSubscriptionSelectTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/ObservableSubscriptionSelectTest.kt @@ -1,12 +1,14 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.rx3 import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* import kotlinx.coroutines.selects.* import org.junit.Test +import kotlin.onSuccess import kotlin.test.* class ObservableSubscriptionSelectTest : TestBase() { @@ -22,23 +24,23 @@ class ObservableSubscriptionSelectTest : TestBase() { val channelB = source.openSubscription() loop@ while (true) { val done: Int = select { - channelA.onReceiveOrNull { - if (it != null) assertEquals(a++, it) - if (it == null) 0 else 1 + channelA.onReceiveCatching { result -> + result.onSuccess { assertEquals(a++, it) } + if (result.isSuccess) 1 else 0 } - channelB.onReceiveOrNull { - if (it != null) assertEquals(b++, it) - if (it == null) 0 else 2 + channelB.onReceiveCatching { result -> + result.onSuccess { assertEquals(b++, it) } + if (result.isSuccess) 2 else 0 } } when (done) { 0 -> break@loop 1 -> { - val r = channelB.receiveOrNull() + val r = channelB.receiveCatching().getOrNull() if (r != null) assertEquals(b++, r) } 2 -> { - val r = channelA.receiveOrNull() + val r = channelA.receiveCatching().getOrNull() if (r != null) assertEquals(a++, r) } }