Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce missing backpressure drop Flow operator #1798

Closed
wants to merge 0 commits into from

Conversation

kevincianfarini
Copy link
Contributor

@kevincianfarini kevincianfarini commented Feb 12, 2020

This is a PR to introduce the lacking RX Java onBackPressureDrop() Flow analogue.

drop diagram

Although I would have liked to call this operator drop I think that name is both ambiguous and already taken. Therefore, I've named it discard. I'm open to name suggestions.

@elizarov
Copy link
Contributor

But what is the use-case? Why and in what cases would you need it?

@kevincianfarini
Copy link
Contributor Author

The use case I can see is that this is the inverse of debounce that had a dynamic timeout period based on the speed of the consumer.

Given 4 rapid clicks of a button, you could:

  • debounce with a given timeout. At best, you debounce three of the clicks based on timeout and consume the last element. At worst, the button is clicked in such a way that multiple events are queued up.
  • discard extra button clicks. Given the scenario that the button consumer speed varies, setting a static timeout using debounce isn't much help and would be useful to base element drops on the consumer speed.

@elizarov
Copy link
Contributor

elizarov commented Feb 12, 2020

@kevincianfarini It looks like your use-case of filtering rapid button clicks is already covered by conflate() operator. Please, take a look: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/conflate.html

@kevincianfarini
Copy link
Contributor Author

Wouldn't the conflate operator start by processing the first click, and then conflating the remaining three? Then, after the consumer is done processing the first click it would then process the fourth as well.

Am I misunderstanding how conflate works?

@elizarov
Copy link
Contributor

Wouldn't the conflate operator start by processing the first click, and then conflating the remaining three? Then, after the consumer is done processing the first click it would then process the fourth as well.

Yes. As far as can see exactly the same behavior is shown on the picture for onBackpressureDrop, isn't it?

@kevincianfarini
Copy link
Contributor Author

kevincianfarini commented Feb 12, 2020

@elizarov

Yes. As far as can see exactly the same behavior is shown on the picture for onBackpressureDrop, isn't it?

I don't think so. I tried to illustrate this scenario similar to how the RX docs do.

diagram

In this example, discard only processes the first click because all of the other clicks happen during the consumption duration.

Conflate, however, will consume both elements 1 and 4 because the first element has already been emitted and is being consumed. When the consumer is ready for the next element, button click 4 will be emitted since it is cached.

@elizarov
Copy link
Contributor

Thanks. I see the difference now. Borrowing example code from conflate docs:

val flow = (1..30).asFlow().onEach { delay(100) }
println(flow.conflate().onEach { delay(1000) }.toList())

Prints: [1, 10, 20, 30], while:

println(flow.asFlowable().onBackpressureDrop().asFlow().onEach { delay(1000) }.toList())

Prints: [1, 11, 21]

Now, the question is: what are the use-cases for onBackpressureDrop? In what circumstances you'd prefer its behavior to conflate()?

@kevincianfarini
Copy link
Contributor Author

The first practical use case that comes to mind for me is a notification channel.

It's pretty common that notifications sent via websockets, google play services, etc don't have a payload of the notification itself. Instead they indicate the client that updates are available and to make a call to get them.

Using discard on a flow which represents a notification channel would be more correct than using conflate, since the client wouldn't finish processing the first notification update, and then process another conflated update request which was received while the client was processing the first.

Copy link
Contributor

@elizarov elizarov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've discussed this at length at a design meeting and the consensus name we came with for this operator is dropWhenBusy().

As for implementation in this PR, there are several changes that need to be made:

  • First of all, channelFlow { collect { offer(it) } } gives no guarantee that the first element will not be dropped. The producer coroutine that calls collect { offer(it) } may get started before the consumer receiving from the corresponding channel is up and running and thus it will not be able to offer the very first element.
    • A stress test needs to be added that exposes this problem as of now.
    • Code shall be modified to make sure this problem does not repeat.
  • Since this operator is expected to be popular in UI app, its implementation should be as low-level and as efficient as possible. Instead of using channelFlow, I'd suggest to write a custom implementation of ChannelFlow class for this operator (similarly to ChannelFlowBuilder).
  • Documentation of dropWhenBusy shall be structured similarly to documentation on buffer and conflate and the latter operators' docs shall add a reference to dropWhenBusy. Special attention must be paid to documenting the difference between conflate and dropWhenBusy -- the corresponding explanation shall be added to both docs.
  • Migration.kt shall add a deprecated extension for onBackpressureDrop similarly to the way it is done for other Rx operator names.

@kevincianfarini
Copy link
Contributor Author

@elizarov awesome! I'll rework this soon. Thanks

