for await-latest ? #328

Open
ursusursus opened this issue Aug 21, 2024 · 16 comments

Comments

@ursusursus

ursusursus commented Aug 21, 2024

Hi, we need to be able to iterate over an async sequence such that, if an iteration does some async work and upstream emits in the middle of it, the current iteration is cancelled and the next one starts immediately (i.e. it doesn't wait).

for await item in items.latest {
   try await Task.sleep(for: .seconds(10)) // stands in for long-running work
}

This would mirror Kotlin's collectLatest (there are also flatMapLatest and transformLatest, which are very powerful and necessary).
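For contrast, here is a minimal sketch of what a plain `for await` loop does today: each element is processed to completion before the next one is pulled, so nothing is skipped or cancelled. The function name `processSequentially` and the 50 ms delay are illustrative assumptions, not anything from the library.

```swift
// Baseline behaviour: a plain `for await` loop handles elements one at a time.
func processSequentially() async -> [Int] {
    // Three values are buffered up front, then the stream finishes.
    let items = AsyncStream<Int> { continuation in
        for value in 1...3 { continuation.yield(value) }
        continuation.finish()
    }

    var processed: [Int] = []
    for await item in items {
        // Simulated slow work; newer elements wait in the buffer meanwhile.
        try? await Task.sleep(nanoseconds: 50_000_000)
        processed.append(item)
    }
    return processed // every element is processed, in order: [1, 2, 3]
}
```

The request in this issue is for a variant where the arrival of `2` would cancel the work still running for `1`.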

@twittemb
Contributor

Hi. I did that some time ago in this repo: https://github.com/sideeffect-io/AsyncExtensions

Perhaps we could reproduce that in this Apple repo.

@ursusursus
Author

ursusursus commented Aug 21, 2024

Well, that requires returning a sequence. Can it be modified to act like collect(Latest)?

@EngOmarElsayed

Can I work on this issue, @twittemb?

@twittemb
Contributor

Sure

@EngOmarElsayed

Sure

Can you explain a little bit what I need to do? I just wanted to make sure that nobody is working on it, because every issue I tried to work on ended with me finding a PR for it 😂

@EngOmarElsayed

@twittemb Can you give some explanation of the issue? The comment above is not clear.

@twittemb
Contributor

I think the idea was to mimic operators from reactive programming that switch to the latest upstream of events.

When you have a stream of streams, every time the parent stream emits a new stream, the client switches to this latest stream and receives its events, with the previous subscription being cancelled.

@twittemb
Contributor

With Combine it's a composition of map + switchToLatest.
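As a rough illustration of that composition, the sketch below maps an outer stream of selections to inner subjects and flattens it with `switchToLatest()`. It assumes an Apple platform where Combine is available; the names `Source`, `selection`, `first`, and `second` are made up for the example.

```swift
import Combine

// Hypothetical selector for which inner stream to follow.
enum Source { case first, second }

func switchToLatestDemo() -> [Int] {
    let first = PassthroughSubject<Int, Never>()
    let second = PassthroughSubject<Int, Never>()
    let selection = PassthroughSubject<Source, Never>()

    var received: [Int] = []
    let cancellable = selection
        // map: turn each outer value into an inner publisher.
        .map { source -> PassthroughSubject<Int, Never> in
            switch source {
            case .first: return first
            case .second: return second
            }
        }
        // switchToLatest: follow only the most recent inner publisher.
        .switchToLatest()
        .sink { received.append($0) }

    selection.send(.first)
    first.send(1)           // delivered: `first` is the current inner stream
    selection.send(.second) // the subscription to `first` is cancelled
    first.send(2)           // dropped: `first` is no longer observed
    second.send(3)          // delivered

    cancellable.cancel()
    return received         // [1, 3]
}
```

Note that the closure passed to `map` returns a publisher, which is the shape a flatMapLatest-style operator expects.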

@EngOmarElsayed

@twittemb So basically the idea is to get to the latest stream without having to wait for the rest. I hope I understood it correctly.

@twittemb
Contributor

As I said in the comments, I have an implementation here (https://github.com/sideeffect-io/AsyncExtensions/blob/main/Sources/Operators/AsyncSequence+FlatMapLatest.swift). You can have a look if you want.

@EngOmarElsayed

Thanks man @twittemb, I will finish this issue soon.

@ursusursus
Author

I don't think that's the same.

I don't want to subscribe to another stream, just cancel processing of the current emission and move on to processing the latest one.

Or do you think that's the same?

Here are the docs for Kotlin's variant:
https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect-latest.html

As you can see, the closure it takes, action: suspend (T) -> Unit, is a) suspend (read: async) and b) returns Unit (read: Void), so it does not return a Sequence, unlike the flatMapLatest proposal you've been given.

@EngOmarElsayed

So basically the goal is to process the last stream and ignore the rest, right?

@ursusursus
Author

ursusursus commented Oct 19, 2024

Not sure why you say "stream". Isn't the stream always the same?

I'd say the goal is to process each "emit", but on backpressure (when upstream emits faster than downstream can process), instead of queueing (which is the default), you cancel the current one and move on to the latest.
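One way the requested behaviour could be sketched with plain Swift concurrency is shown below. `collectLatest` here is a hypothetical helper written for this illustration, not an existing swift-async-algorithms or AsyncExtensions API, and `Recorder` and `collectLatestDemo` exist only for the demo. The action runs in an unstructured child task, and the arrival of a newer element cancels that task and waits for it to wind down before starting the next one.

```swift
// Hypothetical helper: NOT an existing library API.
extension AsyncSequence where Element: Sendable {
    func collectLatest(
        _ action: @escaping @Sendable (Element) async -> Void
    ) async throws {
        var current: Task<Void, Never>?
        do {
            for try await element in self {
                // A newer element arrived: cancel the in-flight action
                // and wait for it to finish so actions never overlap.
                if let previous = current {
                    previous.cancel()
                    await previous.value
                }
                current = Task { await action(element) }
            }
        } catch {
            // Upstream failed: stop the in-flight action and rethrow.
            current?.cancel()
            throw error
        }
        // Upstream finished: let the action for the last element complete.
        if let last = current {
            await last.value
        }
    }
}

// Demo-only storage that is safe to touch from concurrent tasks.
actor Recorder {
    private(set) var values: [Int] = []
    func append(_ value: Int) { values.append(value) }
}

func collectLatestDemo() async throws -> [Int] {
    // Three values arrive back to back, much faster than the 200 ms action.
    let items = AsyncStream<Int> { continuation in
        for value in 1...3 { continuation.yield(value) }
        continuation.finish()
    }
    let recorder = Recorder()
    try await items.collectLatest { item in
        do {
            try await Task.sleep(nanoseconds: 200_000_000)
            await recorder.append(item)
        } catch {
            // Cancelled because a newer element arrived; record nothing.
        }
    }
    return await recorder.values // only the last element survives: [3]
}
```

Cancellation is cooperative, so an action that never checks for it will still run to completion and delay the next element. The sketch also does not forward cancellation of the calling task to the in-flight action; a fuller version would likely need `withTaskCancellationHandler` or a task group for that.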

3 participants