Skip to content

Commit

Permalink
Properly cancel ChannelCoroutine when the channel was closed or cance…
Browse files Browse the repository at this point in the history
…lled (#2507)

Fixes #2506
  • Loading branch information
qwwdfsad authored Jan 29, 2021
1 parent 5531a6e commit 7061cc2
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 7 deletions.
10 changes: 8 additions & 2 deletions kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.channels
Expand Down Expand Up @@ -642,7 +642,13 @@ internal abstract class AbstractChannel<E>(
cancelInternal(cause)

final override fun cancel(cause: CancellationException?) {
if (isClosedForReceive) return // Do not create an exception if channel is already cancelled
/*
* Do not create an exception if channel is already cancelled.
* Channel is closed for receive when either it is cancelled (then we are free to bail out)
* or was closed and elements were received.
* Then `onCancelIdempotent` does nothing for all implementations.
*/
if (isClosedForReceive) return
cancelInternal(cause ?: CancellationException("$classSimpleName was cancelled"))
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.channels
Expand All @@ -26,7 +26,7 @@ internal open class ChannelCoroutine<E>(
}

final override fun cancel(cause: CancellationException?) {
if (isClosedForReceive) return // Do not create an exception if channel is already cancelled
if (isCancelled) return // Do not create an exception if the coroutine (-> the channel) is already cancelled
cancelInternal(cause ?: defaultCancellationException())
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.channels

import kotlinx.atomicfu.*
Expand Down Expand Up @@ -115,4 +119,4 @@ class ChannelUndeliveredElementTest : TestBase() {
check(!_cancelled.getAndSet(true)) { "Already cancelled" }
}
}
}
}
23 changes: 22 additions & 1 deletion kotlinx-coroutines-core/common/test/channels/ProduceTest.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.channels
Expand Down Expand Up @@ -95,6 +95,27 @@ class ProduceTest : TestBase() {
cancelOnCompletion(coroutineContext)
}

@Test
fun testCancelWhenTheChannelIsClosed() = runTest {
val channel = produce<Int> {
send(1)
close()
expect(2)
launch {
expect(3)
hang { expect(5) }
}
}

expect(1)
channel.receive()
yield()
expect(4)
channel.cancel()
(channel as Job).join()
finish(6)
}

@Test
fun testAwaitConsumerCancellation() = runTest {
val parent = Job()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow
Expand Down Expand Up @@ -194,4 +194,17 @@ class ChannelFlowTest : TestBase() {
assertEquals(listOf(1), flow.toList())
finish(3)
}

@Test
fun testCancelledOnCompletion() = runTest {
val myFlow = callbackFlow<Any> {
expect(2)
close()
hang { expect(3) }
}

expect(1)
myFlow.collect()
finish(4)
}
}

0 comments on commit 7061cc2

Please sign in to comment.