Skip to content

Commit

Permalink
Increase the deprecation levels for BroadcastChannel APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
dkhalanskyjb committed Jul 29, 2024
1 parent ab279a7 commit 60a674c
Show file tree
Hide file tree
Showing 19 changed files with 46 additions and 181 deletions.
1 change: 0 additions & 1 deletion kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -987,7 +987,6 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun asFlow (Lkotlin/ranges/IntRange;)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow (Lkotlin/ranges/LongRange;)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow (Lkotlin/sequences/Sequence;)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow (Lkotlinx/coroutines/channels/BroadcastChannel;)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow ([I)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow ([J)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
Expand Down
86 changes: 5 additions & 81 deletions kotlinx-coroutines-core/common/src/channels/Broadcast.kt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
@file:Suppress("DEPRECATION")
@file:Suppress("DEPRECATION", "DEPRECATION_ERROR")

package kotlinx.coroutines.channels

Expand All @@ -10,36 +10,10 @@ import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*

/**
* Broadcasts all elements of the channel.
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
*
* The kind of the resulting channel depends on the specified [capacity] parameter:
* when `capacity` is positive (1 by default), but less than [UNLIMITED] -- uses [BroadcastChannel] with a buffer of given capacity,
* when `capacity` is [CONFLATED] -- uses [ConflatedBroadcastChannel] that conflates back-to-back sends;
* Note that resulting channel behaves like [ConflatedBroadcastChannel] but is not an instance of [ConflatedBroadcastChannel].
* otherwise -- throws [IllegalArgumentException].
*
* ### Cancelling broadcast
*
* **To stop broadcasting from the underlying channel call [cancel][BroadcastChannel.cancel] on the result.**
*
* Do not use [close][BroadcastChannel.close] on the resulting channel.
* It causes eventual failure of the broadcast coroutine and cancellation of the underlying channel, too,
* but it is not as prompt.
*
* ### Future replacement
*
* This function has an inappropriate result type of [BroadcastChannel] which provides
* [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with
* the broadcasting coroutine in hard-to-specify ways.
*
* **Note: This API is obsolete since 1.5.0.** It is deprecated with warning in 1.7.0.
* It is replaced with [Flow.shareIn][kotlinx.coroutines.flow.shareIn] operator.
*
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
* @suppress obsolete since 1.5.0, WARNING since 1.7.0, ERROR since 1.9.0
*/
@ObsoleteCoroutinesApi
@Deprecated(level = DeprecationLevel.WARNING, message = "BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
@Deprecated(level = DeprecationLevel.ERROR, message = "BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
public fun <E> ReceiveChannel<E>.broadcast(
capacity: Int = 1,
start: CoroutineStart = CoroutineStart.LAZY
Expand All @@ -56,60 +30,10 @@ public fun <E> ReceiveChannel<E>.broadcast(
}

/**
* Launches new coroutine to produce a stream of values by sending them to a broadcast channel
* and returns a reference to the coroutine as a [BroadcastChannel]. The resulting
* object can be used to [subscribe][BroadcastChannel.openSubscription] to elements produced by this coroutine.
*
* The scope of the coroutine contains [ProducerScope] interface, which implements
* both [CoroutineScope] and [SendChannel], so that coroutine can invoke
* [send][SendChannel.send] directly. The channel is [closed][SendChannel.close]
* when the coroutine completes.
*
* Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
* with corresponding [context] element.
*
* Uncaught exceptions in this coroutine close the channel with this exception as a cause and
* the resulting channel becomes _failed_, so that any attempt to receive from such a channel throws exception.
*
* The kind of the resulting channel depends on the specified [capacity] parameter:
* - when `capacity` is positive (1 by default), but less than [UNLIMITED] -- uses [BroadcastChannel] with a buffer of given capacity,
* - when `capacity` is [CONFLATED] -- uses [ConflatedBroadcastChannel] that conflates back-to-back sends;
* Note that resulting channel behaves like [ConflatedBroadcastChannel] but is not an instance of [ConflatedBroadcastChannel].
* - otherwise -- throws [IllegalArgumentException].
*
* **Note:** By default, the coroutine does not start until the first subscriber appears via [BroadcastChannel.openSubscription]
* as [start] parameter has a value of [CoroutineStart.LAZY] by default.
* This ensures that the first subscriber does not miss any sent elements.
* However, later subscribers may miss elements.
*
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
*
* ### Cancelling broadcast
*
* **To stop broadcasting from the underlying channel call [cancel][BroadcastChannel.cancel] on the result.**
*
* Do not use [close][BroadcastChannel.close] on the resulting channel.
* It causes failure of the `send` operation in broadcast coroutine and would not cancel it if the
* coroutine is doing something else.
*
* ### Future replacement
*
* This API is obsolete since 1.5.0 and deprecated with warning since 1.7.0.
* This function has an inappropriate result type of [BroadcastChannel] which provides
* [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with
* the broadcasting coroutine in hard-to-specify ways.
* It is replaced with [Flow.shareIn][kotlinx.coroutines.flow.shareIn] operator.
*
* @param context additional to [CoroutineScope.coroutineContext] context of the coroutine.
* @param capacity capacity of the channel's buffer (1 by default).
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
* @param onCompletion optional completion handler for the producer coroutine (see [Job.invokeOnCompletion]).
* @param block the coroutine code.
* @suppress obsolete since 1.5.0, WARNING since 1.7.0, ERROR since 1.9.0
*/
@ObsoleteCoroutinesApi
@Deprecated(level = DeprecationLevel.WARNING, message = "BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
@Deprecated(level = DeprecationLevel.ERROR, message = "BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
public fun <E> CoroutineScope.broadcast(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 1,
Expand Down
76 changes: 14 additions & 62 deletions kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
@file:Suppress("FunctionName", "DEPRECATION")
@file:Suppress("FunctionName", "DEPRECATION", "DEPRECATION_ERROR")

package kotlinx.coroutines.channels

Expand All @@ -10,62 +10,35 @@ import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*
import kotlin.native.concurrent.*

/**
* Broadcast channel is a non-blocking primitive for communication between the sender and multiple receivers
* that subscribe for the elements using [openSubscription] function and unsubscribe using [ReceiveChannel.cancel]
* function.
*
* See `BroadcastChannel()` factory function for the description of available
* broadcast channel implementations.
*
* **Note: This API is obsolete since 1.5.0 and deprecated for removal since 1.7.0**
* It is replaced with [SharedFlow][kotlinx.coroutines.flow.SharedFlow].
* @suppress obsolete since 1.5.0, WARNING since 1.7.0, ERROR since 1.9.0
*/
@ObsoleteCoroutinesApi
@Deprecated(level = DeprecationLevel.WARNING, message = "BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
@Deprecated(level = DeprecationLevel.ERROR, message = "BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
public interface BroadcastChannel<E> : SendChannel<E> {
/**
* Subscribes to this [BroadcastChannel] and returns a channel to receive elements from it.
* The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this
* broadcast channel.
* @suppress
*/
public fun openSubscription(): ReceiveChannel<E>

/**
* Cancels reception of remaining elements from this channel with an optional cause.
* This function closes the channel with
* the specified cause (unless it was already closed), removes all buffered sent elements from it,
* and [cancels][ReceiveChannel.cancel] all open subscriptions.
* A cause can be used to specify an error message or to provide other details on
* a cancellation reason for debugging purposes.
* @suppress
*/
public fun cancel(cause: CancellationException? = null)

/**
* @suppress This method has bad semantics when cause is not a [CancellationException]. Use [cancel].
* @suppress
*/
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility only")
public fun cancel(cause: Throwable? = null): Boolean
}

/**
* Creates a broadcast channel with the specified buffer capacity.
*
* The resulting channel type depends on the specified [capacity] parameter:
*
* - when `capacity` positive, but less than [UNLIMITED] -- creates `ArrayBroadcastChannel` with a buffer of given capacity.
* **Note:** this channel looses all items that have been sent to it until the first subscriber appears;
* - when `capacity` is [CONFLATED] -- creates [ConflatedBroadcastChannel] that conflates back-to-back sends;
* - when `capacity` is [BUFFERED] -- creates `ArrayBroadcastChannel` with a default capacity.
* - otherwise -- throws [IllegalArgumentException].
*
* **Note: This API is obsolete since 1.5.0 and deprecated for removal since 1.7.0**
* It is replaced with [SharedFlow][kotlinx.coroutines.flow.SharedFlow] and [StateFlow][kotlinx.coroutines.flow.StateFlow].
* @suppress obsolete since 1.5.0, WARNING since 1.7.0, ERROR since 1.9.0
*/
@ObsoleteCoroutinesApi
@Deprecated(level = DeprecationLevel.WARNING, message = "BroadcastChannel is deprecated in the favour of SharedFlow and StateFlow, and is no longer supported")
@Deprecated(level = DeprecationLevel.ERROR, message = "BroadcastChannel is deprecated in the favour of SharedFlow and StateFlow, and is no longer supported")
public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> =
when (capacity) {
0 -> throw IllegalArgumentException("Unsupported 0 capacity for BroadcastChannel")
Expand All @@ -76,49 +49,28 @@ public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> =
}

/**
* Broadcasts the most recently sent element (aka [value]) to all [openSubscription] subscribers.
*
* Back-to-send sent elements are _conflated_ -- only the most recently sent value is received,
* while previously sent elements **are lost**.
* Every subscriber immediately receives the most recently sent element.
* Sender to this broadcast channel never suspends and [trySend] always succeeds.
*
* A secondary constructor can be used to create an instance of this class that already holds a value.
* This channel is also created by `BroadcastChannel(Channel.CONFLATED)` factory function invocation.
*
* In this implementation, [opening][openSubscription] and [closing][ReceiveChannel.cancel] subscription
* takes linear time in the number of subscribers.
*
* **Note: This API is obsolete since 1.5.0 and deprecated for removal since 1.7.0**
* It is replaced with [SharedFlow][kotlinx.coroutines.flow.StateFlow].
* @suppress obsolete since 1.5.0, WARNING since 1.7.0, ERROR since 1.9.0
*/
@ObsoleteCoroutinesApi
@Deprecated(level = DeprecationLevel.WARNING, message = "ConflatedBroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
@Deprecated(level = DeprecationLevel.ERROR, message = "ConflatedBroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
public class ConflatedBroadcastChannel<E> private constructor(
private val broadcast: BroadcastChannelImpl<E>
) : BroadcastChannel<E> by broadcast {
public constructor(): this(BroadcastChannelImpl<E>(capacity = CONFLATED))
/**
* Creates an instance of this class that already holds a value.
*
* It is as a shortcut to creating an instance with a default constructor and
* immediately sending an element: `ConflatedBroadcastChannel().apply { offer(value) }`.
* @suppress
*/
public constructor(value: E) : this() {
trySend(value)
}

/**
* The most recently sent element to this channel.
*
* Access to this property throws [IllegalStateException] when this class is constructed without
* initial value and no value was sent yet or if it was [closed][close] without a cause.
* It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
* @suppress
*/
public val value: E get() = broadcast.value

/**
* The most recently sent element to this channel or `null` when this class is constructed without
* initial value and no value was sent yet or if it was [closed][close].
* @suppress
*/
public val valueOrNull: E? get() = broadcast.valueOrNull
}
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/channels/Deprecated.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import kotlin.jvm.*
* Safe to remove in 1.9.0 as was inline before.
*/
@ObsoleteCoroutinesApi
@Suppress("DEPRECATION")
@Suppress("DEPRECATION_ERROR")
@Deprecated(level = DeprecationLevel.ERROR, message = "BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
public inline fun <E, R> BroadcastChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
val channel = openSubscription()
Expand Down
19 changes: 0 additions & 19 deletions kotlinx-coroutines-core/common/src/flow/Channels.kt
Original file line number Diff line number Diff line change
Expand Up @@ -131,25 +131,6 @@ private class ChannelAsFlow<T>(
override fun additionalToStringProps(): String = "channel=$channel"
}

/**
* Represents the given broadcast channel as a hot flow.
* Every flow collector will trigger a new broadcast channel subscription.
*
* ### Cancellation semantics
* 1) Flow consumer is cancelled when the original channel is cancelled.
* 2) Flow consumer completes normally when the original channel completes (~is closed) normally.
* 3) If the flow consumer fails with an exception, subscription is cancelled.
*/
@Suppress("DEPRECATION")
@Deprecated(
level = DeprecationLevel.ERROR,
message = "'BroadcastChannel' is obsolete and all corresponding operators are deprecated " +
"in the favour of StateFlow and SharedFlow"
) // Since 1.5.0, ERROR since 1.7.0, was @FlowPreview, safe to remove in 1.8.0
public fun <T> BroadcastChannel<T>.asFlow(): Flow<T> = flow {
emitAll(openSubscription())
}

/**
* Creates a [produce] coroutine that collects the given flow.
*
Expand Down
15 changes: 8 additions & 7 deletions kotlinx-coroutines-core/common/test/CancelledParentAttachTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class CancelledParentAttachTest : TestBase() {

@Test
fun testAsync() = runTest {
CoroutineStart.values().forEach { testAsyncCancelledParent(it) }
CoroutineStart.entries.forEach { testAsyncCancelledParent(it) }
}

private suspend fun testAsyncCancelledParent(start: CoroutineStart) {
Expand All @@ -25,14 +25,14 @@ class CancelledParentAttachTest : TestBase() {
}
}
expectUnreached()
} catch (e: CancellationException) {
} catch (_: CancellationException) {
// Expected
}
}

@Test
fun testLaunch() = runTest {
CoroutineStart.values().forEach { testLaunchCancelledParent(it) }
CoroutineStart.entries.forEach { testLaunchCancelledParent(it) }
}

private suspend fun testLaunchCancelledParent(start: CoroutineStart) {
Expand All @@ -48,7 +48,7 @@ class CancelledParentAttachTest : TestBase() {
}
}
expectUnreached()
} catch (e: CancellationException) {
} catch (_: CancellationException) {
// Expected
}
}
Expand All @@ -67,9 +67,10 @@ class CancelledParentAttachTest : TestBase() {

@Test
fun testBroadcast() = runTest {
CoroutineStart.values().forEach { testBroadcastCancelledParent(it) }
CoroutineStart.entries.forEach { testBroadcastCancelledParent(it) }
}

@Suppress("DEPRECATION_ERROR")
private suspend fun testBroadcastCancelledParent(start: CoroutineStart) {
try {
withContext(Job()) {
Expand All @@ -83,7 +84,7 @@ class CancelledParentAttachTest : TestBase() {
}
}
expectUnreached()
} catch (e: CancellationException) {
} catch (_: CancellationException) {
// Expected
}
}
Expand All @@ -105,7 +106,7 @@ class CancelledParentAttachTest : TestBase() {
block()
}
expectUnreached()
} catch (e: CancellationException) {
} catch (_: CancellationException) {
// Expected
}
}
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/common/test/ParentCancellationTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package kotlinx.coroutines

import kotlinx.coroutines.testing.*
import kotlinx.coroutines.channels.*
import kotlin.coroutines.*
import kotlin.test.*

/**
Expand Down Expand Up @@ -57,6 +56,7 @@ class ParentCancellationTest : TestBase() {
}

@Test
@Suppress("DEPRECATION_ERROR")
fun testBroadcastChild() = runTest {
testParentCancellation(runsInScopeContext = true) { fail ->
broadcast<Unit> { fail() }.openSubscription()
Expand Down Expand Up @@ -165,4 +165,4 @@ class ParentCancellationTest : TestBase() {
}
finish(3)
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package kotlinx.coroutines.channels

import kotlinx.coroutines.testing.*
import kotlinx.coroutines.*
import kotlin.test.*


@Suppress("DEPRECATION_ERROR")
class BroadcastChannelFactoryTest : TestBase() {

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
@file:Suppress("DEPRECATION")
@file:Suppress("DEPRECATION_ERROR")

package kotlinx.coroutines.channels

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import kotlinx.coroutines.testing.*
import kotlinx.coroutines.*
import kotlin.test.*

@Suppress("DEPRECATION_ERROR")
class BufferedBroadcastChannelTest : TestBase() {

@Test
Expand Down
Loading

0 comments on commit 60a674c

Please sign in to comment.