Skip to content

Commit

Permalink
Introduce missing backpressure drop Flow operator
Browse files Browse the repository at this point in the history
  • Loading branch information
kevincianfarini committed Apr 18, 2020
1 parent 2ff0bbb commit e0eab61
Show file tree
Hide file tree
Showing 5 changed files with 269 additions and 11 deletions.
28 changes: 28 additions & 0 deletions kotlinx-coroutines-core/common/src/channels/Produce.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package kotlinx.coroutines.channels

import kotlinx.atomicfu.atomic
import kotlinx.coroutines.*
import kotlin.coroutines.*

Expand Down Expand Up @@ -99,6 +100,17 @@ public fun <E> CoroutineScope.produce(
return coroutine
}

internal fun <E> CoroutineScope.dropWhileBusyProduce(
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
val channel = Channel<E>(Channel.RENDEZVOUS)
val newContext = newCoroutineContext(context)
val coroutine = DropWhileBusyProducerCoroutine(newContext, channel)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
return coroutine
}

/**
* **This is an internal API and should not be used from general code.**
* The `onCompletion` parameter will be redesigned.
Expand Down Expand Up @@ -142,3 +154,19 @@ internal open class ProducerCoroutine<E>(
if (!processed && !handled) handleCoroutineException(context, cause)
}
}

private class DropWhileBusyProducerCoroutine<E>(
parentContext: CoroutineContext,
channel: Channel<E>
) : ProducerCoroutine<E>(parentContext, channel) {

private val hasSentFirstValue = atomic(false)

override suspend fun send(element: E) {
if (hasSentFirstValue.getAndSet(true)) {
_channel.offer(element)
} else {
_channel.send(element)
}
}
}
43 changes: 36 additions & 7 deletions kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ public abstract class ChannelFlow<T>(
// upstream context
@JvmField val context: CoroutineContext,
// buffer capacity between upstream and downstream context
@JvmField val capacity: Int
@JvmField val capacity: Int,
private var shouldDropWhileBusy: Boolean = false
) : Flow<T> {

// shared code to create a suspend lambda from collectTo function in one place
Expand All @@ -35,6 +36,14 @@ public abstract class ChannelFlow<T>(
private val produceCapacity: Int
get() = if (capacity == Channel.OPTIONAL_CHANNEL) Channel.BUFFERED else capacity

internal open fun update(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.OPTIONAL_CHANNEL,
dropWhileBusy: Boolean = shouldDropWhileBusy
): ChannelFlow<T> = update(context, capacity).apply {
shouldDropWhileBusy = dropWhileBusy
}

public fun update(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.OPTIONAL_CHANNEL
Expand Down Expand Up @@ -76,8 +85,11 @@ public abstract class ChannelFlow<T>(
* handlers, while the pipeline before does not, because it was cancelled during its dispatch.
* Thus `onCompletion` and `finally` blocks won't be executed and it may lead to a different kinds of memory leaks.
*/
open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =if (shouldDropWhileBusy) {
scope.dropWhileBusyProduce(context, block = collectToFun)
} else {
scope.produce(context, produceCapacity, start = CoroutineStart.ATOMIC, block = collectToFun)
}

