From 5f2bbc82f1af3bd69dbe0a069e122655f56614ab Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Fri, 29 Jan 2021 17:48:28 +0300 Subject: [PATCH] Properly cancel ChannelCoroutine when the channel was closed or cancelled Fixes #2506 --- .../common/src/channels/AbstractChannel.kt | 10 ++++++-- .../common/src/channels/ChannelCoroutine.kt | 4 ++-- .../channels/ChannelUndeliveredElementTest.kt | 6 ++++- .../common/test/channels/ProduceTest.kt | 23 ++++++++++++++++++- .../test/flow/channels/ChannelFlowTest.kt | 15 +++++++++++- 5 files changed, 51 insertions(+), 7 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt index bb7feef71f..9721583e83 100644 --- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -642,7 +642,13 @@ internal abstract class AbstractChannel( cancelInternal(cause) final override fun cancel(cause: CancellationException?) { - if (isClosedForReceive) return // Do not create an exception if channel is already cancelled + /* + * Do not create an exception if channel is already cancelled. + * Channel is closed for receive when either it is cancelled (then we are free to bail out) + * or was closed and elements were received. + * Then `onCancelIdempotent` does nothing for all implementations. + */ + if (isClosedForReceive) return cancelInternal(cause ?: CancellationException("$classSimpleName was cancelled")) } diff --git a/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt b/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt index 9ceb77ddc2..b2b257def2 100644 --- a/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt +++ b/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -26,7 +26,7 @@ internal open class ChannelCoroutine( } final override fun cancel(cause: CancellationException?) { - if (isClosedForReceive) return // Do not create an exception if channel is already cancelled + if (isCancelled) return // Do not create an exception if the coroutine (-> the channel) is already cancelled cancelInternal(cause ?: defaultCancellationException()) } diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementTest.kt index 601c2381d7..5513dab782 100644 --- a/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementTest.kt @@ -1,3 +1,7 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + package kotlinx.coroutines.channels import kotlinx.atomicfu.* @@ -115,4 +119,4 @@ class ChannelUndeliveredElementTest : TestBase() { check(!_cancelled.getAndSet(true)) { "Already cancelled" } } } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt index 6ddde001e2..194504e713 100644 --- a/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ProduceTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -95,6 +95,27 @@ class ProduceTest : TestBase() { cancelOnCompletion(coroutineContext) } + @Test + fun testCancelWhenTheChannelIsClosed() = runTest { + val channel = produce { + send(1) + close() + expect(2) + launch { + expect(3) + hang { expect(5) } + } + } + + expect(1) + channel.receive() + yield() + expect(4) + channel.cancel() + (channel as Job).join() + finish(6) + } + @Test fun testAwaitConsumerCancellation() = runTest { val parent = Job() diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt index b115150a0b..31a929b2d8 100644 --- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelFlowTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.flow @@ -194,4 +194,17 @@ class ChannelFlowTest : TestBase() { assertEquals(listOf(1), flow.toList()) finish(3) } + + @Test + fun testCancelledOnCompletion() = runTest { + val myFlow = callbackFlow { + expect(2) + close() + hang { expect(3) } + } + + expect(1) + myFlow.collect() + finish(4) + } }