[Broadcast] implement the algorithm with a buffering strategy (v1.1) #242
This PR is an attempt to implement a broadcast operator following the temporary conclusion of this discussion: https://forums.swift.org/t/swift-async-algorithms-proposal-broadcast-previously-shared/61210/36

This kind of operator can become tricky to implement once you consider back-pressure management and cancellation policies. To ease understanding, I've used two levels of state machines (what I'd call orthogonal regions): a main one for the upstream iteration, plus one per client.
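To make the two levels concrete, here is a minimal sketch of what such orthogonal regions could look like. All type and function names are mine for illustration, not the PR's actual code, and the continuation handling is elided:

```swift
// Hypothetical sketch: one region for the shared upstream iteration,
// plus one independent region per client iterator.
enum UpstreamState {
    case idle        // no client has called `next()` yet
    case iterating   // the single upstream task is running
    case finished
}

enum ClientState<Element: Equatable>: Equatable {
    case buffering([Element])   // elements queued for this client
    case awaitingElement        // the client is suspended in `next()`
    case terminated
}

// Each client region advances independently when the upstream region emits:
func onElement<Element: Equatable>(
    _ element: Element,
    in state: ClientState<Element>
) -> ClientState<Element> {
    switch state {
    case .buffering(let buffer): return .buffering(buffer + [element])
    case .awaitingElement:       return .buffering([])  // a real impl resumes the continuation here
    case .terminated:            return .terminated
    }
}
```

The point of keeping the regions separate is that a transition in one client's machine never has to inspect another client's state.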
For the upstream iteration I've used the pattern of a single task: it is started on the first call to `next()` by the first iteration and suspended between each element.

Pending questions (@FranzBusch, @phausler, @etcwilde):
For now each client state machine is created on the first call to `next()`. This makes it a bit hard to test, because we need some synchronisation between iterations to ensure we don't lose values; my tests are currently flaky because of that. Should we do it in the call to `makeAsyncIterator()` instead? (I remember a decision that nothing important should be done in `makeAsyncIterator`... `broadcast` might be an exception to that rule.)

For now I use an unbounded buffer per iteration, which can lead to an uncontrolled memory footprint. Should we have an "on overflow" policy to drop elements? We mentioned in the forum an optimisation of the storage based on a single buffer that keeps track of indexes per client; it would mean extra computation to handle the sanitisation of those indexes, and thus poorer performance.
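To illustrate what an "on overflow" policy could look like for the per-client buffer, here is a small sketch; the policy cases and type names are hypothetical, not part of this PR:

```swift
// Hypothetical sketch of bounding the per-client buffer
// (the PR currently uses an unbounded buffer).
enum OverflowPolicy {
    case unbounded
    case dropOldest(limit: Int)   // keep the most recent `limit` elements
    case dropNewest(limit: Int)   // ignore new elements once full
}

struct ClientBuffer<Element> {
    var elements: [Element] = []
    let policy: OverflowPolicy

    mutating func append(_ element: Element) {
        switch policy {
        case .unbounded:
            elements.append(element)
        case .dropOldest(let limit):
            elements.append(element)
            if elements.count > limit {
                elements.removeFirst(elements.count - limit)
            }
        case .dropNewest(let limit):
            if elements.count < limit {
                elements.append(element)
            }
        }
    }
}
```

A policy like this keeps the memory footprint bounded at the cost of losing elements for slow clients, which is exactly the trade-off the question raises.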
For now there is no "on cancellation" policy. The task for the upstream sequence remains alive when there are no more clients, waiting for a new one. Should we allow cancelling the upstream task in that case? It would be easy to do, as I keep track of the task in the state machine.
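Since the task is already tracked in the state machine, such a policy could be a small amount of bookkeeping on the client count. A sketch under that assumption (again, the names are illustrative, not the PR's):

```swift
import Foundation

// Hypothetical sketch of an "on cancellation" policy: cancel the stored
// upstream task when the last client goes away, if the policy says so.
final class UpstreamLifecycle: @unchecked Sendable {
    private let lock = NSLock()
    private var clientCount = 0
    var task: Task<Void, Never>?
    let cancelWhenNoClients: Bool

    init(cancelWhenNoClients: Bool) {
        self.cancelWhenNoClients = cancelWhenNoClients
    }

    func clientJoined() {
        lock.lock(); defer { lock.unlock() }
        clientCount += 1
    }

    /// Returns true when the upstream task was cancelled as a result.
    func clientLeft() -> Bool {
        lock.lock(); defer { lock.unlock() }
        clientCount -= 1
        guard clientCount == 0, cancelWhenNoClients, let task = task else { return false }
        task.cancel()
        self.task = nil
        return true
    }
}
```

With `cancelWhenNoClients == false` this reproduces the PR's current behaviour (the task keeps waiting for a new client).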
For now there is a single locking mechanism for all the state machines (the main one and the ones per client). Should we consider a locking mechanism per client to improve performance (by parallelising computations)?
I've not yet implemented a "replay" mechanism, but it will be pretty easy to add thanks to the separation of concerns between state machines: all we have to do is keep track of the history and initialise each new client state machine with that history in its internal buffer.
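That "keep the history, seed new clients with it" idea can be sketched in a few lines; the type and its members are hypothetical names, not part of this PR:

```swift
// Hypothetical sketch of a replay buffer: the shared state records the
// last `replayCount` elements, and each new client's state machine is
// initialised with that history as its starting buffer.
struct BroadcastHistory<Element> {
    private(set) var history: [Element] = []
    let replayCount: Int   // how many past elements a new client receives

    init(replayCount: Int) {
        self.replayCount = replayCount
    }

    mutating func record(_ element: Element) {
        history.append(element)
        if history.count > replayCount {
            history.removeFirst(history.count - replayCount)
        }
    }

    // Called when a client subscribes: its buffer starts with the history.
    func seedNewClientBuffer() -> [Element] {
        history
    }
}
```

Because each client owns its buffer, replay needs no extra coordination beyond copying the history at subscription time.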