ConflatedChannel unexpected behavior #332

Closed
Khang-NT opened this issue Apr 16, 2018 · 24 comments

Comments

@Khang-NT

val conflatedChannel = ConflatedChannel<Int>()
conflatedChannel.offer(1)
conflatedChannel.offer(2)
conflatedChannel.close()

conflatedChannel.receive() // expected to receive 2, but it fails with "Channel was closed"

It's very confusing, because the documentation of close() says:

Conceptually, it sends a special "close token" over this channel.

Is the "close token" treated the same as an element of the channel? I think it shouldn't be, and so it shouldn't replace the latest element of a ConflatedChannel.

@qwwdfsad
Member

qwwdfsad commented Apr 17, 2018

It works as expected.

The close token is treated the same as any other element, and the definition of a conflated channel states that "the receiver always gets the most recently sent element", which is the close token in the case of a close() call. If you have to consume the (single and latest) sent element before closing, you probably need another type of channel.
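
To make the semantics described above concrete, here is a toy model in plain Kotlin. It is only a sketch and not the kotlinx.coroutines implementation: ToyConflatedSlot, Item, Element and CloseToken are hypothetical names, and the model assumes a single slot that the close token shares with ordinary elements.

```kotlin
// Toy model only: NOT the kotlinx.coroutines implementation.
// A conflated "slot" holds at most one pending item, and the close token
// is stored in the same slot as ordinary elements.
sealed class Item<out E>
data class Element<E>(val value: E) : Item<E>()
object CloseToken : Item<Nothing>()

class ToyConflatedSlot<E> {
    private var pending: Item<E>? = null
    private var closed = false

    // Overwrites whatever is pending; returns false once the slot is closed.
    fun offer(value: E): Boolean {
        if (closed) return false
        pending = Element(value)
        return true
    }

    // The close token replaces the pending element, like any newer item would.
    fun close() {
        closed = true
        pending = CloseToken
    }

    // Returns the pending item, or null if nothing is pending.
    // An element is removed when taken; the close token stays in place.
    fun poll(): Item<E>? {
        val item = pending
        if (item is Element<*>) pending = null
        return item
    }
}

fun main() {
    val slot = ToyConflatedSlot<Int>()
    slot.offer(1)
    slot.offer(2)                        // replaces 1
    slot.close()                         // replaces 2 with the close token
    println(slot.poll() === CloseToken)  // true: element 2 was overwritten
}
```

In this model the receiver can only observe 2 if it polls between the second offer and the close.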

Could you please elaborate on your use-case: why do you need a conflated channel with special "close" treatment? An example from a real application would be appreciated; maybe we can work on a better alternative or provide better documentation.

@Khang-NT
Author

fun getResponseAsync(request: ApiRequest): ReceiveChannel<ResponseType> {
    val responseChannel = ConflatedChannel<ResponseType>()
    launch {
        // Emit the cached response first, if there is one.
        val cachedResponse = getCachedResponse(request)
        if (cachedResponse != null) {
            responseChannel.send(cachedResponse)
        }
        // Refresh from the API when nothing is cached or the cache is stale.
        if (cachedResponse == null || isExpired(cachedResponse)) {
            try {
                val newResponse = callApiAndCacheResponse(request)
                responseChannel.send(newResponse)
            } catch (apiError: Throwable) {
                responseChannel.close(apiError)
            }
        }
        // Has no effect if the channel was already closed with an error above.
        responseChannel.close()
    }
    return responseChannel
}

I use this code to make an application always show latest data in UI, the UI flow is:

--> Loading
--> Show cached data if any
--> Call api to get new data if needed
        --> Success: show new data
        --> Error: show error message
--> Stop loading

When cachedResponse isn't expired, responseChannel.close() is called immediately, so I don't receive any element from responseChannel.

@fvasco
Contributor

fvasco commented Apr 17, 2018

Hi @qwwdfsad
You missed isExpired

async {
  getCachedResponse(request)
    ?.takeUnless{ isExpired(it) }
    ?: callApiAndCacheResponse(request)
}

or simply

suspend fun getResponse(request: ApiRequest) =
  getCachedResponse(request)
    ?.takeUnless{ isExpired(it) }
    ?: callApiAndCacheResponse(request)

val response = getResponse(request)

@qwwdfsad
Member

I've removed my answer because it's not what @Khang-NT actually wants

@Khang-NT
Author

Khang-NT commented Apr 18, 2018

Hi all,
I need both cachedResponse and newResponse, so:

  • The user will see cachedResponse while newResponse is loading. Calling the API takes time, and I don't want the user to see a blank space.
  • If the API call fails (maybe there is no internet connection), the user still sees cachedResponse, so the app offers the same experience when the user goes offline.
  • If the API call succeeds, show newResponse to the user.
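
A minimal sketch of one possible alternative for this flow, assuming the kotlinx.coroutines Channel factory function and that both values should reach the UI: a buffered channel with room for two elements keeps every element until it is received, so close() cannot overtake them. The names cachedResponse, freshResponse and responses are hypothetical stand-ins for getCachedResponse, callApiAndCacheResponse and getResponseAsync, and the expiry check is left out for brevity.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-ins for the cache lookup and the API call.
fun cachedResponse(): String? = "cached"
suspend fun freshResponse(): String = "fresh"

// A buffered channel (capacity 2) holds both values until they are received.
fun CoroutineScope.responses(): ReceiveChannel<String> {
    val channel = Channel<String>(capacity = 2)
    launch {
        try {
            cachedResponse()?.let { channel.send(it) }
            channel.send(freshResponse())
            channel.close()
        } catch (e: Exception) {
            channel.close(e) // the receiver sees the failure after buffered elements
        }
    }
    return channel
}

fun main(): Unit = runBlocking {
    for (response in responses()) println(response) // prints "cached", then "fresh"
}
```

The trade-off is that the consumer now sees every value rather than only the latest one, which is acceptable here because at most two values are ever sent.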

@jcornaz
Contributor

jcornaz commented Apr 18, 2018

I agree with @Khang-NT: there is either a problem or a missing channel implementation. Here is a shorter example:

fun main(args: Array<String>) = runBlocking {
  val source = produce(Unconfined, Channel.CONFLATED) { send(1) ; send(2) }
  println(source.receive())
}

This code will always throw ClosedReceiveChannelException, so it will never be possible to get the latest value.

I would expect to receive the latest value (2) and not an exception. As far as I know there is no other channel allowing me to do so. receive would return 1 for all other channels (rendez-vous, buffered and unlimited).

More generally, the problem is that it is rarely possible to get the latest value of a conflated channel created with produce. That makes no sense IMO, because ConflatedChannel is meant for cases where we only care about the latest value.

@fvasco
Contributor

fvasco commented Apr 18, 2018

suspend fun receive(): E

Retrieves and removes the element from this channel suspending the caller while this channel isEmpty or throws ClosedReceiveChannelException if the channel isClosedForReceive.

https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-abstract-channel/receive.html

@jcornaz
Contributor

jcornaz commented Apr 18, 2018

@fvasco yes it behaves correctly according to the documentation.

Sorry, I wasn't clear. I'm not saying it is a bug. I am saying it is a design problem.

Why should the close token override the latest value of a conflated channel?
This makes it virtually impossible* to get the latest value of a conflated channel: send never actually suspends, so produce closes the channel right after the last send. I think this contradicts the fact that a conflated channel is meant to be used when we only care about the latest value.

*Note: it is actually possible to get the latest value, but receive has to be called after the last send and before produce closes the channel.

Actually even close documentation is misleading:

However, isClosedForReceive on the side of ReceiveChannel starts returning true only after all previously sent elements are received.

And in this case the previously sent elements are not received.

However, design problem or not, how can we solve the following?

@Test 
fun userShouldBeAbletoReceiveTheLastSentElement() = runBlocking {
  val source = produce(Unconfined, <use whatever channel suits the need>) { send(1) ; send(2) }
  assertThat(source.receive(), equalTo(2))
}

@fvasco
Contributor

fvasco commented Apr 18, 2018

Hi @jcornaz @Khang-NT
I agree with you: source cannot be an empty channel.

@jcornaz
Contributor

jcornaz commented Jun 26, 2018

@qwwdfsad, any update on this?

I find it a bit cumbersome not to be able to use a conflated channel when I only care about the most recent value sent by a producer.

@qwwdfsad
Member

qwwdfsad commented Jun 26, 2018

@jcornaz nope.
Conflated channels are the equivalent of Rx BehaviorSubject, which behaves exactly the same.

Could you please clarify which Rx primitive you are using for behaviour like conflated channel + don't throttle the last element?

In general, it's not clear how to implement the desired behaviour and still maintain channel contract.
Consider we implement that, then:

  1. What semantic does close carry? Channel is now always not closed for receive and new consumers can consume the last element no matter what happened.
  2. If the channel was closed with an exception, how to retrieve it? Who will log it?
  3. How to notify consumers about the closed channel? What if I want to stop consumers when the channel is closed? And usually I want to stop receivers.

Maybe it's not a good idea to call close in the first place, if you want to consume elements after closing the channel

@jcornaz
Contributor

jcornaz commented Jun 26, 2018

To me BehaviorSubject has the exact same behavior as ConflatedBroadcastChannel, not ConflatedChannel. Am I not right?

And even that is a bit exaggerated. In RxJava once subscribed, the subscriber will receive all elements. This is not the case with the open ConflatedChannel returned by ConflatedBroadcastChannel.openSubscription() which may drop some values if the broadcast is faster than the consumer.

Could you please clarify which Rx primitive you are using for behaviour like conflated channel + don't throttle the last element?

I don't. I haven't said that I have a better way of doing it with RxJava. But I am not aware of an equivalent concept of ConflatedChannel in RxJava. (if there is one, please let me know, I'll have a look).

  1. What semantic does close carry? Channel is now always not closed for receive and new consumers can consume the last element no matter what happened.

It should do what is currently stated in the documentation:

Immediately after invocation of this function isClosedForSend starts returning true. However, isClosedForReceive on the side of ReceiveChannel starts returning true only after all previously sent elements are received.

which is currently not the case for ConflatedChannel: not all previously sent elements are received.

  2. If the channel was closed with an exception, how to retrieve it? Who will log it?
  3. How to notify consumers about the closed channel? What if I want to stop consumers when the channel is closed? And usually I want to stop receivers.

Same as it does currently and same as it behaves for any other channel. Sorry, I don't understand why it is a problem. Could you please elaborate?

Maybe it's not a good idea to call close in the first place, if you want to consume elements after closing the channel

Good point. However, what if I know that from a given point in time the value will never change anymore? It is a bit sad that we cannot close the channel to tell the receivers they can finish and release their resources. (We cannot, because closing may lose the last item, which can lead to incorrect results.)

@qwwdfsad
Member

To me BehaviorSubject has the exact same behavior as ConflatedBroadcastChannel, not ConflatedChannel. Am I not right?

You are right. From my perspective ConflatedBroadcastChannel with one subscriber and ConflatedChannel with one receiver are indistinguishable, so the general idea is the same.

I don't. I haven't said that I have a better way of doing it with RxJava. But I am not aware of an equivalent concept of ConflatedChannel in RxJava.

I'm not aware of such a concept either. But how is this use-case resolved using Rx primitives when it naturally arises? Maybe with some combination of operators or with a special subject?
It's important to understand, so we can decide whether a conflated channel is right primitive for such use-case or we can provide something better.

Same as it does currently and same as it behaves for any other channel. Sorry, I don't understand why it is a problem. Could you please elaborate?

Yes, I think my statement is misleading. These issues arise in the broadcast channel, where semantic of "element consumption" is slightly different, so let's just ignore it :)

In general, we have a conflict of conflate and regular semantics, and it's unclear which one of the compromises is "the right one".
In our internal use-cases and prior experience, the current behaviour is the desired one, but we should somehow provide a solution for use-cases where the last element should actually be consumed, e.g. via some throttle operator. It will be done in cold streams, but it's hard to implement on top of channels.
Before making any decisions about the conflated channel, I'd like to see more use-cases where this functionality is essential and its absence significantly complicates the implementation.

@qwwdfsad
Member

@Khang-NT I'm a bit confused: why are you returning a channel from a method that asynchronously loads data once?

You can change the signature of the method as getResponseAsync(request: ApiRequest): Deferred<ResponseType> and on the callsite use it as

// UI
val data = loadData(request)
if (!data.isCompleted) {
  displayStaleData()
}

display(data.await())

Does it make your application logic more understandable or simpler? Or you'd like to stick with channel solution?

@jcornaz
Contributor

jcornaz commented Jun 29, 2018

I'd like to see more use-cases where this functionality is essential [...]

Personally I would like to use ConflatedChannel when I want the consumer to always get the latest value each time it calls receive(). In other words when I don't care about missing intermediate values and only care about the latest value.

So to me, all use-cases involving ConflatedChannel are "by definition" interested in the latest value, even if the channel is closed. I cannot think of a use-case where I would not want to receive the last value.

In our internal use-cases and prior experience, current behaviour is desired one, [...]

May I ask for a use-case where ConflatedChannel is a good choice and where you don't care about the last value?

@elizarov
Contributor

elizarov commented Jul 8, 2018

@jcornaz What is the use-case for closing the channel?

The way it was originally designed is that the channel is open while the system (or subsystem, or request, or some other activity) is working. The system keeps some number of open channels, and ConflatedChannel is used if only the most recent value should be acted upon (this is the case when the values sent to the channel represent updates to some state and are sent as a complete description of the state, not as a delta). The idea behind "closing" the channel in this case is that the system (or whatever it is) is being shut down, so there's not much point in processing "the last value": the last value is now a shutdown signal that should be acted upon.

@jcornaz
Contributor

jcornaz commented Jul 10, 2018

Thanks for your very clear explanation @elizarov.

Let's assume I have the following consumer:

/**
 * This consumes the channel and keeps something up-to-date according to the states it receives.
 * This function should not have to care which channel implementation is used.
 */
suspend fun ReceiveChannel<State>.keepSomethingUpToDate() {
	consumeEach {
		updateSomething(it) // here I update something; I don't care if I miss a state, because these are not deltas.
	}
	
	// here I know the state will not change any further.
	// so I can safely release any resources and references I'm using.
	
	// but is the thing I'm updating up-to-date?
	// If the channel is a rendez-vous, a buffered or a linked-list channel, everything is fine.
	// But, only in the case of a conflated channel, I may have missed an update to perform.
	
	// So it means I have to care about the channel implementation?
	// And how do I make sure that the thing I am updating is actually up-to-date before releasing my resources?
}

And now let's create a producer:

fun produceStates() = produce(capacity = Channel.CONFLATED) {
	
	// We send a few states. They are not deltas, so it is fine to use a conflated channel, because any intermediate state can be safely lost.
	send(state1)
	send(state2)
	
	// At some point I may know that the state will not change any further. How do I signal this to the consumer without risking that it misses the latest state?
	
	delay(1, TimeUnit.SECONDS) // currently our only solution is to introduce an artificial delay (which is not good at all)
}

So the use-case of closing a channel, is simply a way to signal the consumer that the state won't change any further. But that never means I want the consumer to miss the latest state.

And all other channels finish sending their elements before actually sending the close token. So why should it be different for ConflatedChannel?

Actually ConflatedChannel doesn't even fulfill what is stated in the SendChannel.close documentation:

Immediately after invocation of this function isClosedForSend starts returning true. However, isClosedForReceive on the side of ReceiveChannel starts returning true only after all previously sent elements are received.

@jcornaz
Contributor

jcornaz commented Jul 10, 2018

To be (a little bit) more specific for the use-case of closing a conflated channel:

Any time I have a long computation which wants to share intermediate results (because they become more and more precise after each step of the calculation, for example), a conflated channel should be a good choice, because it doesn't matter if intermediate results are lost. At any point in time the latest available result is the best one, and the computation should not have to wait for the consumer.

But sooner or later the computation will terminate, and the latest sent element will be the actual final result. The consumer may miss it with a ConflatedChannel.

Of course I may have a CompletableDeferred alongside the channel in order to handle the final result. The consumer would then have to check the deferred for the final result after receiving the close token from the channel. But what's the point of adding this additional complexity when ConflatedChannel could deliver the latest sent element and fulfill what is stated in the documentation of SendChannel.close?
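
For illustration, the CompletableDeferred workaround mentioned above could be sketched roughly as follows. This assumes the kotlinx.coroutines Channel factory and CompletableDeferred; runComputation and the three-step "refinement" loop are hypothetical placeholders for a real computation.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Runs a hypothetical three-step computation and returns the value the consumer
// ends up with. Progress goes through a conflated channel (intermediate values
// may be dropped); the final result travels separately in a CompletableDeferred.
fun runComputation(): Double = runBlocking {
    val progress = Channel<Double>(Channel.CONFLATED)
    val finalResult = CompletableDeferred<Double>()

    launch {
        var estimate = 0.0
        for (step in 1..3) {
            estimate += 1.0          // placeholder for a refinement step
            progress.send(estimate)  // does not wait for the consumer
        }
        finalResult.complete(estimate) // publish the result before closing
        progress.close()
    }

    var latest = Double.NaN
    for (value in progress) latest = value // may or may not see the last estimate
    latest = finalResult.await()           // authoritative final value
    latest
}

fun main() {
    println(runComputation()) // 3.0
}
```

The consumer has to know about two objects and the order in which to read them, which is the extra complexity the comment above objects to.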

@elizarov
Contributor

@jcornaz Now I get it. Thanks for detailed explanation. This "intermediate values" use-case is something I had not had in mind when designing ConflatedChannel.

@elizarov
Contributor

Now the question is how do we distinguish those two use-cases in the names of the classes or configuration options that we design for them. To recap, we have two conflicting use-cases with respect to how they should act on closing:

  • "Data-flow computation use-case" -- process state updates from a data source as long as data source is active and shutdown when data source is closed.
  • "Intermediate results use-case" -- process intermediate results of computation as long as computation is in process and act upon the final result when computation is complete.

P.S. Spelling this out got me thinking that the close concept that we currently have on channels is "closing a channel with an exception". What we are really looking for in the second use-case is to "close a channel with a value". Btw, I think it might be related to #169

@jcornaz
Contributor

jcornaz commented Jul 18, 2018

Yes, "close a channel with a value" would fulfill the use-case.

However, I still don't really understand where the conflict between the two use-cases is.

What if the rule were the following:

Immediately after closing a channel, isClosedForSend starts returning true. However, isClosedForReceive on the side of ReceiveChannel starts returning true only after the last sent element is received.

You will notice that:

  1. Except for a tiny rewording, it is the actual documentation of SendChannel.close
  2. All other channels already fulfill this rule.

So if this rule is good for other channels in the data-flow computation use-case, why is it not good for ConflatedChannel?

@elizarov
Contributor

elizarov commented Jul 18, 2018

@jcornaz If we change the implementation of ConflatedChannel, it will no longer do what is expected for the "data-flow computation use-case", because in a data-flow computation you want dependent computations to shut down asap when the source has closed. These two use-cases conflict in their requirements on close behavior, so they need either two different implementations of conflated channel or two different ways to close a channel.
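
The two conflicting close requirements can be contrasted in a small plain-Kotlin toy model. This is only a sketch: ClosePolicy, ToyConflated and lastSeen are hypothetical names invented for illustration, not an existing or planned kotlinx.coroutines option.

```kotlin
// Toy model only: a hypothetical policy switch, not a kotlinx.coroutines API.
enum class ClosePolicy {
    DROP_PENDING,    // "data-flow" use-case: shut down as soon as possible
    DELIVER_PENDING  // "intermediate results" use-case: hand over the final value first
}

class ToyConflated<E : Any>(private val policy: ClosePolicy) {
    private var pending: E? = null
    private var closed = false

    fun offer(value: E) {
        check(!closed) { "closed for send" }
        pending = value // conflation: a newer value replaces the older one
    }

    fun close() {
        closed = true
        if (policy == ClosePolicy.DROP_PENDING) pending = null
    }

    // Returns and clears the pending element, or null when nothing is pending.
    fun poll(): E? = pending.also { pending = null }

    // True once the slot is closed and nothing is left to receive.
    val isClosedForReceive: Boolean
        get() = closed && pending == null
}

// Sends 1 and 2, closes, and reports what a late receiver still gets.
fun lastSeen(policy: ClosePolicy): Int? {
    val slot = ToyConflated<Int>(policy)
    slot.offer(1)
    slot.offer(2)
    slot.close()
    return slot.poll()
}

fun main() {
    println(lastSeen(ClosePolicy.DROP_PENDING))    // null
    println(lastSeen(ClosePolicy.DELIVER_PENDING)) // 2
}
```

With DELIVER_PENDING the model follows the rule that isClosedForReceive becomes true only after the last sent element is received; with DROP_PENDING it becomes true immediately on close.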

@jcornaz
Contributor

jcornaz commented Jul 18, 2018

Ok. Thanks.

I would be fine with either solution you propose.

It just seems strange to me that ConflatedChannel is the only channel which behaves differently, even though it implements the same interface and shares the same documentation for SendChannel.close.

Don't you think the current documentation of close is misleading? Personally when I read it, I understand that the close token will be received by the consumer only after the elements have been received.

@elizarov
Contributor

@jcornaz We'll need to adjust close documentation correspondingly, too.

5 participants