Skip to content

Commit

Permalink
Initial low level implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
kevincianfarini committed Feb 18, 2020
1 parent 7319bc5 commit 9a25862
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 6 deletions.
18 changes: 18 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,24 @@ internal class ChannelFlowOperatorImpl<T>(
flow.collect(collector)
}

internal class DiscardFlowOperator<T>(
flow: Flow<T>,
context: CoroutineContext = EmptyCoroutineContext
) : ChannelFlowOperator<T, T>(flow, context, capacity = Channel.RENDEZVOUS) {

override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> {
return DiscardFlowOperator(flow = flow, context = context)
}

override suspend fun collectTo(scope: ProducerScope<T>) {
flowCollect(DiscardingCollector(scope))
}

override suspend fun flowCollect(collector: FlowCollector<T>) {
flow.collect(collector)
}
}

// Now if the underlying collector was accepting concurrent emits, then this one is too
// todo: we might need to generalize this pattern for "thread-safe" operators that can fuse with channels
private fun <T> FlowCollector<T>.withUndispatchedContextCollector(emitContext: CoroutineContext): FlowCollector<T> = when (this) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package kotlinx.coroutines.flow.internal

import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.FlowCollector

/**
* A [FlowCollector] specifically built to be used for `Flow.dropWhileBusy`
*
* This collector ensures that the fist element is always sent. It then offers subsequent elements,
* allowing them to be dropped.
*/
@InternalCoroutinesApi
internal class DiscardingCollector<T>(private val channel: SendChannel<T>) : FlowCollector<T> {

private var hasSentFirstValue: Boolean = false

override suspend fun emit(value: T) {
if (hasSentFirstValue) {
channel.offer(value)
} else {
channel.send(value)
hasSentFirstValue = true
}
}
}
8 changes: 2 additions & 6 deletions kotlinx-coroutines-core/common/src/flow/operators/Context.kt
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
* assertEquals(listOf(1, 11, 21), result)
* ```
*
* Not that the `dropWhileBusy` operator is extremely similar to [conflate]. However, the difference
* Note that the `dropWhileBusy` operator is extremely similar to [conflate]. However, the difference
* is that [conflate] always emits the latest elements, whereas `dropWhileBusy` doesn't. `dropWhileBusy`
* explicity only emits elements given that a collector is currently suspended and able to accept
* another element.
Expand All @@ -181,11 +181,7 @@ public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
* **Conflation takes precedence over `buffer()` calls with any other capacity.**
*/
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.dropWhileBusy(): Flow<T> = channelFlow {
collectIndexed { index, value ->
if (index == 0) send(value) else offer(value)
}
}.buffer(capacity = 0)
public fun <T> Flow<T>.dropWhileBusy(): Flow<T> = DiscardFlowOperator(this)

/**
* Changes the context where this flow is executed to the given [context].
Expand Down

0 comments on commit 9a25862

Please sign in to comment.