How to window the fixed-size buffered items in Channel? #1750

Open
diousk opened this issue Jan 4, 2020 · 5 comments

Comments

@diousk

diousk commented Jan 4, 2020

When using a Channel with capacity > 0, such as Channel(3),
I want to keep the buffered items inside the channel as fresh as possible.
The use case looks like this:

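fun bufferChannel() {
    val channel = Channel<Int>(3)
    // `scope` is a CoroutineScope defined elsewhere.
    scope.launch {
        // Producer: sends 1..10 and suspends whenever the 3-slot buffer is full.
        launch {
            (1..10).forEach {
                println("send $it")
                channel.send(it)
            }
            channel.close()
            println("done send")
        }

        // Consumer: starts 2 seconds late, so the buffer fills up first.
        launch {
            delay(2000)
            println("recv start consume")
            channel.consumeEach {
                println("recv $it")
            }
            println("done recv")
        }
    }
}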

I'm looking for a way to let the receiver receive only the latest items, e.g.

recv 8
recv 9
recv 10

Is there a way to drop the oldest item in the channel when offering a new item to it?
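
Editorial note: the drop-oldest behaviour asked for here can be illustrated without coroutines at all. The following is a minimal sketch using only the Kotlin standard library; `DropOldestBuffer`, `offer`, and `drain` are hypothetical names invented for this illustration and are not part of kotlinx.coroutines. It is not thread-safe and only shows the eviction rule.

```kotlin
// Hypothetical helper (not a kotlinx.coroutines API): a bounded buffer
// that evicts its oldest element when a new one arrives and it is full.
class DropOldestBuffer<T>(private val capacity: Int) {
    init {
        require(capacity > 0) { "capacity must be positive" }
    }

    private val items = ArrayDeque<T>(capacity)

    /** Adds [item]; returns the evicted oldest element, or null if nothing was dropped. */
    fun offer(item: T): T? {
        val dropped = if (items.size == capacity) items.removeFirst() else null
        items.addLast(item)
        return dropped
    }

    /** Removes and returns everything currently buffered, oldest first. */
    fun drain(): List<T> {
        val snapshot = items.toList()
        items.clear()
        return snapshot
    }
}

fun main() {
    val buffer = DropOldestBuffer<Int>(3)
    // The producer runs ahead of the consumer, as in the example above.
    (1..10).forEach { buffer.offer(it) }
    // Only the three newest items remain: 8, 9, 10.
    buffer.drain().forEach { println("recv $it") }
}
```

With a capacity of 3 and items 1 to 10 offered before anything is read, the buffer ends up holding 8, 9, and 10, which matches the output requested above.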

@diousk diousk changed the title How to rotate the fixed-size buffered item in Channel? How to rotate the fixed-size buffered items in Channel? Jan 4, 2020
@diousk diousk changed the title How to rotate the fixed-size buffered items in Channel? How to windowing the fixed-size buffered items in Channel? Jan 4, 2020
@diousk diousk changed the title How to windowing the fixed-size buffered items in Channel? How to window the fixed-size buffered items in Channel? Jan 4, 2020
@zach-klippenstein
Contributor

There is no way to do this with a channel, and there probably won't be. Flow might get operators to support this behavior, but I don't believe any exist currently.

@elizarov
Contributor

There is no "out of the box" solution. I'm curious, though, as to where the need for having such behaviour might arise? How have you encountered this issue?

@diousk
Author

diousk commented Mar 26, 2020

@elizarov My situation is just like the backpressure strategy in RxJava.
With RxJava, I can control the buffer of a FlowableProcessor with code like:

FlowableProcessor<Int>.onBackpressureBuffer(
    MAX_BUFFER_NUMBER, {}, BackpressureOverflowStrategy.DROP_OLDEST
).subscribe(...)

I cannot find a similar solution using Channel.

Update: after searching around, I think the nearest implementation is
#1798 (comment)
with a chain like dropWhileBuzy().buffer(x).
However, I need to keep the freshest buffered items and drop the oldest ones.
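
Editorial note: releases of kotlinx.coroutines published after this thread (1.4.0 onward, to the best of my knowledge) accept an `onBufferOverflow` argument on the `Channel` factory, and `BufferOverflow.DROP_OLDEST` appears to be the closest counterpart to RxJava's `BackpressureOverflowStrategy.DROP_OLDEST`. The sketch below assumes such a version is on the classpath; `receiveAfterOverflow` is a hypothetical name used only for this illustration.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical helper: sends 1..10 into a 3-slot channel before anything
// is received, then drains whatever the channel still holds.
// Assumes kotlinx.coroutines 1.4.0 or newer.
fun receiveAfterOverflow(): List<Int> = runBlocking {
    val channel = Channel<Int>(
        capacity = 3,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    // With DROP_OLDEST, send does not suspend on a full buffer;
    // the oldest buffered element is discarded instead.
    (1..10).forEach { channel.send(it) }
    channel.close()

    val received = mutableListOf<Int>()
    // Elements buffered before close() can still be received.
    for (item in channel) received += item
    received
}

fun main() {
    receiveAfterOverflow().forEach { println("recv $it") }
}
```

Because nothing is received until all ten items have been sent, the receiver should see only the three newest items. With a consumer running concurrently, which items survive depends on timing.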

@elizarov
Contributor

Thanks. I know that you can do it in Rx, and we are also looking at whether we need to add this kind of feature to Kotlin Flow. For that, we need to know the details. My question is where you find the need to use it. What domain are you working in, and what kind of events are you working with?

@elizarov elizarov added the flow label Mar 26, 2020
@diousk
Author

diousk commented Mar 30, 2020

@elizarov It's designed for the chat room of a live stream. Sometimes a large number of messages arrive over the websocket, and we'd like viewers to see the newest messages as soon as possible. Because of backpressure, we need to bound the message buffer size and drop some messages, and we chose to drop the oldest ones.
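
Editorial note: for a Flow-based pipeline like the chat-room case described above, later kotlinx.coroutines releases (again 1.4.0 onward, as far as I know) let the `buffer` operator take a `BufferOverflow` strategy. The sketch below is only an illustration under that assumption: `latestMessages`, the message strings, and the 100 ms delay are all made up, and a real websocket source would replace the `asFlow()` stand-in.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: bounds the number of pending messages and discards
// the oldest pending one whenever the slow collector falls behind.
fun latestMessages(incoming: Flow<String>, maxBuffered: Int): Flow<String> =
    incoming.buffer(capacity = maxBuffered, onBufferOverflow = BufferOverflow.DROP_OLDEST)

fun main(): Unit = runBlocking {
    // Stand-in for a burst of messages arriving from a websocket.
    val incoming = (1..10).asFlow().map { "message $it" }

    latestMessages(incoming, maxBuffered = 3)
        .onEach { delay(100) } // simulate a slow UI that renders one message at a time
        .collect { println("show $it") }
}
```

Which older messages get through depends on timing, but the newest message is never the one dropped, so the viewer always ends on the latest item.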