@kevincianfarini
Copy link
Contributor Author

I'm working and pushing updates now. Since we're in different time zones, when this gets approved could we wait to merge so I have a chance to squash all of the commits?

@kevincianfarini kevincianfarini force-pushed the master branch 3 times, most recently from 0860603 to 9a25862 Compare February 18, 2020 01:28
* **Conflation takes precedence over `buffer()` calls with any other capacity.**
*/
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.dropWhileBusy(): Flow<T> = DiscardFlowOperator(this)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm questioning if this operator should be fusable with conflate/buffer, flowOn, etc.

As far as I'm aware, they all use SendingCollector to send elements to a channel. This implementation of a flow operator uses its own implementation of a flow collector, DiscardingFlowCollector. I'm not quite sure how that could be fused with the others.

Maybe I'm over thinking this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow up to this:

What does it mean for us to fuse this operator with others? It seems like we could fuse this with buffer. Then, when the buffer of this channel gets full we would drop elements after.

What would it mean for this to get fused with conflate? Seems like it shouldn't be a valid operation. Thoughts?

Copy link
Contributor Author

@kevincianfarini kevincianfarini Feb 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow, follow up.

I'm implemented operator fusion with buffer. It seems to make logical sense.

I throw an IllegalArgumentException if conflate is called downstream of dropWhileBusy because it doesn't seem like a valid operation. This has been documented.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "fusing" is a performance optimization. It is totally optional. The goal of the fusing is that the combined code behaves as it did before (or almost like before), but underlying implementation is more efficient. The reason that fusing exists is that creating separate coroutines with channels in between them is expensive and it is great if we can avoid them without affecting the semantics of the respective operators.

To see what should happen with fusing you simply consider how a chain of operators effectively behaves without fusing:

  1. buffer(x).dropWhileBusy() chain
    Any kind of buffering or conflation before dropWhileBusy is irrelevant, since dropWhileBusy is always ready to accept an element (and drops it), so this should fuse to dropWhileBusy().
  2. dropWhileBuys().conflate() chain
    This is legit too. conflate() is never busy so dropWhileBusy() that is placed before is irrelevant. It should fuse to the conflate(). In particular, it must not throw an exception, because that would leak operator implementation details to downstream consumer flows.
  3. dropWhileBusy().buffer(x >= 0) chain
    Makes total sense. It would drop only when the buffer is full.

How to implement the last one? What it shall fuse to? The solution I'd suggest to implement is this:

Don't create a separate DiscardFlowOperator. Instead, add a boolean dropWhileBusy flag to an existing ChannelFlowOperatorImpl. By default, dropWhileBusy will create an implementation with capacity = 0, dropWhileBusy = true, while other operators will work like before, creating an implementation with dropWhileBusy = false.

The fusing logic shall behave according to the rationale that is outlined above. There are two implementation strategies here:

The minimal strategy

The minimal strategy is to conflated dropWhileBusy only with ChannelFlowOperatorImpl. So, when dropWhileBusy is applied to any ChannelFlowOperatorImpl it "unwraps" it completely, since it effectively negates any previously configured buffering (item 1 above). The other what around, when ChannelFlowOperatorImpl with dropWhileBusy=true and is target to buffer(...) operator, it fuses the buffer size as it does now (see ChanneFlow.update function) and keeps its dropWhileBusy=true flag (item 3 above). On the other hang, conflate() operator should clear dropWhileBusy in the preceding ChannelFlowOperatorImpl (item 2).

The full strategy

What's minimal about the minimal srtategy? Consider the following code:

channelFlow { // this: ProduceScope
    repeat(100) { send(it) } // ProducerScope.send is called
}.dropWhileBusy().collect { println(it) }

To collect this flow, the minimal strategy would setup 3 coroutines and two channels between them. The first one doing send(it) to the buffered channel, then the second one receives from it and offers to the rendezvous channel, the third one prints. This is not efficient.

It would be more efficient to directly offer into the rendezvous channel and use only two coroutines. That is, it would be more efficient if dropWhileBusy could fuse with the preceding ChannelFlowBuilder. But that means that ChannelFlowBuilder has to provide to its the block the special implementation of ProducerScope interface that makes send to behave like offer.

But where does ProducerScope comes from? Take a look at ChannelFlow.produceImpl/broadcastImpl. It comes from produce { ... } and broadcast { ... } builder. So, in order to support fully fusable dropWhileBusy both of those builders need to support "dropWhileBusy" mode. There's no reason to do that in their public api, but there can be an internal version that takes an additiona dropWhileBusy boolean parameter, passes it to the created ProducerCoroutine/BroadcastCoroutine and overrides send implementation in those classes so that it calls offer if dropWhileBusy is true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The minimal strategy seems simple and to make sense. I do have concerns about the full strategy though.

