Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Replay] implement with a bounded buffer strategy (v1.1) #245

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

twittemb
Copy link
Contributor

@twittemb twittemb commented Jan 5, 2023

The idea behind this operator is to keep track of the latest elements produced by an async sequence and replay them every time a new iterator is created. To avoid uncontrolled memory footprint, the history is bounded.

For instance with a replay count of 2:

a first iterator iterates 3 times over a base:
- base produces 'a': 'a'  is stacked in the history ['a'] and the iteration receives 'a'
- base produces 'b': 'b'  is stacked in the history ['a', 'b'] and the iteration receives 'b'
- base produces 'c': 'a' is removed from the history and 'c'  is stacked in the history ['b', 'c'] and the iteration receives 'c'

a second iterator iterates 3 times over the base:
- the history is replayed with ['b', 'c'], the iteration receives 'b' and 'c'
- the base produces 'd': 'b' is removed from the history and 'd'  is stacked in the history ['c', 'd'] and the iteration receives 'd'

This kind of operator could be useful applied after a broadcast (#242) operator. It would allow to replay the elements that were produced while all iterators were not yet iterating.

@twittemb twittemb changed the title [Replay] implement with a bounded buffer strategy [Replay] implement with a bounded buffer strategy (v1.1) Jan 5, 2023
public func makeAsyncIterator() -> AsyncIterator {
return Iterator(
asyncReplaySequence: self,
base: self.base.makeAsyncIterator()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can do it like this. The problem here is that if the upstream AsyncSequence is unicast then this will can potentially crash or not work.

In general, I think we need to approach the implementation a bit differently since the replay algorithm is basically a multicast algorithm. So it shares some of the same implementation characteristics as broadcast does. Specifically, I think we need to only ever create one upstream iterator. That should be done on the first call to next to any of the replay iterators. Furthermore, we need to make sure that we only issue a single next call to the upstream even if we have multiple downstreams.

Before going ahead and implementing this further, I would recommend to wait how the broadcast algorithm evolves and once we have that we should revisit replay. Let me know what you think!

@FranzBusch FranzBusch added the v1.1 Post-1.0 work label Sep 21, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
v1.1 Post-1.0 work
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants