[WIP] Broadcast algorithm #214
base: main
Conversation
Hi @phausler. Happy we are beginning to introduce this kind of operator for a future version :-). I have a few questions regarding this draft though; see below. Thanks.
Thanks. So the intent is that prior values are NOT buffered for later consumption when new downstreams are attached (else the best-case storage would be O(N^2)... which imho is unexpected; and if you need that behavior then you can slap a buffer on it). This is by no means the only algorithm of this category: I'm sure there are either composable variants or other distinct algorithms in the same "multicast" family. This is just the one I see most commonly requested/used. I think (with some extra complexity) I can modify this to respond to per-element demand to pump the iterator along; that is likely tied to the consolidation of the buffers I was mentioning in the PR description. This is still a WIP, so my next steps are to make that incremental demand and shared buffers (and measure the impact/complexity of storage).
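For illustration, a minimal usage sketch of those semantics. broadcast() is the spelling proposed by this WIP PR and may change; the .async adapter is the package's Sequence-to-AsyncSequence helper, and a non-throwing base is assumed for brevity:

import AsyncAlgorithms

// Sketch only: broadcast() is this PR's proposed operator, not shipped API.
func demo() async {
  let base = (1...3).async.broadcast()

  await withTaskGroup(of: Void.self) { group in
    for id in 0..<2 {
      group.addTask {
        // Both consumers share one upstream iteration; values emitted
        // before a consumer attaches are not replayed to it.
        for await element in base {
          print("consumer \(id) received \(element)")
        }
      }
    }
  }
}

The no-replay choice is what keeps storage proportional to the un-consumed elements rather than to the full history of the sequence.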
Nice start! I agree that we should get rid of the …
static func task(_ state: ManagedCriticalState<State>, base: Base) -> Task<Void, Never> {
  Task {
    do {
      // A single task drives iteration, so only one upstream iterator is created.
      for try await element in base {
        State.emit(state, result: .success(element))
      }
      // A nil element signals non-throwing termination to all sides.
      State.emit(state, result: .success(nil))
    } catch {
      // A thrown failure is shared with all sides.
      State.emit(state, result: .failure(error))
    }
  }
}
One thing I just noticed after the discussion in the forums around deferred: this is actually creating multiple upstream iterators, if I understand this code correctly. IMO we really need to avoid this, otherwise this algorithm is not capable of transforming a unicast AsyncSequence into a broadcasted one. @phausler please correct me if I am misunderstanding the code here.
No, this is creating one singular task to iterate, so only one upstream iterator is made.
I'm clearly being dense, because it also looks to me like this would create a new Task, at least each time broadcast is called. And for each Task there would be a separate iterator of the base sequence.

Ah, so maybe the intent is that broadcast is only called once, and then the broadcast sequence is copied to duplicate it? That's a bit awkward.
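To make the answer above concrete, here is a minimal sketch (illustrative names throughout, not this PR's implementation) of the single-task fan-out shape: every attached side shares the one task, and therefore the one upstream iterator, no matter how many copies of the sequence exist:

import Foundation

// Sketch only; Fanout and attach are hypothetical names.
final class Fanout<Element: Sendable>: @unchecked Sendable {
  private let lock = NSLock()
  private var sides: [UUID: AsyncStream<Element>.Continuation] = [:]

  init<Base: AsyncSequence & Sendable>(_ base: Base) where Base.Element == Element {
    // Exactly one task per Fanout, so exactly one upstream iterator.
    Task {
      do {
        for try await element in base {
          let current = self.lock.withLock { Array(self.sides.values) }
          for side in current { side.yield(element) }
        }
      } catch {
        // A real implementation would propagate the failure to every side.
      }
      let current = self.lock.withLock { Array(self.sides.values) }
      for side in current { side.finish() }
    }
  }

  // Attaching a side never creates a new base iterator; a side only sees
  // elements produced after it attaches.
  func attach() -> AsyncStream<Element> {
    AsyncStream { continuation in
      let id = UUID()
      lock.withLock { sides[id] = continuation }
      continuation.onTermination = { _ in
        _ = self.lock.withLock { self.sides.removeValue(forKey: id) }
      }
    }
  }
}

Copying the broadcast sequence then just means attaching another side; no new task or base iterator is created.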
@tcldr this might be of interest, a slightly different approach.
@phausler Yes, this served as the inspiration for the work on …

The thing that began to interest me is how this would be used with something like …

The other thing I realised is that while a back pressure supporting algorithm can be made to work in a ‘depressurised’ way (by chaining a buffer), the inverse isn’t possible. So I thought: maybe if I create a back pressure supporting multicast algorithm, it could be used as a primitive to support both kinds of use cases, those where back pressure is a requirement as well as those where it isn’t (by chaining buffers).
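To illustrate that asymmetry concretely, here is a sketch using two released AsyncAlgorithms APIs: AsyncChannel, whose send(_:) suspends until the element is consumed (i.e. it carries back pressure), and buffer(policy:). The back-pressure-supporting multicast primitive itself remains hypothetical:

import AsyncAlgorithms

func demo() async {
  let channel = AsyncChannel<Int>()

  // Back-pressured producer: each send suspends until a consumer takes it.
  Task {
    for value in 0..<10 {
      await channel.send(value)
    }
    channel.finish()
  }

  // Chaining an unbounded buffer "depressurises" the sequence: the buffer
  // drains the channel eagerly, so the slow consumer below no longer
  // throttles the producer. The reverse direction (recovering back pressure
  // from a sequence that has none) cannot be achieved by chaining a stage.
  for await value in channel.buffer(policy: .unbounded) {
    try? await Task.sleep(nanoseconds: 100_000_000) // simulate a slow consumer
    print(value)
  }
}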
if iteration.isKnownUniquelyReferenced() {
  Iteration.cancel(iteration)
}
I might be missing something here, but if the last two iterators enter deinit at the same time (and both perform the isKnownUniquelyReferenced check before releasing iteration), could that trigger a race condition such that neither ends up calling Iteration.cancel?
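If that race is real, one shape that closes the window (a self-contained sketch with illustrative names, not a patch against this PR) is to decide cancellation with a side count mutated under a lock, so that exactly one detaching side observes zero:

import Foundation

// Sketch only; SharedIteration, sideCount, and detachSide are hypothetical
// names, not identifiers from this PR.
final class SharedIteration {
  private let lock = NSLock()
  private var sideCount: Int

  init(sides: Int) { self.sideCount = sides }

  func cancelUpstream() {
    // Cancel the single task iterating the base sequence here.
  }

  // Called from each iterator's deinit. The decrement and the zero check
  // happen atomically, so exactly one caller sees isLast == true.
  func detachSide() {
    let isLast: Bool = lock.withLock {
      sideCount -= 1
      return sideCount == 0
    }
    if isLast { cancelUpstream() }
  }
}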
How does this PR relate to #242?
They are both proposals for how to solve this. After 1.0 of this library has shipped, we need to take another look and discuss which approach we want.
@FranzBusch almost 18 months later, any update? :) I think multicast is a crucial missing piece in Async Swift.
@FranzBusch it's a really useful feature 🙏
Any movement on this?
In macOS, it is common to want to use one data source in multiple windows.
This algorithm allows for the broadcasting of values from one source to multiple consumers.

Multiple instances from the same broadcast will share the values produced by iteration. They also share failures that are thrown, as well as non-throwing termination. Cancellation is handled on a per-instance basis, but if all iterations are cancelled and there are no more references to the base iteration, then the base iteration itself is cancelled.

There is room for improvement: currently each side holds its own buffer of the elements yet to be consumed. This could be changed to an index into a shared buffer; however, the algorithm to share that buffer is quite complex and could be a bit tricky to implement. So as an initial draft this is being left as O(N * M) storage, where N is the number of active iterations and M is the number of elements that have yet to be consumed. In degenerate cases this can be viewed as O(N^2) storage, whereas in ideal cases it is O(1). The shared buffer can bring the degenerate case down to O(N) and keep the normal case at O(1).
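To make the proposed consolidation concrete, here is a minimal sketch (an assumption about the direction, not code from this PR) of a shared buffer holding one copy of the M pending elements plus a per-side cursor, which is where the O(N) degenerate-case figure comes from:

import Foundation

// Sketch only: one shared array of pending elements and N integer cursors,
// instead of N private buffers. Locking and awaiting are omitted.
struct SharedBuffer<Element> {
  private var elements: [Element] = []
  private var head = 0                    // absolute index of elements[0]
  private var cursors: [UUID: Int] = [:]  // each side's next absolute index

  // New sides start at the live edge: no replay of prior elements.
  mutating func attach() -> UUID {
    let side = UUID()
    cursors[side] = head + elements.count
    return side
  }

  // A detaching side should also remove its cursor so trim() can proceed.
  mutating func detach(_ side: UUID) {
    cursors[side] = nil
    trim()
  }

  mutating func append(_ element: Element) {
    elements.append(element)
  }

  mutating func next(for side: UUID) -> Element? {
    guard let cursor = cursors[side], cursor - head < elements.count else {
      return nil                          // side must wait for more input
    }
    let element = elements[cursor - head]
    cursors[side] = cursor + 1
    trim()
    return element
  }

  // Drop elements that every side has already consumed.
  private mutating func trim() {
    guard let slowest = cursors.values.min(), slowest > head else { return }
    elements.removeFirst(slowest - head)
    head = slowest
  }
}

Each side then costs only a constant-size cursor, while the pending elements are stored once regardless of how many sides are attached.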