Skip to content

Commit

Permalink
Deprecation and migration of receiveOrNull and onReceiveOrNull. (#2612)
Browse files Browse the repository at this point in the history
* Deprecation and migration of receiveOrNull and onReceiveOrNull.

    * Raise deprecation level for members, introduce deprecation for extensions
    * Explain rationale behind deprecation
    * Provide default implementation for deprecated members in Channel interface
    * Get rid of the internal implementation, leverage receiveCatching
    * Introduce new extensions for ChannelResult and use them as a replacement in our own operators

Fixes #1676
  • Loading branch information
qwwdfsad authored Apr 6, 2021
1 parent 7ae273c commit 98532c9
Show file tree
Hide file tree
Showing 38 changed files with 339 additions and 372 deletions.
69 changes: 37 additions & 32 deletions docs/topics/select-expression.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,31 +120,32 @@ buzz -> 'Buzz!'
## Selecting on close

The [onReceive][ReceiveChannel.onReceive] clause in `select` fails when the channel is closed causing the corresponding
`select` to throw an exception. We can use [onReceiveOrNull][onReceiveOrNull] clause to perform a
`select` to throw an exception. We can use [onReceiveCatching][ReceiveChannel.onReceiveCatching] clause to perform a
specific action when the channel is closed. The following example also shows that `select` is an expression that returns
the result of its selected clause:

```kotlin
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
select<String> {
a.onReceiveOrNull { value ->
if (value == null)
"Channel 'a' is closed"
else
a.onReceiveCatching { it ->
val value = it.getOrNull()
if (value != null) {
"a -> '$value'"
} else {
"Channel 'a' is closed"
}
}
b.onReceiveOrNull { value ->
if (value == null)
"Channel 'b' is closed"
else
b.onReceiveCatching { it ->
val value = it.getOrNull()
if (value != null) {
"b -> '$value'"
} else {
"Channel 'b' is closed"
}
}
}
```

Note that [onReceiveOrNull][onReceiveOrNull] is an extension function defined only
for channels with non-nullable elements so that there is no accidental confusion between a closed channel
and a null value.

Let's use it with channel `a` that produces "Hello" string four times and
channel `b` that produces "World" four times:
Expand All @@ -158,17 +159,21 @@ import kotlinx.coroutines.selects.*

suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
select<String> {
a.onReceiveOrNull { value ->
if (value == null)
"Channel 'a' is closed"
else
a.onReceiveCatching { it ->
val value = it.getOrNull()
if (value != null) {
"a -> '$value'"
} else {
"Channel 'a' is closed"
}
}
b.onReceiveOrNull { value ->
if (value == null)
"Channel 'b' is closed"
else
b.onReceiveCatching { it ->
val value = it.getOrNull()
if (value != null) {
"b -> '$value'"
} else {
"Channel 'b' is closed"
}
}
}

Expand Down Expand Up @@ -215,7 +220,7 @@ the first one among them gets selected. Here, both channels are constantly produ
being the first clause in select, wins. However, because we are using unbuffered channel, the `a` gets suspended from
time to time on its [send][SendChannel.send] invocation and gives a chance for `b` to send, too.

The second observation, is that [onReceiveOrNull][onReceiveOrNull] gets immediately selected when the
The second observation, is that [onReceiveCatching][ReceiveChannel.onReceiveCatching] gets immediately selected when the
channel is already closed.

## Selecting to send
Expand Down Expand Up @@ -375,19 +380,19 @@ Deferred 4 produced answer 'Waited for 128 ms'

Let us write a channel producer function that consumes a channel of deferred string values, waits for each received
deferred value, but only until the next deferred value comes over or the channel is closed. This example puts together
[onReceiveOrNull][onReceiveOrNull] and [onAwait][Deferred.onAwait] clauses in the same `select`:
[onReceiveCatching][ReceiveChannel.onReceiveCatching] and [onAwait][Deferred.onAwait] clauses in the same `select`:

```kotlin
fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> {
var current = input.receive() // start with first received deferred value
while (isActive) { // loop while not cancelled/closed
val next = select<Deferred<String>?> { // return next deferred value from this select or null
input.onReceiveOrNull { update ->
update // replaces next value to wait
input.onReceiveCatching { update ->
update.getOrNull()
}
current.onAwait { value ->
current.onAwait { value ->
send(value) // send value that current deferred has produced
input.receiveOrNull() // and use the next deferred from the input channel
input.receiveCatching().getOrNull() // and use the next deferred from the input channel
}
}
if (next == null) {
Expand Down Expand Up @@ -423,12 +428,12 @@ fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) =
var current = input.receive() // start with first received deferred value
while (isActive) { // loop while not cancelled/closed
val next = select<Deferred<String>?> { // return next deferred value from this select or null
input.onReceiveOrNull { update ->
update // replaces next value to wait
input.onReceiveCatching { update ->
update.getOrNull()
}
current.onAwait { value ->
current.onAwait { value ->
send(value) // send value that current deferred has produced
input.receiveOrNull() // and use the next deferred from the input channel
input.receiveCatching().getOrNull() // and use the next deferred from the input channel
}
}
if (next == null) {
Expand Down Expand Up @@ -491,12 +496,12 @@ Channel was closed

[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html
[ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive.html
[onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/on-receive-or-null.html
[ReceiveChannel.onReceiveCatching]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive-catching.html
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html
[SendChannel.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/on-send.html

<!--- INDEX kotlinx.coroutines.selects -->

[select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html

<!--- END -->
<!--- END -->
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.time
Expand All @@ -12,8 +12,7 @@ import org.junit.Test
import java.time.Duration
import kotlin.test.assertEquals


class SampleTest : TestBase() {
class FlowSampleTest : TestBase() {
@Test
public fun testBasic() = withVirtualTime {
expect(1)
Expand Down
14 changes: 7 additions & 7 deletions kotlinx-coroutines-core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ helper function. [NonCancellable] job object is provided to suppress cancellatio
| ---------------- | --------------------------------------------- | ------------------------------------------------ | --------------------------
| [Job] | [join][Job.join] | [onJoin][Job.onJoin] | [isCompleted][Job.isCompleted]
| [Deferred] | [await][Deferred.await] | [onAwait][Deferred.onAwait] | [isCompleted][Job.isCompleted]
| [SendChannel][kotlinx.coroutines.channels.SendChannel] | [send][kotlinx.coroutines.channels.SendChannel.send] | [onSend][kotlinx.coroutines.channels.SendChannel.onSend] | [offer][kotlinx.coroutines.channels.SendChannel.offer]
| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receive][kotlinx.coroutines.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.channels.ReceiveChannel.onReceive] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll]
| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveOrNull][kotlinx.coroutines.channels.receiveOrNull] | [onReceiveOrNull][kotlinx.coroutines.channels.onReceiveOrNull] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll]
| [SendChannel][kotlinx.coroutines.channels.SendChannel] | [send][kotlinx.coroutines.channels.SendChannel.send] | [onSend][kotlinx.coroutines.channels.SendChannel.onSend] | [trySend][kotlinx.coroutines.channels.SendChannel.trySend]
| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receive][kotlinx.coroutines.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.channels.ReceiveChannel.onReceive] | [tryReceive][kotlinx.coroutines.channels.ReceiveChannel.tryReceive]
| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveCatching][kotlinx.coroutines.channels.ReceiveChannel.receiveCatching] | [onReceiveCatching][kotlinx.coroutines.channels.ReceiveChannel.onReceiveCatching] | [tryReceive][kotlinx.coroutines.channels.ReceiveChannel.tryReceive]
| [Mutex][kotlinx.coroutines.sync.Mutex] | [lock][kotlinx.coroutines.sync.Mutex.lock] | [onLock][kotlinx.coroutines.sync.Mutex.onLock] | [tryLock][kotlinx.coroutines.sync.Mutex.tryLock]
| none | [delay] | [onTimeout][kotlinx.coroutines.selects.SelectBuilder.onTimeout] | none

Expand Down Expand Up @@ -133,11 +133,11 @@ Obsolete and deprecated module to test coroutines. Replaced with `kotlinx-corout
[kotlinx.coroutines.channels.ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html
[kotlinx.coroutines.channels.SendChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/index.html
[kotlinx.coroutines.channels.SendChannel.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/on-send.html
[kotlinx.coroutines.channels.SendChannel.offer]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/offer.html
[kotlinx.coroutines.channels.SendChannel.trySend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/try-send.html
[kotlinx.coroutines.channels.ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive.html
[kotlinx.coroutines.channels.ReceiveChannel.poll]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/poll.html
[kotlinx.coroutines.channels.receiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/receive-or-null.html
[kotlinx.coroutines.channels.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/on-receive-or-null.html
[kotlinx.coroutines.channels.ReceiveChannel.tryReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/try-receive.html
[kotlinx.coroutines.channels.ReceiveChannel.receiveCatching]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive-catching.html
[kotlinx.coroutines.channels.ReceiveChannel.onReceiveCatching]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive-catching.html

<!--- INDEX kotlinx.coroutines.selects -->

Expand Down
9 changes: 9 additions & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,9 @@ public abstract interface class kotlinx/coroutines/channels/ActorScope : kotlinx

public final class kotlinx/coroutines/channels/ActorScope$DefaultImpls {
public static synthetic fun cancel (Lkotlinx/coroutines/channels/ActorScope;)V
public static fun getOnReceiveOrNull (Lkotlinx/coroutines/channels/ActorScope;)Lkotlinx/coroutines/selects/SelectClause1;
public static fun poll (Lkotlinx/coroutines/channels/ActorScope;)Ljava/lang/Object;
public static fun receiveOrNull (Lkotlinx/coroutines/channels/ActorScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class kotlinx/coroutines/channels/BroadcastChannel : kotlinx/coroutines/channels/SendChannel {
Expand Down Expand Up @@ -600,8 +602,10 @@ public abstract interface class kotlinx/coroutines/channels/Channel : kotlinx/co

public final class kotlinx/coroutines/channels/Channel$DefaultImpls {
public static synthetic fun cancel (Lkotlinx/coroutines/channels/Channel;)V
public static fun getOnReceiveOrNull (Lkotlinx/coroutines/channels/Channel;)Lkotlinx/coroutines/selects/SelectClause1;
public static fun offer (Lkotlinx/coroutines/channels/Channel;Ljava/lang/Object;)Z
public static fun poll (Lkotlinx/coroutines/channels/Channel;)Ljava/lang/Object;
public static fun receiveOrNull (Lkotlinx/coroutines/channels/Channel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/channels/Channel$Factory {
Expand All @@ -627,6 +631,9 @@ public final class kotlinx/coroutines/channels/ChannelKt {
public static final fun Channel (ILkotlinx/coroutines/channels/BufferOverflow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/channels/Channel;
public static synthetic fun Channel$default (IILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel;
public static synthetic fun Channel$default (ILkotlinx/coroutines/channels/BufferOverflow;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel;
public static final fun getOrElse-WpGqRn0 (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
public static final fun onFailure-WpGqRn0 (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
public static final fun onSuccess-WpGqRn0 (Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/channels/ChannelResult {
Expand Down Expand Up @@ -840,7 +847,9 @@ public final class kotlinx/coroutines/channels/ReceiveChannel$DefaultImpls {
public static synthetic fun cancel (Lkotlinx/coroutines/channels/ReceiveChannel;)V
public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/lang/Throwable;ILjava/lang/Object;)Z
public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/concurrent/CancellationException;ILjava/lang/Object;)V
public static fun getOnReceiveOrNull (Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlinx/coroutines/selects/SelectClause1;
public static fun poll (Lkotlinx/coroutines/channels/ReceiveChannel;)Ljava/lang/Object;
public static fun receiveOrNull (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class kotlinx/coroutines/channels/SendChannel {
Expand Down
6 changes: 3 additions & 3 deletions kotlinx-coroutines-core/common/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ helper function. [NonCancellable] job object is provided to suppress cancellatio
| [Deferred] | [await][Deferred.await] | [onAwait][Deferred.onAwait] | [isCompleted][Job.isCompleted]
| [SendChannel][kotlinx.coroutines.channels.SendChannel] | [send][kotlinx.coroutines.channels.SendChannel.send] | [onSend][kotlinx.coroutines.channels.SendChannel.onSend] | [offer][kotlinx.coroutines.channels.SendChannel.offer]
| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receive][kotlinx.coroutines.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.channels.ReceiveChannel.onReceive] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll]
| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveOrNull][kotlinx.coroutines.channels.receiveOrNull] | [onReceiveOrNull][kotlinx.coroutines.channels.onReceiveOrNull] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll]
| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveCatching][kotlinx.coroutines.channels.ReceiveChannel.receiveCatching] | [onReceiveCatching][kotlinx.coroutines.channels.ReceiveChannel.onReceiveCatching] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll]
| [Mutex][kotlinx.coroutines.sync.Mutex] | [lock][kotlinx.coroutines.sync.Mutex.lock] | [onLock][kotlinx.coroutines.sync.Mutex.onLock] | [tryLock][kotlinx.coroutines.sync.Mutex.tryLock]
| none | [delay] | [onTimeout][kotlinx.coroutines.selects.SelectBuilder.onTimeout] | none

Expand Down Expand Up @@ -149,8 +149,8 @@ Low-level primitives for finer-grained control of coroutines.
[kotlinx.coroutines.channels.SendChannel.offer]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/offer.html
[kotlinx.coroutines.channels.ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive.html
[kotlinx.coroutines.channels.ReceiveChannel.poll]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/poll.html
[kotlinx.coroutines.channels.receiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/receive-or-null.html
[kotlinx.coroutines.channels.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/on-receive-or-null.html
[kotlinx.coroutines.channels.ReceiveChannel.receiveCatching]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive-catching.html
[kotlinx.coroutines.channels.ReceiveChannel.onReceiveCatching]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive-catching.html

<!--- INDEX kotlinx.coroutines.selects -->

Expand Down
Loading

0 comments on commit 98532c9

Please sign in to comment.