Extend the KDoc for some channel APIs (#4148)
* Reword the BufferOverflow KDoc for consistency in the entry list

Before, the description of `SUSPEND` was phrased in terms of what
will happen, while the rest of the entries were written in the
imperative form, that is, as commands stating what should happen.
Now, all entries use the descriptive form.

* Describe the situations in which BufferOverflow options are useful

* Expand the documentation for channel consumption functions

Added explanations of what exactly happens on each code path,
how these operators ensure that all elements get processed
eventually, and provided some usage examples.

* Specify the behavior of Channel.consumeEach on scope cancellation

* Extend the documentation for `ProducerScope.awaitClose`

Filed #4149

* Reword a misleading statement in the `produce` documentation

Currently, the documentation states that uncaught exceptions will
lead to the channel being closed.
"Uncaught exceptions" have a special meaning in kotlinx.coroutines:
<https://kotlinlang.org/docs/exception-handling.html#coroutineexceptionhandler>
They are not just exceptions that are not wrapped in a try-catch;
they are exceptions that cannot be propagated to a root coroutine
via structured concurrency.

Fixed the wording and added a test that shows that uncaught
coroutine exceptions are not handled in any special manner.

* Document `awaitClose` and `invokeOnClose` interactions

It turns out that only a single invocation of either `awaitClose` or
`invokeOnClose` is allowed in the lifetime of a channel.
Document that.

* Document how consuming operators handle failed channels

* Document cancelling the coroutine but not the channel of `produce`

* Don't use the magic constant 0 in default parameters of `produce`

Instead, use `Channel.RENDEZVOUS` so that a meaningful constant
is shown in Dokka's output.

* Fix an incorrect statement in `produce` docs

Currently, the docs claim that attempting to receive from a failed
channel fails. However, the documentation for `Channel` itself
correctly states that before `receive` fails, the elements that
were already sent will be processed first. Corrected this and
added a test demonstrating the behavior.

* Add samples to the `produce` documentation and restructure it
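
The corrected statement about receiving from a failed channel can be illustrated with a plain `Channel`. This is a minimal sketch, not the test added in this commit: the function name `receiveFromFailedChannel` and the exception message are made up for illustration, and a manually closed buffered channel stands in for a failing `produce` coroutine.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records what a consumer observes on a channel
// that was closed with a cause after two elements had been sent.
fun receiveFromFailedChannel(): List<String> = runBlocking {
    val channel = Channel<Int>(capacity = 2)
    channel.send(1)
    channel.send(2)
    // Closing with a cause marks the channel as failed.
    channel.close(IllegalStateException("producer failed"))

    val log = mutableListOf<String>()
    try {
        // The elements sent before the failure are still delivered;
        // only the receive after them rethrows the cause.
        while (true) log += "received ${channel.receive()}"
    } catch (e: IllegalStateException) {
        log += "failed: ${e.message}"
    }
    log
}
```

The consumer sees both buffered elements first and the failure only afterwards, which matches what the `Channel` documentation describes.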
globsterg authored Jul 30, 2024
1 parent ab279a7 commit bc8160b
Showing 6 changed files with 433 additions and 35 deletions.
20 changes: 15 additions & 5 deletions kotlinx-coroutines-core/common/src/channels/BufferOverflow.kt
@@ -6,24 +6,34 @@ package kotlinx.coroutines.channels
*
* - [SUSPEND] &mdash; the upstream that is [sending][SendChannel.send] or
* is [emitting][kotlinx.coroutines.flow.FlowCollector.emit] a value is **suspended** while the buffer is full.
* - [DROP_OLDEST] &mdash; drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
* - [DROP_LATEST] &mdash; drop **the latest** value that is being added to the buffer right now on buffer overflow
* (so that buffer contents stay the same), do not suspend.
* - [DROP_OLDEST] &mdash; **the oldest** value in the buffer is dropped on overflow, and the new value is added,
* all without suspending.
* - [DROP_LATEST] &mdash; the buffer remains unchanged on overflow, and the value that we were going to add
* gets discarded, all without suspending.
*/
public enum class BufferOverflow {
/**
* Suspend on buffer overflow.
*
* Use this to create backpressure, forcing the producers to slow down creation of new values in response to
* consumers not being able to process the incoming values in time.
* [SUSPEND] is a good choice when all elements must eventually be processed.
*/
SUSPEND,

/**
* Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
*
* Use this in scenarios when only the last few values are important and skipping the processing of severely
* outdated ones is desirable.
*/
DROP_OLDEST,

/**
* Drop **the latest** value that is being added to the buffer right now on buffer overflow
* (so that buffer contents stay the same), do not suspend.
* Leave the buffer unchanged on overflow, dropping the value that we were going to add, do not suspend.
*
* This option can be used in rare advanced scenarios where all elements that are expected to enter the buffer are
* equal, so it is not important which of them get thrown away.
*/
DROP_LATEST
}
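
The three strategies described in the `BufferOverflow` KDoc above can be compared side by side. The following is a small sketch rather than code from this commit: `bufferAfterOverflow` is a hypothetical helper, the capacity of 2 is arbitrary, and `trySend` is used in place of `send` so that the `SUSPEND` case reports a failure instead of suspending.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: offers 1..4 to a channel with room for two elements
// and returns what a consumer would then receive.
fun bufferAfterOverflow(overflow: BufferOverflow): List<Int> {
    val channel = Channel<Int>(capacity = 2, onBufferOverflow = overflow)
    // trySend never suspends: with SUSPEND it fails where send would have suspended.
    for (i in 1..4) channel.trySend(i)
    channel.close()

    val received = mutableListOf<Int>()
    while (true) {
        // After close, tryReceive still hands out buffered elements,
        // then reports that the channel is closed.
        received += channel.tryReceive().getOrNull() ?: break
    }
    return received
}
```

Under these assumptions, `DROP_OLDEST` keeps the two most recent values, while `DROP_LATEST` and a non-suspending attempt under `SUSPEND` both leave the first two values in the buffer.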
116 changes: 108 additions & 8 deletions kotlinx-coroutines-core/common/src/channels/Channels.common.kt
@@ -49,10 +49,43 @@ public fun <E : Any> ReceiveChannel<E>.onReceiveOrNull(): SelectClause1<E?> {
}

/**
* Makes sure that the given [block] consumes all elements from the given channel
* by always invoking [cancel][ReceiveChannel.cancel] after the execution of the block.
* Executes the [block] and then [cancels][ReceiveChannel.cancel] the channel.
*
* The operation is _terminal_.
* It is guaranteed that, after invoking this operation, the channel will be [cancelled][ReceiveChannel.cancel], so
* the operation is _terminal_.
* If the [block] finishes with an exception, that exception will be used for cancelling the channel and rethrown.
*
* This function is useful for building more complex terminal operators while ensuring that the producers stop sending
* new elements to the channel.
*
* Example:
* ```
* suspend fun <E> ReceiveChannel<E>.consumeFirst(): E =
* consume { return receive() }
* // Launch a coroutine that constantly sends new values
* val channel = produce(Dispatchers.Default) {
* var i = 0
* while (true) {
* // Will fail with a `CancellationException`
* // after `consumeFirst` finishes.
* send(i++)
* }
* }
* // Grab the first value and discard everything else
* val firstElement = channel.consumeFirst()
* check(firstElement == 0)
* // *Note*: some elements could be lost in the channel!
* ```
*
* In this example, the channel will get closed, and the producer coroutine will finish its work after the first
* element is obtained.
* If `consumeFirst` was implemented as `for (e in this) { return e }` instead, the producer coroutine would be active
* until it was cancelled some other way.
*
* [consume] does not guarantee that new elements will not enter the channel after [block] finishes executing, so
* some channel elements may be lost.
* Use the `onUndeliveredElement` parameter of a manually created [Channel] to define what should happen with these
* elements during [ReceiveChannel.cancel].
*/
public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
contract {
@@ -70,23 +103,90 @@ public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -
}

/**
* Performs the given [action] for each received element and [cancels][ReceiveChannel.cancel]
* the channel after the execution of the block.
* If you need to iterate over the channel without consuming it, a regular `for` loop should be used instead.
* Performs the given [action] for each received element and [cancels][ReceiveChannel.cancel] the channel afterward.
*
* This function stops processing elements when either the channel is [closed][SendChannel.close],
* the coroutine in which the collection is performed gets cancelled and there are no readily available elements in the
* channel's buffer,
* [action] fails with an exception,
* or an early return from [action] happens.
* If the [action] finishes with an exception, that exception will be used for cancelling the channel and rethrown.
* If the channel is [closed][SendChannel.close] with a cause, this cause will be rethrown from [consumeEach].
*
* When the channel does not need to be closed after iterating over its elements,
* a regular `for` loop (`for (element in channel)`) should be used instead.
*
* The operation is _terminal_.
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
* This function [consumes][ReceiveChannel.consume] the elements of the original [ReceiveChannel].
*
* This function is useful in cases when this channel is only expected to have a single consumer that decides when
* the producer may stop.
* Example:
*
* ```
* val channel = Channel<Int>(1)
* // Launch several procedures that create values
* repeat(5) {
* launch(Dispatchers.Default) {
* while (true) {
* channel.send(Random.nextInt(40, 50))
* }
* }
* }
* // Launch the exclusive consumer
* val result = run {
* channel.consumeEach {
* if (it == 42) {
* println("Found the answer")
* return@run it // forcibly stop collection
* }
* }
* // *Note*: some elements could be lost in the channel!
* }
* check(result == 42)
* ```
*
* In this example, several coroutines put elements into a single channel, and a single consumer processes the elements.
* Once it finds the elements it's looking for, it stops [consumeEach] by making an early return.
*
* **Pitfall**: even though the name says "each", some elements could be left unprocessed if they are added after
* this function decided to close the channel.
* In this case, the elements will simply be lost.
* If the elements of the channel are resources that must be closed (like file handles, sockets, etc.),
* an `onUndeliveredElement` must be passed to the [Channel] on construction.
* It will be called for each element left in the channel at the point of cancellation.
*/
public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit): Unit =
consume {
for (e in this) action(e)
}

/**
* Returns a [List] containing all elements.
* Returns a [List] containing all the elements sent to this channel, preserving their order.
*
* This function will attempt to receive elements and put them into the list until the channel is
* [closed][SendChannel.close].
* Calling [toList] on channels that are not eventually closed is always incorrect:
* - It will suspend indefinitely if the channel is not closed, but no new elements arrive.
* - If new elements do arrive and the channel is not eventually closed, [toList] will use more and more memory
* until exhausting it.
*
* If the channel is [closed][SendChannel.close] with a cause, [toList] will rethrow that cause.
*
* The operation is _terminal_.
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
*
* Example:
* ```
* val values = listOf(1, 5, 2, 9, 3, 3, 1)
* // start a new coroutine that creates a channel,
* // sends elements to it, and closes it
* // once the coroutine's body finishes
* val channel = produce {
* values.forEach { send(it) }
* }
* check(channel.toList() == values)
* ```
*/
public suspend fun <E> ReceiveChannel<E>.toList(): List<E> = buildList {
consumeEach {
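
The KDoc for `consume` and `consumeEach` in the diff above says that these operators cancel the channel, that leftover elements go to `onUndeliveredElement`, and that a regular `for` loop should be used when the channel must stay open. A hedged sketch of that contrast follows; `Outcome` and `stopEarly` are hypothetical names, and the capacity of 3 is arbitrary.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.runBlocking

// Hypothetical result type: which elements were reported as undelivered,
// and whether the channel still accepts new elements afterwards.
data class Outcome(val undelivered: List<Int>, val acceptsMore: Boolean)

fun stopEarly(useConsumeEach: Boolean): Outcome = runBlocking {
    val undelivered = mutableListOf<Int>()
    val channel = Channel<Int>(capacity = 3, onUndeliveredElement = { undelivered += it })
    for (i in 1..3) channel.send(i)

    if (useConsumeEach) {
        // Early return after the first element: consumeEach cancels the channel.
        run { channel.consumeEach { return@run } }
    } else {
        // A plain for loop stops iterating but leaves the channel open.
        for (element in channel) break
    }

    // Sorted because this sketch does not rely on the callback order.
    val leftovers = undelivered.sorted()
    Outcome(leftovers, acceptsMore = channel.trySend(4).isSuccess)
}
```

With `consumeEach`, the two elements that were still buffered are handed to `onUndeliveredElement` and the channel rejects further sends; with the `for` loop, nothing is reported and the channel keeps working.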
