Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix exception types for channels to ensure transparency & reporting #1158

Merged
merged 2 commits into from
Apr 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ public final class kotlinx/coroutines/channels/ClosedReceiveChannelException : j
public fun <init> (Ljava/lang/String;)V
}

public final class kotlinx/coroutines/channels/ClosedSendChannelException : java/util/concurrent/CancellationException {
public final class kotlinx/coroutines/channels/ClosedSendChannelException : java/lang/IllegalStateException {
public fun <init> (Ljava/lang/String;)V
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
cancelInternal(cause)

final override fun cancel(cause: CancellationException?) {
cancelInternal(cause)
cancelInternal(cause ?: CancellationException("$classSimpleName was cancelled"))
}

// It needs to be internal to support deprecated cancel(Throwable?) API
Expand Down
18 changes: 12 additions & 6 deletions kotlinx-coroutines-core/common/src/channels/Channel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -242,14 +242,16 @@ public interface ReceiveChannel<out E> {
/**
* Cancels reception of remaining elements from this channel with an optional [cause].
* This function closes the channel and removes all buffered sent elements from it.
*
* A cause can be used to specify an error message or to provide other details on
* a cancellation reason for debugging purposes.
* If the cause is not specified, then an instance of [CancellationException] with a
* default message is created to [close][SendChannel.close] the channel.
*
* Immediately after invocation of this function [isClosedForReceive] and
* [isClosedForSend][SendChannel.isClosedForSend]
* on the side of [SendChannel] start returning `true`, so all attempts to send to this channel
* afterwards will throw [ClosedSendChannelException], while attempts to receive will throw
* [ClosedReceiveChannelException].
* on the side of [SendChannel] start returning `true`. All attempts to send to this channel
* or receive from this channel will throw [CancellationException].
*/
public fun cancel(cause: CancellationException? = null)

Expand Down Expand Up @@ -382,14 +384,18 @@ public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
* Indicates attempt to [send][SendChannel.send] on [isClosedForSend][SendChannel.isClosedForSend] channel
* that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause
* exception on send attempts.
*
* This exception is a subclass of [IllegalStateException] because, conceptually, sender is responsible
* for closing the channel and not be trying to send anything after the channel was close. Attempts to
* send into the closed channel indicate logical error in the sender's code.
*/
public class ClosedSendChannelException(message: String?) : CancellationException(message)
public class ClosedSendChannelException(message: String?) : IllegalStateException(message)

/**
* Indicates attempt to [receive][ReceiveChannel.receive] on [isClosedForReceive][ReceiveChannel.isClosedForReceive]
* channel that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause
* exception on receive attempts.
*
* This exception is subclass of [NoSuchElementException] to be consistent with plain collections.
* This exception is a subclass of [NoSuchElementException] to be consistent with plain collections.
*/
public class ClosedReceiveChannelException(message: String?) : NoSuchElementException(message)
public class ClosedReceiveChannelException(message: String?) : NoSuchElementException(message)
11 changes: 9 additions & 2 deletions kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,15 @@ internal open class ChannelCoroutine<E>(
}

override fun cancelInternal(cause: Throwable?): Boolean {
_channel.cancel(cause?.toCancellationException()) // cancel the channel
cancelCoroutine(cause) // cancel the job
val exception = cause?.toCancellationException()
?: JobCancellationException("$classSimpleName was cancelled", null, this)
_channel.cancel(exception) // cancel the channel
cancelCoroutine(exception) // cancel the job
return true // does not matter - result is used in DEPRECATED functions only
}

@Suppress("UNCHECKED_CAST")
suspend fun sendFair(element: E) {
(_channel as AbstractSendChannel<E>).sendFair(element)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow.internal

import kotlinx.coroutines.*

/**
* This exception is thrown when operator need no more elements from the flow.
* This exception should never escape outside of operator's implementation.
*/
internal class AbortFlowException : CancellationException("Flow was aborted, no more elements needed") {
// TODO expect/actual
// override fun fillInStackTrace(): Throwable = this
}
13 changes: 2 additions & 11 deletions kotlinx-coroutines-core/common/src/flow/operators/Context.kt
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,8 @@ public fun <T> Flow<T>.flowOn(flowContext: CoroutineContext, bufferSize: Int = 1
send(value)
}
}

// TODO semantics doesn't play well here and we pay for that with additional object
(channel as Job).invokeOnCompletion { if (it is CancellationException && it.cause == null) cancel() }
for (element in channel) {
emit(element)
}

val producer = channel as Job
if (producer.isCancelled) {
producer.join()
throw producer.getCancellationException()
channel.consumeEach { value ->
emit(value)
}
}
}
Expand Down
16 changes: 6 additions & 10 deletions kotlinx-coroutines-core/common/src/flow/operators/Limit.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@

package kotlinx.coroutines.flow

import kotlinx.coroutines.flow.unsafeFlow as flow
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.internal.*
import kotlin.jvm.*
import kotlinx.coroutines.flow.unsafeFlow as flow

/**
* Returns a flow that ignores first [count] elements.
Expand Down Expand Up @@ -57,10 +58,10 @@ public fun <T> Flow<T>.take(count: Int): Flow<T> {
collect { value ->
emit(value)
if (++consumed == count) {
throw TakeLimitException()
throw AbortFlowException()
}
}
} catch (e: TakeLimitException) {
} catch (e: AbortFlowException) {
// Nothing, bail out
}
}
Expand All @@ -74,14 +75,9 @@ public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T> = f
try {
collect { value ->
if (predicate(value)) emit(value)
else throw TakeLimitException()
else throw AbortFlowException()
}
} catch (e: TakeLimitException) {
} catch (e: AbortFlowException) {
// Nothing, bail out
}
}

private class TakeLimitException : CancellationException("Flow limit is reached, cancelling") {
// TODO expect/actual
// override fun fillInStackTrace(): Throwable = this
}
25 changes: 11 additions & 14 deletions kotlinx-coroutines-core/common/src/flow/operators/Zip.kt
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspen

private inline fun SelectBuilder<Unit>.onReceive(
isClosed: Boolean,
channel: Channel<Any>,
channel: ReceiveChannel<Any>,
crossinline onClosed: () -> Unit,
noinline onReceive: suspend (value: Any) -> Unit
) {
Expand All @@ -90,18 +90,11 @@ private inline fun SelectBuilder<Unit>.onReceive(
}

// Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed
private fun CoroutineScope.asFairChannel(flow: Flow<*>): Channel<Any> {
val channel = RendezvousChannel<Any>() // Explicit type
launch {
try {
flow.collect { value ->
channel.sendFair(value ?: NullSurrogate)
}
} finally {
channel.close()
}
private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel<Any> = produce {
val channel = channel as ChannelCoroutine<Any>
flow.collect { value ->
channel.sendFair(value ?: NullSurrogate)
}
return channel
}


Expand Down Expand Up @@ -133,7 +126,9 @@ public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2)
*
* Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction).
*/
(second as SendChannel<*>).invokeOnClose { first.cancel() }
(second as SendChannel<*>).invokeOnClose {
if (!first.isClosedForReceive) first.cancel(AbortFlowException())
}

val otherIterator = second.iterator()
try {
Expand All @@ -144,8 +139,10 @@ public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2)
val secondValue = NullSurrogate.unbox<T2>(otherIterator.next())
emit(transform(NullSurrogate.unbox(value), NullSurrogate.unbox(secondValue)))
}
} catch (e: AbortFlowException) {
// complete
} finally {
second.cancel()
if (!second.isClosedForReceive) second.cancel(AbortFlowException())
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,23 +170,25 @@ class ArrayBroadcastChannelTest : TestBase() {
// start consuming
val sub = channel.openSubscription()
var expected = 0
sub.consumeEach {
check(it == ++expected)
if (it == 2) {
sub.cancel()
assertFailsWith<CancellationException> {
sub.consumeEach {
check(it == ++expected)
if (it == 2) {
sub.cancel()
}
}
}
check(expected == 2)
}

@Test
fun testReceiveFromClosedSub() = runTest({ it is ClosedReceiveChannelException }) {
fun testReceiveFromCancelledSub() = runTest {
val channel = BroadcastChannel<Int>(1)
val sub = channel.openSubscription()
assertFalse(sub.isClosedForReceive)
sub.cancel()
assertTrue(sub.isClosedForReceive)
sub.receive()
assertFailsWith<CancellationException> { sub.receive() }
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class ArrayChannelTest : TestBase() {
q.cancel()
check(q.isClosedForSend)
check(q.isClosedForReceive)
check(q.receiveOrNull() == null)
assertFailsWith<CancellationException> { q.receiveOrNull() }
finish(12)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import kotlinx.coroutines.*
import kotlin.test.*

class BasicOperationsTest : TestBase() {

@Test
fun testSimpleSendReceive() = runTest {
// Parametrized common test :(
Expand All @@ -20,6 +19,11 @@ class BasicOperationsTest : TestBase() {
TestChannelKind.values().forEach { kind -> testOffer(kind) }
}

@Test
fun testSendAfterClose() = runTest {
TestChannelKind.values().forEach { kind -> testSendAfterClose(kind) }
}

@Test
fun testReceiveOrNullAfterClose() = runTest {
TestChannelKind.values().forEach { kind -> testReceiveOrNull(kind) }
Expand Down Expand Up @@ -128,6 +132,23 @@ class BasicOperationsTest : TestBase() {
d.await()
}

/**
* [ClosedSendChannelException] should not be eaten.
* See [https://github.com/Kotlin/kotlinx.coroutines/issues/957]
*/
private suspend fun testSendAfterClose(kind: TestChannelKind) {
assertFailsWith<ClosedSendChannelException> {
coroutineScope {
val channel = kind.create()
channel.close()

launch {
channel.send(1)
}
}
}
}

private suspend fun testSendReceive(kind: TestChannelKind, iterations: Int) = coroutineScope {
val channel = kind.create()
launch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class ConflatedChannelTest : TestBase() {
q.cancel()
check(q.isClosedForSend)
check(q.isClosedForReceive)
check(q.receiveOrNull() == null)
assertFailsWith<CancellationException> { q.receiveOrNull() }
finish(2)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class LinkedListChannelTest : TestBase() {
q.cancel()
check(q.isClosedForSend)
check(q.isClosedForReceive)
check(q.receiveOrNull() == null)
assertFailsWith<CancellationException> { q.receiveOrNull() }
}

@Test
Expand Down
5 changes: 2 additions & 3 deletions kotlinx-coroutines-core/common/test/channels/ProduceTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class ProduceTest : TestBase() {
expectUnreached()
} catch (e: Throwable) {
expect(7)
check(e is ClosedSendChannelException)
check(e is CancellationException)
throw e
}
expectUnreached()
Expand All @@ -48,7 +48,7 @@ class ProduceTest : TestBase() {
expect(4)
c.cancel()
expect(5)
assertNull(c.receiveOrNull())
assertFailsWith<CancellationException> { c.receiveOrNull() }
expect(6)
yield() // to produce
finish(8)
Expand Down Expand Up @@ -107,7 +107,6 @@ class ProduceTest : TestBase() {
produced.cancel()
try {
source.receive()
// TODO shouldn't it be ClosedReceiveChannelException ?
} catch (e: CancellationException) {
finish(4)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ class RendezvousChannelTest : TestBase() {
q.cancel()
check(q.isClosedForSend)
check(q.isClosedForReceive)
check(q.receiveOrNull() == null)
assertFailsWith<CancellationException> { q.receiveOrNull() }
finish(12)
}

Expand Down
Loading