diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index b1375b257e..366906acd6 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -692,7 +692,7 @@ public final class kotlinx/coroutines/channels/ClosedReceiveChannelException : j public fun (Ljava/lang/String;)V } -public final class kotlinx/coroutines/channels/ClosedSendChannelException : java/util/concurrent/CancellationException { +public final class kotlinx/coroutines/channels/ClosedSendChannelException : java/lang/IllegalStateException { public fun (Ljava/lang/String;)V } diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt index 05bfbca98d..f3fb7cd288 100644 --- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt @@ -669,7 +669,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel { /** * Cancels reception of remaining elements from this channel with an optional [cause]. * This function closes the channel and removes all buffered sent elements from it. + * * A cause can be used to specify an error message or to provide other details on * a cancellation reason for debugging purposes. + * If the cause is not specified, then an instance of [CancellationException] with a + * default message is created to [close][SendChannel.close] the channel. * * Immediately after invocation of this function [isClosedForReceive] and * [isClosedForSend][SendChannel.isClosedForSend] - * on the side of [SendChannel] start returning `true`, so all attempts to send to this channel - * afterwards will throw [ClosedSendChannelException], while attempts to receive will throw - * [ClosedReceiveChannelException]. + * on the side of [SendChannel] start returning `true`. All attempts to send to this channel + * or receive from this channel will throw [CancellationException]. */ public fun cancel(cause: CancellationException? = null) @@ -382,14 +384,18 @@ public fun Channel(capacity: Int = RENDEZVOUS): Channel = * Indicates attempt to [send][SendChannel.send] on [isClosedForSend][SendChannel.isClosedForSend] channel * that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause * exception on send attempts. + * + * This exception is a subclass of [IllegalStateException] because, conceptually, sender is responsible + * for closing the channel and not be trying to send anything after the channel was close. Attempts to + * send into the closed channel indicate logical error in the sender's code. */ -public class ClosedSendChannelException(message: String?) : CancellationException(message) +public class ClosedSendChannelException(message: String?) : IllegalStateException(message) /** * Indicates attempt to [receive][ReceiveChannel.receive] on [isClosedForReceive][ReceiveChannel.isClosedForReceive] * channel that was closed without a cause. A _failed_ channel rethrows the original [close][SendChannel.close] cause * exception on receive attempts. * - * This exception is subclass of [NoSuchElementException] to be consistent with plain collections. + * This exception is a subclass of [NoSuchElementException] to be consistent with plain collections. */ -public class ClosedReceiveChannelException(message: String?) : NoSuchElementException(message) +public class ClosedReceiveChannelException(message: String?) : NoSuchElementException(message) \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt b/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt index 9382600af8..dc2cc5b69f 100644 --- a/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt +++ b/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt @@ -28,8 +28,15 @@ internal open class ChannelCoroutine( } override fun cancelInternal(cause: Throwable?): Boolean { - _channel.cancel(cause?.toCancellationException()) // cancel the channel - cancelCoroutine(cause) // cancel the job + val exception = cause?.toCancellationException() + ?: JobCancellationException("$classSimpleName was cancelled", null, this) + _channel.cancel(exception) // cancel the channel + cancelCoroutine(exception) // cancel the job return true // does not matter - result is used in DEPRECATED functions only } + + @Suppress("UNCHECKED_CAST") + suspend fun sendFair(element: E) { + (_channel as AbstractSendChannel).sendFair(element) + } } diff --git a/kotlinx-coroutines-core/common/src/flow/internal/AbortFlowException.kt b/kotlinx-coroutines-core/common/src/flow/internal/AbortFlowException.kt new file mode 100644 index 0000000000..e432c18f8c --- /dev/null +++ b/kotlinx-coroutines-core/common/src/flow/internal/AbortFlowException.kt @@ -0,0 +1,16 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow.internal + +import kotlinx.coroutines.* + +/** + * This exception is thrown when operator need no more elements from the flow. + * This exception should never escape outside of operator's implementation. + */ +internal class AbortFlowException : CancellationException("Flow was aborted, no more elements needed") { + // TODO expect/actual + // override fun fillInStackTrace(): Throwable = this +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt index 6f676d27a9..3dc021b635 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt @@ -53,17 +53,8 @@ public fun Flow.flowOn(flowContext: CoroutineContext, bufferSize: Int = 1 send(value) } } - - // TODO semantics doesn't play well here and we pay for that with additional object - (channel as Job).invokeOnCompletion { if (it is CancellationException && it.cause == null) cancel() } - for (element in channel) { - emit(element) - } - - val producer = channel as Job - if (producer.isCancelled) { - producer.join() - throw producer.getCancellationException() + channel.consumeEach { value -> + emit(value) } } } diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt b/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt index 80ef4b66e6..c60d105b02 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt @@ -8,9 +8,10 @@ package kotlinx.coroutines.flow -import kotlinx.coroutines.flow.unsafeFlow as flow import kotlinx.coroutines.* +import kotlinx.coroutines.flow.internal.* import kotlin.jvm.* +import kotlinx.coroutines.flow.unsafeFlow as flow /** * Returns a flow that ignores first [count] elements. @@ -57,10 +58,10 @@ public fun Flow.take(count: Int): Flow { collect { value -> emit(value) if (++consumed == count) { - throw TakeLimitException() + throw AbortFlowException() } } - } catch (e: TakeLimitException) { + } catch (e: AbortFlowException) { // Nothing, bail out } } @@ -74,14 +75,9 @@ public fun Flow.takeWhile(predicate: suspend (T) -> Boolean): Flow = f try { collect { value -> if (predicate(value)) emit(value) - else throw TakeLimitException() + else throw AbortFlowException() } - } catch (e: TakeLimitException) { + } catch (e: AbortFlowException) { // Nothing, bail out } } - -private class TakeLimitException : CancellationException("Flow limit is reached, cancelling") { - // TODO expect/actual - // override fun fillInStackTrace(): Throwable = this -} diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt index ab0bee3f96..ea46a09833 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt @@ -78,7 +78,7 @@ public fun Flow.combineLatest(other: Flow, transform: suspen private inline fun SelectBuilder.onReceive( isClosed: Boolean, - channel: Channel, + channel: ReceiveChannel, crossinline onClosed: () -> Unit, noinline onReceive: suspend (value: Any) -> Unit ) { @@ -90,18 +90,11 @@ private inline fun SelectBuilder.onReceive( } // Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed -private fun CoroutineScope.asFairChannel(flow: Flow<*>): Channel { - val channel = RendezvousChannel() // Explicit type - launch { - try { - flow.collect { value -> - channel.sendFair(value ?: NullSurrogate) - } - } finally { - channel.close() - } +private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel = produce { + val channel = channel as ChannelCoroutine + flow.collect { value -> + channel.sendFair(value ?: NullSurrogate) } - return channel } @@ -133,7 +126,9 @@ public fun Flow.zip(other: Flow, transform: suspend (T1, T2) * * Invariant: this clause is invoked only when all elements from the channel were processed (=> rendezvous restriction). */ - (second as SendChannel<*>).invokeOnClose { first.cancel() } + (second as SendChannel<*>).invokeOnClose { + if (!first.isClosedForReceive) first.cancel(AbortFlowException()) + } val otherIterator = second.iterator() try { @@ -144,8 +139,10 @@ public fun Flow.zip(other: Flow, transform: suspend (T1, T2) val secondValue = NullSurrogate.unbox(otherIterator.next()) emit(transform(NullSurrogate.unbox(value), NullSurrogate.unbox(secondValue))) } + } catch (e: AbortFlowException) { + // complete } finally { - second.cancel() + if (!second.isClosedForReceive) second.cancel(AbortFlowException()) } } } diff --git a/kotlinx-coroutines-core/common/test/channels/ArrayBroadcastChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/ArrayBroadcastChannelTest.kt index 9867ead560..a7084296bb 100644 --- a/kotlinx-coroutines-core/common/test/channels/ArrayBroadcastChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ArrayBroadcastChannelTest.kt @@ -170,23 +170,25 @@ class ArrayBroadcastChannelTest : TestBase() { // start consuming val sub = channel.openSubscription() var expected = 0 - sub.consumeEach { - check(it == ++expected) - if (it == 2) { - sub.cancel() + assertFailsWith { + sub.consumeEach { + check(it == ++expected) + if (it == 2) { + sub.cancel() + } } } check(expected == 2) } @Test - fun testReceiveFromClosedSub() = runTest({ it is ClosedReceiveChannelException }) { + fun testReceiveFromCancelledSub() = runTest { val channel = BroadcastChannel(1) val sub = channel.openSubscription() assertFalse(sub.isClosedForReceive) sub.cancel() assertTrue(sub.isClosedForReceive) - sub.receive() + assertFailsWith { sub.receive() } } @Test diff --git a/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt index bcff1edfa0..2b948dfa25 100644 --- a/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ArrayChannelTest.kt @@ -134,7 +134,7 @@ class ArrayChannelTest : TestBase() { q.cancel() check(q.isClosedForSend) check(q.isClosedForReceive) - check(q.receiveOrNull() == null) + assertFailsWith { q.receiveOrNull() } finish(12) } diff --git a/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt b/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt index a5e180aeee..820fad67f6 100644 --- a/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt @@ -8,7 +8,6 @@ import kotlinx.coroutines.* import kotlin.test.* class BasicOperationsTest : TestBase() { - @Test fun testSimpleSendReceive() = runTest { // Parametrized common test :( @@ -20,6 +19,11 @@ class BasicOperationsTest : TestBase() { TestChannelKind.values().forEach { kind -> testOffer(kind) } } + @Test + fun testSendAfterClose() = runTest { + TestChannelKind.values().forEach { kind -> testSendAfterClose(kind) } + } + @Test fun testReceiveOrNullAfterClose() = runTest { TestChannelKind.values().forEach { kind -> testReceiveOrNull(kind) } @@ -128,6 +132,23 @@ class BasicOperationsTest : TestBase() { d.await() } + /** + * [ClosedSendChannelException] should not be eaten. + * See [https://github.com/Kotlin/kotlinx.coroutines/issues/957] + */ + private suspend fun testSendAfterClose(kind: TestChannelKind) { + assertFailsWith { + coroutineScope { + val channel = kind.create() + channel.close() + + launch { + channel.send(1) + } + } + } + } + private suspend fun testSendReceive(kind: TestChannelKind, iterations: Int) = coroutineScope { val channel = kind.create() launch { diff --git a/kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt index 666b706499..6b5e020d27 100644 --- a/kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ConflatedChannelTest.kt @@ -73,7 +73,7 @@ class ConflatedChannelTest : TestBase() { q.cancel() check(q.isClosedForSend) check(q.isClosedForReceive) - check(q.receiveOrNull() == null) + assertFailsWith { q.receiveOrNull() } finish(2) } diff --git a/kotlinx-coroutines-core/common/test/channels/LinkedListChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/LinkedListChannelTest.kt index 700ea96c46..4233a35084 100644 --- a/kotlinx-coroutines-core/common/test/channels/LinkedListChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/LinkedListChannelTest.kt @@ -31,7 +31,7 @@ class LinkedListChannelTest : TestBase() { q.cancel() check(q.isClosedForSend) check(q.isClosedForReceive) - check(q.receiveOrNull() == null) + assertFailsWith { q.receiveOrNull() } } @Test diff --git a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt index f286ba5d24..5137dd740d 100644 --- a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt @@ -38,7 +38,7 @@ class ProduceTest : TestBase() { expectUnreached() } catch (e: Throwable) { expect(7) - check(e is ClosedSendChannelException) + check(e is CancellationException) throw e } expectUnreached() @@ -48,7 +48,7 @@ class ProduceTest : TestBase() { expect(4) c.cancel() expect(5) - assertNull(c.receiveOrNull()) + assertFailsWith { c.receiveOrNull() } expect(6) yield() // to produce finish(8) @@ -107,7 +107,6 @@ class ProduceTest : TestBase() { produced.cancel() try { source.receive() - // TODO shouldn't it be ClosedReceiveChannelException ? } catch (e: CancellationException) { finish(4) } diff --git a/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt b/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt index d7ca753e0b..54d6938481 100644 --- a/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/RendezvousChannelTest.kt @@ -272,7 +272,7 @@ class RendezvousChannelTest : TestBase() { q.cancel() check(q.isClosedForSend) check(q.isClosedForReceive) - check(q.receiveOrNull() == null) + assertFailsWith { q.receiveOrNull() } finish(12) } diff --git a/kotlinx-coroutines-core/common/test/flow/IdFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/IdFlowTest.kt new file mode 100644 index 0000000000..a7299cc8d9 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/IdFlowTest.kt @@ -0,0 +1,64 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913 + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlin.test.* + +// See https://github.com/Kotlin/kotlinx.coroutines/issues/1128 +class IdFlowTest : TestBase() { + @Test + fun testCancelInCollect() = runTest( + expected = { it is CancellationException } + ) { + expect(1) + flow { + expect(2) + emit(1) + expect(3) + hang { finish(6) } + }.idScoped().collect { value -> + expect(4) + assertEquals(1, value) + kotlin.coroutines.coroutineContext.cancel() + expect(5) + } + expectUnreached() + } + + @Test + fun testCancelInFlow() = runTest( + expected = { it is CancellationException } + ) { + expect(1) + flow { + expect(2) + emit(1) + kotlin.coroutines.coroutineContext.cancel() + expect(3) + }.idScoped().collect { value -> + finish(4) + assertEquals(1, value) + } + expectUnreached() + } +} + +/** + * This flow should be "identity" function with respect to cancellation. + */ +private fun Flow.idScoped(): Flow = flow { + coroutineScope { + val channel = produce { + collect { send(it) } + } + channel.consumeEach { + emit(it) + } + } +} diff --git a/kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt index 7024961128..e02da811e4 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt @@ -227,6 +227,4 @@ class ZipTest : TestBase() { assertFailsWith(flow) finish(2) } - - private suspend fun sum(s: String?, i: Int?): String = s + i } diff --git a/kotlinx-coroutines-core/jvm/test/channels/DoubleChannelCloseStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/DoubleChannelCloseStressTest.kt index 3c9af50b14..01cace720c 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/DoubleChannelCloseStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/DoubleChannelCloseStressTest.kt @@ -17,7 +17,11 @@ class DoubleChannelCloseStressTest : TestBase() { // empty -- just closes channel } GlobalScope.launch(CoroutineName("sender")) { - actor.send(1) + try { + actor.send(1) + } catch (e: ClosedSendChannelException) { + // ok -- closed before send + } } Thread.sleep(1) actor.close() diff --git a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt index 392d982700..cffe6c0e20 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt @@ -48,8 +48,7 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() delayChannel.cancel() delay(5100) - delayChannel.checkEmpty() - delayChannel.cancel() + assertFailsWith { delayChannel.poll() } } } diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/ProduceExceptionsTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/ProduceExceptionsTest.kt index b4df58ac34..83bd72355f 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/ProduceExceptionsTest.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/ProduceExceptionsTest.kt @@ -77,7 +77,7 @@ class ProduceExceptionsTest : TestBase() { channel!!.cancel() try { send(1) - } catch (e: ClosedSendChannelException) { + } catch (e: CancellationException) { expect(3) throw e } @@ -87,7 +87,7 @@ class ProduceExceptionsTest : TestBase() { yield() try { channel.receive() - } catch (e: ClosedReceiveChannelException) { + } catch (e: CancellationException) { assertTrue(e.suppressed.isEmpty()) finish(4) }