override suspend fun collect(collector: FlowCollector<T>) =
coroutineScope {
Expand Down Expand Up @@ -135,13 +147,30 @@ internal abstract class ChannelFlowOperator<S, T>(
internal class ChannelFlowOperatorImpl<T>(
flow: Flow<T>,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.OPTIONAL_CHANNEL
capacity: Int = Channel.OPTIONAL_CHANNEL,
val dropWhileBusy: Boolean = false
) : ChannelFlowOperator<T, T>(flow, context, capacity) {
override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
ChannelFlowOperatorImpl(flow, context, capacity)

override suspend fun flowCollect(collector: FlowCollector<T>) =
flow.collect(collector)
override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> {
return ChannelFlowOperatorImpl(
flow,
context,
dropWhileBusy = capacity != Channel.CONFLATED && dropWhileBusy,
capacity = capacity
)
}

override suspend fun collectTo(scope: ProducerScope<T>) {
if (dropWhileBusy) flowCollect(DropWhileBusyCollector(scope)) else super.collectTo(scope)
}

override suspend fun flowCollect(collector: FlowCollector<T>) = flow.collect(collector)

internal override fun update(
context: CoroutineContext,
capacity: Int,
dropWhileBusy: Boolean
) = ChannelFlowOperatorImpl(flow, context, capacity, dropWhileBusy)
}

// Now if the underlying collector was accepting concurrent emits, then this one is too
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package kotlinx.coroutines.flow.internal

import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.FlowCollector

/**
* A [FlowCollector] specifically built to be used for `Flow.dropWhileBusy`
*
* This collector ensures that the fist element is always sent. It then offers subsequent elements,
* allowing them to be dropped.
*/
@InternalCoroutinesApi
internal class DropWhileBusyCollector<T>(private val channel: SendChannel<T>) : FlowCollector<T> {

private var hasSentFirstValue: Boolean = false

override suspend fun emit(value: T) {
if (hasSentFirstValue) {
channel.offer(value)
} else {
channel.send(value)
hasSentFirstValue = true
}
}
}
93 changes: 89 additions & 4 deletions kotlinx-coroutines-core/common/src/flow/operators/Context.kt
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,15 @@ public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T> {
require(capacity >= 0 || capacity == BUFFERED || capacity == CONFLATED) {
"Buffer size should be non-negative, BUFFERED, or CONFLATED, but was $capacity"
}
return if (this is ChannelFlow)
update(capacity = capacity)
else
ChannelFlowOperatorImpl(this, capacity = capacity)

return when (this) {
is ChannelFlowOperatorImpl -> update(
capacity = capacity,
dropWhileBusy = if (capacity == CONFLATED) false else dropWhileBusy
)
is ChannelFlow -> update(capacity = capacity)
else -> ChannelFlowOperatorImpl(this, capacity = capacity)
}
}

/**
Expand Down Expand Up @@ -150,6 +155,86 @@ public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T> {
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)

/**
* When applied, a producer will never suspend due to a slow consumer. In the event that an element
* is emitted while a collector is busy, the emitted element is dropped.
*
* For example, consider a flow that emits integers 1 to 30 with a 100ms delay.
*
* ```
* val flow = (1..30).asFlow().onEach { delay(100) }
* ```
*
* Applying the `dropWhileBusy` operator allows a collector which takes 1 second to process each
* element to receive integers [1, 11, 21]
*
* ```
* val result = flow.dropWhileBusy().onEach { delay(1000) }.tolist()
* assertEquals(listOf(1, 11, 21), result)
* ```
*
* Note that the `dropWhileBusy` operator is extremely similar to [conflate]. However, the difference
* is that [conflate] always emits the latest elements, whereas `dropWhileBusy` doesn't. `dropWhileBusy`
* explicity only emits elements given that a collector is currently suspended and able to accept
* another element.
*
* ### Operator Fusion
*
* Adjacent applications of [buffer], [channelFlow], [flowOn], [produceIn], and [broadcastIn] are
* always fused so that only one properly configured channel is used for execution.
*
* #### Buffer fusions
* Applying [Flow.buffer] upstream of `dropWhileBusy` is a no-op. For example:
*
* ```
* val flow = (1..30).asFlow().buffer(4).dropWhileBusy().onEach { ... }
* ```
* is the same as
* ```
* val flow = (1..30).asFlow().dropWhileBusy().onEach { ... }
* ```
*
* However, it is possible to buffer a flow downsteam of `dropWhileBusy` such that elements are buffered
* until the channels capacity is reached. Only after that happens are elements dropped.
*
* For example:
* ```
* val list = (1..30).asFlow().onEach { delay(100) }.dropWhileBusy().buffer(1).onEach { delay(1000) }.toList()
* ```
* would produce the list [1, 2, 11, 21]
*
* #### Conflate fusions
* Applying [Flow.conflate] downstream of `dropWhileBusy` is the same as just calling conflate.
* ```
* flow.dropWhileBusy().conflate()
* ```
* is the same as
* ```
* flow.conflate()
* ```
*
* The opposite also holds true -- applying `dropWhileBusy` downstream of [Flow.conflate] is the same
* as just calling `dropWhileBusy`
* ```
* flow.conflate().dropWhileBusy()
* ```
* is the same as
* ```
* flow.dropWhileBusy()
* ```
*
*/
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.dropWhileBusy(): Flow<T> = when (this) {
is ChannelFlowOperatorImpl -> update(capacity = 0, dropWhileBusy = true)
is ChannelFlow -> update(dropWhileBusy = true)
else -> ChannelFlowOperatorImpl(
flow = this,
dropWhileBusy = true,
capacity = 0
)
}

/**
* Changes the context where this flow is executed to the given [context].
* This operator is composable and affects only preceding operators that do not have its own context.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package kotlinx.coroutines.flow.operators

import kotlinx.coroutines.TestBase
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.withVirtualTime
import kotlin.test.Test
import kotlin.test.assertEquals

class DropWhileBusyTest : TestBase() {

@Test
fun `elements are dropped during slow consumption`() = withVirtualTime {
expect(1)

val flow = (1..30).asFlow().onEach { delay(100) }

val result = flow.dropWhileBusy().onEach { delay(1000) }.toList()

assertEquals(listOf(1, 11, 21), result)

finish(2)
}

@Test
fun `buffering a flow upstream of dropWhileBusy fuses to dropWhileBusy`() = withVirtualTime {
expect(1)

val flow = (1..30).asFlow().onEach { delay(100) }.buffer(4)

val result = flow.dropWhileBusy().onEach { delay(1000) }.toList()

assertEquals(listOf(1, 11, 21), result)

finish(2)
}

@Test
fun `buffering a flow downstream of dropWhileBusy buffers elements`() = withVirtualTime {
expect(1)

val flow = (1..30).asFlow().onEach { delay(100) }
val result = flow.dropWhileBusy().buffer(capacity = 1).onEach { delay(1000) }.toList()

assertEquals(listOf(1, 2, 11, 21), result)

finish(2)
}

@Test
fun `conflating a flow downstream of dropWhileBusy fuses to conflate`() = withVirtualTime {
expect(1)

val flow = (1..30).asFlow().onEach { delay(100) }
val result = flow.dropWhileBusy().conflate().onEach { delay(1000) }.toList()

assertEquals(listOf(1, 10, 20, 30), result)

finish(2)
}

@Test
fun `conflating a flow upstream of dropWhileBusy fuses to dropWhileBusy`() = withVirtualTime {
expect(1)

val flow = (1..30).asFlow().onEach { delay(100) }
val result = flow.conflate().dropWhileBusy().onEach { delay(1000) }.toList()

assertEquals(listOf(1, 11, 21), result)

finish(2)
}

@Test
fun `dropWhileBusy can fuse with a normal produce ChannelFlow`() = withVirtualTime {
expect(1)

val flow = channelFlow {
(1..30).forEach {
send(it)
delay(100)
}
}
val result = flow.dropWhileBusy().onEach { delay(1000) }.toList()

assertEquals(listOf(1, 11, 21), result)

finish(2)
}
}

0 comments on commit e0eab61

Please sign in to comment.