It would be more efficient to directly offer into the rendezvous channel and use only two coroutines. That is, it would be more efficient if dropWhileBusy could fuse with the preceding ChannelFlowBuilder. But that means that ChannelFlowBuilder has to provide to its the block the special implementation of ProducerScope interface that makes send to behave like offer.

Seeing that ProducerScope has both offer and send, changing the semantics of send when dropWhileBusy is fused seems alarming. Send could secretly be an offer in the right instance, which seems sketchy to me.

Thoughts? If we were to go this route, I would want it to be thoroughly documented on both produce/broadcast as well as dropWhileBusy

Copy link
Contributor Author

@kevincianfarini kevincianfarini Mar 4, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@elizarov Likewise, the overridden send (which is really offer) would need to ensure first element delivery. Should we apply that same constraint when a user calls offer within the scope?


private var hasSentFirstValue: Boolean = false

override suspend fun emit(value: T) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First of all, channelFlow { collect { offer(it) } } gives no guarantee that the first element will not be dropped. The producer coroutine that calls collect { offer(it) } may get started before the consumer receiving from the corresponding channel is up and running and thus it will not be able to offer the very first element.

The idea I'm striving here is that we ensure the first element delivery via send and offer all others.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea looks good to me. It seems very appropriate for the way collection from the flow conceptually works, where you write flow.collect { value -> someCode(value) } and someCode is called (and can become busy) only when the first element is there. However, it can be surprising for cases like flow.onStart { delay(1000) }.collect { ... } where it would still wait for the actual collection to start and would not consider the flow to be "busy" for this initial 1000 ms. So, it should be explicitly mentioned in the docs of dropWhileBusy, that is give there a more detailed explanation of what "busy" means for this operator.

@kevincianfarini
Copy link
Contributor Author

kevincianfarini commented Feb 27, 2020

@elizarov I've made some more updates and left review comments that I'd like your guidance on before I write some stress tests and make the migration deprecation. :)

@@ -150,6 +150,51 @@ public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T> {
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)

/**
* Discards flow emissions using [Channel.offer]. When applied, a producer will never suspend
* due to a slow collector. In the event that an element is emitted while a collector is busy, the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doc assumes that the reader knows what Channel.offer does. I'd suggest to combine 1st & 2nd sentence and remove reference to channels, to make it so that the resulting first sentence fully describes what this operator does.

* ```
*
* Applying the `dropWhileBusy` operator allows a collector which takes 1 second to process each
* element to reieve integers [1, 11, 21]
Copy link
Contributor

@elizarov elizarov Mar 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in reieve. More typos down there is the text, too. Btw, I highly recommend this plugin https://plugins.jetbrains.com/plugin/12175-grazie

* **Conflation takes precedence over `buffer()` calls with any other capacity.**
*/
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.dropWhileBusy(): Flow<T> = DiscardFlowOperator(this)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "fusing" is a performance optimization. It is totally optional. The goal of the fusing is that the combined code behaves as it did before (or almost like before), but underlying implementation is more efficient. The reason that fusing exists is that creating separate coroutines with channels in between them is expensive and it is great if we can avoid them without affecting the semantics of the respective operators.

To see what should happen with fusing you simply consider how a chain of operators effectively behaves without fusing:

  1. buffer(x).dropWhileBusy() chain
    Any kind of buffering or conflation before dropWhileBusy is irrelevant, since dropWhileBusy is always ready to accept an element (and drops it), so this should fuse to dropWhileBusy().
  2. dropWhileBuys().conflate() chain
    This is legit too. conflate() is never busy so dropWhileBusy() that is placed before is irrelevant. It should fuse to the conflate(). In particular, it must not throw an exception, because that would leak operator implementation details to downstream consumer flows.
  3. dropWhileBusy().buffer(x >= 0) chain
    Makes total sense. It would drop only when the buffer is full.

How to implement the last one? What it shall fuse to? The solution I'd suggest to implement is this:

Don't create a separate DiscardFlowOperator. Instead, add a boolean dropWhileBusy flag to an existing ChannelFlowOperatorImpl. By default, dropWhileBusy will create an implementation with capacity = 0, dropWhileBusy = true, while other operators will work like before, creating an implementation with dropWhileBusy = false.

The fusing logic shall behave according to the rationale that is outlined above. There are two implementation strategies here:

The minimal strategy

