Fix Sync issues with FuturesUnordered #2054
Merged
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Fixes #2050.
This contains a couple possible fixes for
Sync
trait correctness withFuturesUnordered
, in particular with regards to simultaneousFuturesUnordered::push
andFuturesUnordered::iter
calls. These fixes are currently split up into two commits for review.The base fix replaces
head_all
andlen
updates inFuturesUnordered::link
with atomic operations to ensure the linked list state is valid ifFuturesUnordered::push
is called from multiple threads simultaneously. Since keeping the two fields in-sync would likely require locking, the fields are still updated separately, but the final results should still be correct. Unfortunately, this means that creating an iterator withFuturesUnordered::iter
cannot rely on thelen
value it reads being correct for whatever is read forhead_all
. SinceIterPinRef
implementsExactSizeIterator
, we need a valid length to avoid breaking compatibility, so the length is lazily initialized by counting the number of futures manually whenIterPinRef::size_hint
is first called, which can be costly for largeFuturesUnordered
sets.The second commit changes how
FuturesUnordered
lengths are stored to avoid the potentially high cost of manually counting futures. Instead of storing and updating a singlelen
value in theFuturesUnordered
itself, eachTask
now stores a snapshot of the list length when theTask
was inserted, avoiding the need to count list nodes or synchronize separatelen
andhead_all
reads, although at the cost of an additionalusize
perTask
.In mutable contexts, atomic operations are either performed exclusively using "relaxed" ordering semantics or are avoided altogether, as we can be sure that access to the
FuturesUnordered
list has already been synchronized and that no threads can modifyhead_all
or any fields related to that list (only theReadyToRunQueue
and its link pointers can be updated from other threads while in a mutable function).I can squash the commits or drop the second one and clean up the PR if either of these fixes are okay.