Remove `AsyncIterator: Sendable` requirement from merge #185
Conversation
Package.swift (outdated)

```diff
@@ -16,9 +16,15 @@ let package = Package(
         .library(name: "_CAsyncSequenceValidationSupport", type: .static, targets: ["AsyncSequenceValidation"]),
         .library(name: "AsyncAlgorithms_XCTest", targets: ["AsyncAlgorithms_XCTest"]),
     ],
-    dependencies: [],
+    dependencies: [
+        .package(url: "https://github.com/apple/swift-collections.git", from: "1.0.2"),
```
I was unsure if we should add this dependency. It makes the implementation a lot better, and IMO it is a lower-level package. Open to discussing this :)
Switching this from Current main: 1x baseline
I'm out on vacation this week, but a few key points: merge (like all other algorithms) should only request the next value upon need/demand. I had a similar but stale branch that collapsed the tasks to gain some perf, but the problem became the collapse versus the rethrow (long story short, the pseudo-cancellation of a throw put a gnarly wrench in the works for the collapse part). All that being said, I think it can be done; I just hadn't gotten around to it yet. There are a number of related algorithms that are effectively a composition of merge that we could improve, so this is definitely something we should dig into.
@phausler Yeah no rush on this! Enjoy your vacation and we can talk afterwards. I just wanted to put this up for discussion. I agree that rethrowing is kinda getting in the way, but I am seeing up to 3.5x performance improvements with this. However, performance is not the main driver; it really is the `Sendable` constraint.
Overall this looks good. It could use some sprucing up with regards to the locks before merging (I'd rather not introduce a new locking scheme and keep to the managed critical state stuff if at all possible).
It took me a bit to follow all of the logic, but it seems relatively sound (excluding the whole fatal error cases marked in the review).
With changing to the unchecked variants I think we can eke out some more perf!
```swift
/// Advances to the next element and returns it or `nil` if no next element exists.
mutating func next() async rethrows -> Either? {
    if Task.isCancelled {
```
I wonder if this is sufficient; shouldn't we have a reaction to the cancellation while it is awaiting?
Consider a merge of two very long async ops: if a cancel happens, we should immediately be able to discern that the resultant is nil.
This is the old code. I just copied this since other algos were using this and I did not want to change them.
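One hedged way to get the immediate-nil-on-cancel behaviour phausler describes is to install a cancellation handler that resumes the stored continuation. This is a sketch with illustrative names (`NextState`, the producer side is omitted), not code from this PR:

```swift
import Foundation

// Sketch only: in the real flow a producer would also resume the
// stored continuation with an element.
final class NextState<Element: Sendable>: @unchecked Sendable {
    private let lock = NSLock()
    private var continuation: CheckedContinuation<Element?, Never>?

    func next() async -> Element? {
        await withTaskCancellationHandler {
            await withCheckedContinuation { continuation in
                lock.lock()
                self.continuation = continuation
                lock.unlock()
            }
        } onCancel: {
            // Runs immediately when the consuming task is cancelled,
            // even while `next()` is suspended above.
            lock.lock()
            let continuation = self.continuation
            self.continuation = nil
            lock.unlock()
            continuation?.resume(returning: nil)
        }
    }
}
```

With this shape, a cancel during a long-running upstream await resumes the consumer with `nil` right away instead of waiting for the next `Task.isCancelled` check.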
```swift
private extension AsyncMerge2Sequence {
    final class Storage: @unchecked Sendable {
        /// The lock that protects our state.
        private let lock = Lock.allocate()
```
this should probably be done with the managed critical state type instead
I tried to adopt this, but `ManagedCriticalState` only supports using the lock with a closure. For the `next` method I would like to manually lock and unlock to be able to hold the lock across the `withUnsafeContinuation` call.
Holding a lock across a suspension is almost always a really bad idea; it can cause some pretty heinous fallout.
Holding a lock across `with*Continuation` is safe to do, since the closure is executed right away. We have been doing this in various places of the NIO ecosystem now.
To further clarify: this is not holding a lock across a suspension point, but just across the `with*Continuation` call.
So there is only one way to do that safely (and it is a very edge-case scenario):

```swift
lock()
await withUnsafeContinuation { continuation in
    self.continuation = continuation
    unlock()
}
```

It is very easy to modify that pattern into some really gnarly failure modes. From a maintainability standpoint I'd rather make sure there is no other way of doing that.
Right, I am using exactly that pattern, just with calls to the state in between. I agree with you that this needs to be done with care.
However, if we don't hold the lock across that call, the state handling becomes trickier, since it introduces a potential interleaving between `next` and the closure of `withUnsafeContinuation` where everything else can happen.
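As a hedged illustration of that interleaving hazard (hypothetical `state`/`buffer`/`downstreamContinuation` names, not this PR's exact code): if the lock is released between checking the buffer and suspending, a producer can slip in.

```swift
// Sketch of the race, not code from this PR.
lock()
let isEmpty = state.buffer.isEmpty
unlock()
if isEmpty {
    // <-- a producer can append an element right here; we then suspend
    //     even though the buffer is no longer empty, stalling the consumer.
    return await withUnsafeContinuation { continuation in
        lock()
        state.downstreamContinuation = continuation
        unlock()
    }
}
```

Holding the lock from the emptiness check until the continuation is stored closes exactly this window.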
```swift
}

mutating func apply(_ task1: Task<Merge2StateMachine<Base1, Base2>.Partial, Never>?, _ task2: Task<Merge2StateMachine<Base1, Base2>.Partial, Never>?) async rethrows -> Either? {
    switch await Task.select([task1, task2].compactMap { $0 }).value {
```
Do you think there is a way we can approach this without `Task.select`? That is the other perf bottleneck (which could be either solved by not needing it, or solved by improving it at a language level).
Same here, not my code just copied over.
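For what it's worth, one `Task.select`-free way to race two operations is a task group that takes the first finished child. This is a hedged sketch with an illustrative `firstOf` helper (not what this PR uses), and its semantics differ from `Task.select`, which leaves the other tasks running:

```swift
// Illustrative helper, not from this PR: races two operations via a
// task group; the first finished child wins and the other is cancelled.
func firstOf<T: Sendable>(
    _ a: @escaping @Sendable () async -> T,
    _ b: @escaping @Sendable () async -> T
) async -> T {
    await withTaskGroup(of: T.self) { group in
        group.addTask { await a() }
        group.addTask { await b() }
        let winner = await group.next()! // group is non-empty, so `!` is safe
        group.cancelAll()                // unlike Task.select, the loser is cancelled
        return winner
    }
}
```

Whether cancelling the loser is acceptable depends on the algorithm; for merge, the eventual demand-driven design sidesteps the race entirely.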
```swift
/// Actions returned by `childTaskSuspended()`.
enum ChildTaskSuspendedAction {
    /// Indicates that the continuation should be resumed which will lead to calling `next` on the upstream.
    case resumeContinuation(
```
Stylistic nit: most folks have wide enough monitors to see the comment, so no need to limit code to 40 columns... (cause if you are editing Swift on a non-emulated VT100... well, time to upgrade :) )
I am using newlines here mostly to make reading easier, not because of screen width. I would prefer to keep them to maintain readability.
```swift
case let .merging(task, buffer, upstreamContinuations, upstreamsFinished, .some(downstreamContinuation)):
    // We produced an element and have an outstanding downstream continuation;
    // this means we can go right ahead and resume the continuation with that element.
    precondition(buffer.isEmpty, "We are holding a continuation so the buffer must be empty")
```
this feels like it somehow could not be fully true... do we have tests that validate this?
Various tests are validating that indirectly. For this precondition to fail, we would need to get into a state where we have a suspended call to `next()` and things in the buffer. However, when we call `next` and have things in the buffer, we return them right away; we only suspend if the buffer is empty. Furthermore, since we are holding the lock across the `withUnsafeContinuation` call in `next`, nothing can get in between us checking if the buffer is empty and us suspending.
When any of the upstreams produces a new element, it will acquire the lock and then we come into this case. If we have a continuation, we are guaranteed to have an empty buffer, since we would never have suspended in the first place if we had something in the buffer.
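That invariant can be sketched roughly like this (hypothetical `buffer` and `downstreamContinuation` names, not the PR's exact code):

```swift
mutating func next() async -> Element? {
    lock()
    if let element = buffer.popFirst() {
        // Buffered elements are returned right away; we never suspend here.
        unlock()
        return element
    }
    // The buffer is empty and we still hold the lock, so no producer can
    // append an element between this check and storing the continuation.
    return await withUnsafeContinuation { continuation in
        self.downstreamContinuation = continuation
        unlock() // released inside the closure, before the actual suspension
    }
}
```

So a stored continuation and a non-empty buffer can never coexist, which is what the `precondition` asserts.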
```swift
)

/// Indicates that the downstream continuation should be resumed with `nil` and
/// the task and the upstream continuations should be cancelled.
case resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations(
```
Some of these long names and numerous associated values kind of imply struct-ness of the associations. Would it be better to change them to structures instead of effectively tuples?
In general I am with you; however, this would introduce a struct for just one case of an action. My personal opinion is that this is overkill for something internal. If this is important to you, I am happy to change it though. Your call :)
One required item for merging this: it definitely needs to handle rethrowing cases (and we should validate that is true).
@phausler I just updated the PR again. To summarise what changed from your initial review:
I replied to some of your feedback inline but I haven't changed the following:
# Motivation

The current implementation of `AsyncDebounceSequence` requires the base `AsyncIterator` to be `Sendable`. This is causing two problems:

1. It only allows users to use `debounce` if their `AsyncSequence.AsyncIterator` is `Sendable`.
2. In `debounce` we are creating a lot of new `Task`s, and creating `Task`s is not cheap.

My main goal of this PR was to remove the `Sendable` constraint from `debounce`.

# Modification

This PR overhauls the implementation of `debounce` and aligns it with the implementation of the open `merge` PR apple#185. The most important changes are:

- I removed the `Sendable` requirement from the base sequence's `AsyncIterator`.
- Instead of creating new `Task`s for the sleep and for the upstream consumption, I am now creating one `Task` and manipulating it by signalling continuations.
- I am not cancelling the sleep. Instead, I am recalculating the time left to sleep when a sleep finishes.

# Result

In the end, this PR swaps the implementation of `AsyncDebounceSequence`, drops the `Sendable` constraint, and passes all tests. Furthermore, in my local performance testing I saw up to a 150% speed increase in throughput.
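The "recalculate the time left to sleep" idea could be sketched like this (hedged; `deadline()` is a hypothetical accessor that in practice would read state under the state machine's lock):

```swift
// Sketch only: new upstream elements push the stored deadline further
// into the future instead of cancelling the sleep task.
func sleepUntilQuiescent(
    clock: ContinuousClock,
    deadline: () -> ContinuousClock.Instant
) async throws {
    while clock.now < deadline() {
        // Sleep to the current deadline; if it moved while we slept,
        // the loop sleeps again for the remainder.
        try await clock.sleep(until: deadline(), tolerance: nil)
    }
}
```

This avoids one task creation/cancellation per upstream element, which is where the throughput win plausibly comes from.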
@swift-ci please test
Force-pushed from d09298c to 747c766 (compare)
* Remove `AsyncIterator: Sendable` requirement from debounce
* Fix #174
* Code review
* Remove lock methods
* Cleanup some unused code
* Setup task after first call to next
# Motivation

Currently a lot of the operator implementations in here that consume other `AsyncSequence`s require the `AsyncIterator` to be `Sendable`. This is mostly due to the fact that we are calling `makeAsyncIterator` on the upstream `AsyncSequence` and then passing that iterator around to various newly spawned `Task`s. This has two downsides:

1. It only allows users to use operators like `merge` if their `AsyncSequence.AsyncIterator` is `Sendable`.
2. In merge we are creating new `Task`s for every new demand, and creating `Task`s is not cheap.

My main goal of this PR was to remove the `Sendable` constraint from `merge`.

# Modification

This PR overhauls the complete inner workings of the `AsyncMerge2Sequence`. It does a couple of things:

1. The main change is that instead of creating new `Task`s for every demand, we are creating one `Task` when the `AsyncIterator` is created. This task has a child task for every upstream sequence.
2. When calling `next` we are signalling the child tasks to demand from the upstream.
3. A new state machine synchronizes the various concurrent operations that can happen.
4. Cancellation is handled, since we are creating a bunch of continuations.

# Result

In the end, this PR swaps the implementation of `AsyncMerge2Sequence`, drops the `Sendable` constraint, and passes all tests. Furthermore, in my local performance testing I saw up to a 50% speed increase in throughput.

# Open points

1. I need to make this sequence rethrowing, but before going down that rabbit hole I want to get buy-in on the implementation.
2. We should discuss and document if `merge` and other operators are hot or cold, i.e. if they only request if they got downstream demand.
3. I need to switch `AsyncMerge3Sequence` over to the same implementation.
@phausler Just rebased this PR. Feel free to merge it if you are happy with it!