The minimal strategy is to conflated dropWhileBusy only with ChannelFlowOperatorImpl. So, when dropWhileBusy is applied to any ChannelFlowOperatorImpl it "unwraps" it completely, since it effectively negates any previously configured buffering (item 1 above). The other what around, when ChannelFlowOperatorImpl with dropWhileBusy=true and is target to buffer(...) operator, it fuses the buffer size as it does now (see ChanneFlow.update function) and keeps its dropWhileBusy=true flag (item 3 above). On the other hang, conflate() operator should clear dropWhileBusy in the preceding ChannelFlowOperatorImpl (item 2).

The full strategy

What's minimal about the minimal srtategy? Consider the following code:

channelFlow { // this: ProduceScope
    repeat(100) { send(it) } // ProducerScope.send is called
}.dropWhileBusy().collect { println(it) }

To collect this flow, the minimal strategy would setup 3 coroutines and two channels between them. The first one doing send(it) to the buffered channel, then the second one receives from it and offers to the rendezvous channel, the third one prints. This is not efficient.

It would be more efficient to directly offer into the rendezvous channel and use only two coroutines. That is, it would be more efficient if dropWhileBusy could fuse with the preceding ChannelFlowBuilder. But that means that ChannelFlowBuilder has to provide to its the block the special implementation of ProducerScope interface that makes send to behave like offer.

But where does ProducerScope comes from? Take a look at ChannelFlow.produceImpl/broadcastImpl. It comes from produce { ... } and broadcast { ... } builder. So, in order to support fully fusable dropWhileBusy both of those builders need to support "dropWhileBusy" mode. There's no reason to do that in their public api, but there can be an internal version that takes an additiona dropWhileBusy boolean parameter, passes it to the created ProducerCoroutine/BroadcastCoroutine and overrides send implementation in those classes so that it calls offer if dropWhileBusy is true.


private var hasSentFirstValue: Boolean = false

override suspend fun emit(value: T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea looks good to me. It seems very appropriate for the way collection from the flow conceptually works, where you write flow.collect { value -> someCode(value) } and someCode is called (and can become busy) only when the first element is there. However, it can be surprising for cases like flow.onStart { delay(1000) }.collect { ... } where it would still wait for the actual collection to start and would not consider the flow to be "busy" for this initial 1000 ms. So, it should be explicitly mentioned in the docs of dropWhileBusy, that is give there a more detailed explanation of what "busy" means for this operator.

kotlinx-coroutines-core/common/src/channels/Produce.kt Outdated Show resolved Hide resolved

private var shouldDropWhileBusy: Boolean = false

internal open fun update(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no need to have a separate 3-argument update. You can just 3rd parameter to a regular update for simplicity. It is an internal method, so you don't have to preserve it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other update fun is not internal, it's public. Should I still coalesce these to a single method?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is public in an internal class. It can be safely updated (put intented).

return ChannelFlowOperatorImpl(
flow,
context,
dropWhileBusy = capacity != Channel.CONFLATED && dropWhileBusy,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code properly belongs to update function. That is where all trickly "fusion" of parameters is performance. Here is should simply create implementation with the given params. To get there, create function shall have dropWhileBusy parameter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order to do that, should I overload the create function or change the signature in ChannelFlowOperator?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should just add dropWhileBusy boolean to create. It is all internal code that is not directly available to end-users of API.

Copy link
Contributor

@elizarov elizarov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A general note is that we need more tests for all the possible fusion combinations of dropWhileBusy and buffer.


private var shouldDropWhileBusy: Boolean = false

internal open fun update(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is public in an internal class. It can be safely updated (put intented).

return ChannelFlowOperatorImpl(
flow,
context,
dropWhileBusy = capacity != Channel.CONFLATED && dropWhileBusy,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should just add dropWhileBusy boolean to create. It is all internal code that is not directly available to end-users of API.

@@ -76,8 +85,11 @@ public abstract class ChannelFlow<T>(
* handlers, while the pipeline before does not, because it was cancelled during its dispatch.
* Thus `onCompletion` and `finally` blocks won't be executed and it may lead to a different kinds of memory leaks.
*/
open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> = if (shouldDropWhileBusy) {
scope.dropWhileBusyProduce(context, start = CoroutineStart.ATOMIC, block = collectToFun)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

produceCapacity parameter is missing in the call to dropWhileBusyProduce. It should be affecting the chain of operators like dropWhileBusy().buffer(10) where, logically, elements shall be dropped when the buffer is full.

class DropWhileBusyTest : TestBase() {

@Test
fun `elements are dropped during slow consumption`() = withVirtualTime {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't use back-quotes test names in this project. Please use testXxx naming convention in all the tests here. Also, the tests here would be easier to read and review (less scrolling to do) without all the empty lines inside the corresponding test functions.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants