Skip to content

Commit

Permalink
Fix exception types for channels to ensure transparency & reporting (#…
Browse files Browse the repository at this point in the history
…1158)

* Fix exception types for channels to ensure transparency & reporting

* ReceiveChannel.cancel always closes channel with CancellationException,
  so sending or receiving from a cancelled channel produces the
  corresponding CancellationException.
* Cancelling produce builder has similar effect, but an more specific
  instance of JobCancellationException is created.
* This ensure that produce/consumeEach pair is transparent with respect
  to cancellation and can be used to build "identity" transformation
  of the flow (the corresponding test is added).
* ClosedSendChannelException is now a subclass of IllegalStateException,
  so that trying to send to a channel that was closed normally is
  reported as program error and is not eaten (test is added).

Fixes #957
Fixes #1128

* Exceptions for channels cleanup

    * Documentation improved
    * Better exception message
    * Simplified flowOn implementation
    * Avoid exception instantiation on happy path in zip
  • Loading branch information
elizarov authored Apr 30, 2019
1 parent f8eac76 commit e569bd3
Show file tree
Hide file tree
Showing 20 changed files with 166 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ public final class kotlinx/coroutines/channels/ClosedReceiveChannelException : j
public fun <init> (Ljava/lang/String;)V
}

public final class kotlinx/coroutines/channels/ClosedSendChannelException : java/util/concurrent/CancellationException {
public final class kotlinx/coroutines/channels/ClosedSendChannelException : java/lang/IllegalStateException {
public fun <init> (Ljava/lang/String;)V
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
cancelInternal(cause)

final override fun cancel(cause: CancellationException?) {
cancelInternal(cause)
cancelInternal(cause ?: CancellationException("$classSimpleName was cancelled"))
}

// It needs to be internal to support deprecated cancel(Throwable?) API
Expand Down
18 changes: 12 additions & 6 deletions kotlinx-coroutines-core/common/src/channels/Channel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -242,14 +242,16 @@ public interface ReceiveChannel<out E> {
/**
* Cancels reception of remaining elements from this channel with an optional [cause].
* This function closes the channel and removes all buffered sent elements from it.
*
* A cause can be used to specify an error message or to provide other details on
* a cancellation reason for debugging purposes.
* If the cause is not specified, then an instance of [CancellationException] with a
* default message is created to [close][SendChannel.close] the channel.
*
* Immediately after invocation of this function [isClosedForReceive] and
* [isClosedForSend][SendChannel.isClosedForSend]
* on the side of [SendChannel] start returning `true`, so all attempts to send to this channel
* afterwards will throw [ClosedSendChannelException], while attempts to receive will throw
* [ClosedReceiveChannelException].
* on the side of [SendChannel] start returning `true`. All attempts to send to this channel
* or receive from this channel will throw [CancellationException].
*/
public fun cancel(cause: CancellationException? = null)

Expand Down Expand Up @@ -382,14 +384,18 @@ public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
* Indicates attempt to [send][SendChannel.send] on [isClosedForSend][SendChannel.isClosedForSend] channel
* that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause
* exception on send attempts.
*
* This exception is a subclass of [IllegalStateException] because, conceptually, sender is responsible
* for closing the channel and not be trying to send anything after the channel was close. Attempts to
* send into the closed channel indicate logical error in the sender's code.
*/
public class ClosedSendChannelException(message: String?) : CancellationException(message)
public class ClosedSendChannelException(message: String?) : IllegalStateException(message)

/**
* Indicates attempt to [receive][ReceiveChannel.receive] on [isClosedForReceive][ReceiveChannel.isClosedForReceive]
* channel that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause
* exception on receive attempts.
*
* This exception is subclass of [NoSuchElementException] to be consistent with plain collections.
* This exception is a subclass of [NoSuchElementException] to be consistent with plain collections.
*/
public class ClosedReceiveChannelException(message: String?) : NoSuchElementException(message)
public class ClosedReceiveChannelException(message: String?) : NoSuchElementException(message)
11 changes: 9 additions & 2 deletions kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,15 @@ internal open class ChannelCoroutine<E>(
}

override fun cancelInternal(cause: Throwable?): Boolean {
_channel.cancel(cause?.toCancellationException()) // cancel the channel
cancelCoroutine(cause) // cancel the job
val exception = cause?.toCancellationException()
?: JobCancellationException("$classSimpleName was cancelled", null, this)
_channel.cancel(exception) // cancel the channel
cancelCoroutine(exception) // cancel the job
return true // does not matter - result is used in DEPRECATED functions only
}

@Suppress("UNCHECKED_CAST")
suspend fun sendFair(element: E) {
(_channel as AbstractSendChannel<E>).sendFair(element)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow.internal

import kotlinx.coroutines.*

/**
* This exception is thrown when operator need no more elements from the flow.
* This exception should never escape outside of operator's implementation.
*/
internal class AbortFlowException : CancellationException("Flow was aborted, no more elements needed") {
// TODO expect/actual
// override fun fillInStackTrace(): Throwable = this
}
13 changes: 2 additions & 11 deletions kotlinx-coroutines-core/common/src/flow/operators/Context.kt
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,8 @@ public fun <T> Flow<T>.flowOn(flowContext: CoroutineContext, bufferSize: Int = 1
send(value)
}
}

// TODO semantics doesn't play well here and we pay for that with additional object
(channel as Job).invokeOnCompletion { if (it is CancellationException && it.cause == null) cancel() }
for (element in channel) {
emit(element)
}

val producer = channel as Job
if (producer.isCancelled) {
producer.join()
throw producer.getCancellationException()
channel.consumeEach { value ->
emit(value)
}
}
}
Expand Down
16 changes: 6 additions & 10 deletions kotlinx-coroutines-core/common/src/flow/operators/Limit.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@

package kotlinx.coroutines.flow

import kotlinx.coroutines.flow.unsafeFlow as flow
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.internal.*
import kotlin.jvm.*
import kotlinx.coroutines.flow.unsafeFlow as flow

/**
* Returns a flow that ignores first [count] elements.
Expand Down Expand Up @@ -57,10 +58,10 @@ public fun <T> Flow<T>.take(count: Int): Flow<T> {
collect { value ->
emit(value)
if (++consumed == count) {
throw TakeLimitException()
throw AbortFlowException()
}
}
} catch (e: TakeLimitException) {
} catch (e: AbortFlowException) {
// Nothing, bail out
}
}
Expand All @@ -74,14 +75,9 @@ public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T> = f
try {
collect { value ->
if (predicate(value)) emit(value)
else throw TakeLimitException()
else throw AbortFlowException()
}
} catch (e: TakeLimitException) {
} catch (e: AbortFlowException) {
// Nothing, bail out
}
}

private class TakeLimitException : CancellationException("Flow limit is reached, cancelling") {
// TODO expect/actual
// override fun fillInStackTrace(): Throwable = this
}
25 changes: 11 additions & 14 deletions kotlinx-coroutines-core/common/src/flow/operators/Zip.kt
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspen

private inline fun SelectBuilder<Unit>.onReceive(
isClosed: Boolean,
channel: Channel<Any>,
channel: ReceiveChannel<Any>,
crossinline onClosed: () -> Unit,
noinline onReceive: suspend (value: Any) -> Unit
) {
Expand All @@ -90,18 +90,11 @@ private inline fun SelectBuilder<Unit>.onReceive(
}

// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
private fun CoroutineScope.asFairChannel(flow: Flow<*>): Channel<Any> {
val channel = RendezvousChannel<Any>() // Explicit type
launch {
try {
flow.collect { value ->
channel.sendFair(value ?: NullSurrogate)
}
} finally {
channel.close()
}
private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
val channel = channel as ChannelCoroutine<Any>
flow.collect { value ->
channel.sendFair(value ?: NullSurrogate)
}
return channel
}


Expand Down Expand Up @@ -133,7 +126,9 @@ public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2)
*
* Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
*/
(second as SendChannel<*>).invokeOnClose { first.cancel() }
(second as SendChannel<*>).invokeOnClose {
if (!first.isClosedForReceive) first.cancel(AbortFlowException())
}

val otherIterator = second.iterator()
try {
Expand All @@ -144,8 +139,10 @@ public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2)
val secondValue = NullSurrogate.unbox<T2>(otherIterator.next())
emit(transform(NullSurrogate.unbox(value), NullSurrogate.unbox(secondValue)))
}
} catch (e: AbortFlowException) {
// complete
} finally {
second.cancel()
if (!second.isClosedForReceive) second.cancel(AbortFlowException())
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,23 +170,25 @@ class ArrayBroadcastChannelTest : TestBase() {
// start consuming
val sub = channel.openSubscription()
var expected = 0
sub.consumeEach {
check(it == ++expected)
if (it == 2) {
sub.cancel()
assertFailsWith<CancellationException> {
sub.consumeEach {
check(it == ++expected)
if (it == 2) {
sub.cancel()
}
}
}
check(expected == 2)
}

@Test
fun testReceiveFromClosedSub() = runTest({ it is ClosedReceiveChannelException }) {
fun testReceiveFromCancelledSub() = runTest {
val channel = BroadcastChannel<Int>(1)
val sub = channel.openSubscription()
assertFalse(sub.isClosedForReceive)
sub.cancel()
assertTrue(sub.isClosedForReceive)
sub.receive()
assertFailsWith<CancellationException> { sub.receive() }
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class ArrayChannelTest : TestBase() {
q.cancel()
check(q.isClosedForSend)
check(q.isClosedForReceive)
check(q.receiveOrNull() == null)
assertFailsWith<CancellationException> { q.receiveOrNull() }
finish(12)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import kotlinx.coroutines.*
import kotlin.test.*

class BasicOperationsTest : TestBase() {

@Test
fun testSimpleSendReceive() = runTest {
// Parametrized common test :(
Expand All @@ -20,6 +19,11 @@ class BasicOperationsTest : TestBase() {
TestChannelKind.values().forEach { kind -> testOffer(kind) }
}

@Test
fun testSendAfterClose() = runTest {
TestChannelKind.values().forEach { kind -> testSendAfterClose(kind) }
}

@Test
fun testReceiveOrNullAfterClose() = runTest {
TestChannelKind.values().forEach { kind -> testReceiveOrNull(kind) }
Expand Down Expand Up @@ -128,6 +132,23 @@ class BasicOperationsTest : TestBase() {
d.await()
}

/**
* [ClosedSendChannelException] should not be eaten.
* See [https://github.com/Kotlin/kotlinx.coroutines/issues/957]
*/
private suspend fun testSendAfterClose(kind: TestChannelKind) {
assertFailsWith<ClosedSendChannelException> {
coroutineScope {
val channel = kind.create()
channel.close()

launch {
channel.send(1)
}
}
}
}

private suspend fun testSendReceive(kind: TestChannelKind, iterations: Int) = coroutineScope {
val channel = kind.create()
launch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class ConflatedChannelTest : TestBase() {
q.cancel()
check(q.isClosedForSend)
check(q.isClosedForReceive)
check(q.receiveOrNull() == null)
assertFailsWith<CancellationException> { q.receiveOrNull() }
finish(2)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class LinkedListChannelTest : TestBase() {
q.cancel()
check(q.isClosedForSend)
check(q.isClosedForReceive)
check(q.receiveOrNull() == null)
assertFailsWith<CancellationException> { q.receiveOrNull() }
}

@Test
Expand Down
5 changes: 2 additions & 3 deletions kotlinx-coroutines-core/common/test/channels/ProduceTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class ProduceTest : TestBase() {
expectUnreached()
} catch (e: Throwable) {
expect(7)
check(e is ClosedSendChannelException)
check(e is CancellationException)
throw e
}
expectUnreached()
Expand All @@ -48,7 +48,7 @@ class ProduceTest : TestBase() {
expect(4)
c.cancel()
expect(5)
assertNull(c.receiveOrNull())
assertFailsWith<CancellationException> { c.receiveOrNull() }
expect(6)
yield() // to produce
finish(8)
Expand Down Expand Up @@ -107,7 +107,6 @@ class ProduceTest : TestBase() {
produced.cancel()
try {
source.receive()
// TODO shouldn't it be ClosedReceiveChannelException ?
} catch (e: CancellationException) {
finish(4)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ class RendezvousChannelTest : TestBase() {
q.cancel()
check(q.isClosedForSend)
check(q.isClosedForReceive)
check(q.receiveOrNull() == null)
assertFailsWith<CancellationException> { q.receiveOrNull() }
finish(12)
}

Expand Down
Loading

0 comments on commit e569bd3

Please sign in to comment.