Skip to content

Commit

Permalink
impl Send + Sync for FuturesUnordered iterators
Browse files Browse the repository at this point in the history
  • Loading branch information
ibraheemdev authored and taiki-e committed May 7, 2021
1 parent 4124d8b commit bbc403a
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 17 deletions.
23 changes: 14 additions & 9 deletions futures-util/src/stream/futures_unordered/iter.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::FuturesUnordered;
use super::task::Task;
use super::FuturesUnordered;
use core::marker::PhantomData;
use core::pin::Pin;
use core::sync::atomic::Ordering::Relaxed;
Expand All @@ -9,25 +9,25 @@ use core::sync::atomic::Ordering::Relaxed;
pub struct IterPinMut<'a, Fut> {
pub(super) task: *const Task<Fut>,
pub(super) len: usize,
pub(super) _marker: PhantomData<&'a mut FuturesUnordered<Fut>>
pub(super) _marker: PhantomData<&'a mut FuturesUnordered<Fut>>,
}

#[derive(Debug)]
/// Mutable iterator over all futures in the unordered set.
pub struct IterMut<'a, Fut: Unpin> (pub(super) IterPinMut<'a, Fut>);
pub struct IterMut<'a, Fut: Unpin>(pub(super) IterPinMut<'a, Fut>);

#[derive(Debug)]
/// Immutable iterator over all futures in the unordered set.
pub struct IterPinRef<'a, Fut> {
pub(super) task: *const Task<Fut>,
pub(super) len: usize,
pub(super) pending_next_all: *mut Task<Fut>,
pub(super) _marker: PhantomData<&'a FuturesUnordered<Fut>>
pub(super) _marker: PhantomData<&'a FuturesUnordered<Fut>>,
}

#[derive(Debug)]
/// Immutable iterator over all the futures in the unordered set.
pub struct Iter<'a, Fut: Unpin> (pub(super) IterPinRef<'a, Fut>);
pub struct Iter<'a, Fut: Unpin>(pub(super) IterPinRef<'a, Fut>);

impl<'a, Fut> Iterator for IterPinMut<'a, Fut> {
type Item = Pin<&'a mut Fut>;
Expand Down Expand Up @@ -85,10 +85,7 @@ impl<'a, Fut> Iterator for IterPinRef<'a, Fut> {
// `head_all` was initially read for this iterator implies acquire
// ordering for all previously inserted nodes (and we don't need to
// read `len_all` again for any other nodes).
let next = (*self.task).spin_next_all(
self.pending_next_all,
Relaxed,
);
let next = (*self.task).spin_next_all(self.pending_next_all, Relaxed);
self.task = next;
self.len -= 1;
Some(Pin::new_unchecked(future))
Expand All @@ -115,3 +112,11 @@ impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> {
}

impl<Fut: Unpin> ExactSizeIterator for Iter<'_, Fut> {}

// SAFETY: we do nothing thread-local and there is no interior mutability,
// so the usual structural `Send`/`Sync` apply.
unsafe impl<Fut: Send> Send for IterPinRef<'_, Fut> {}
unsafe impl<Fut: Sync> Sync for IterPinRef<'_, Fut> {}

unsafe impl<Fut: Send> Send for IterPinMut<'_, Fut> {}
unsafe impl<Fut: Sync> Sync for IterPinMut<'_, Fut> {}
16 changes: 8 additions & 8 deletions futures/tests/auto_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1780,24 +1780,24 @@ pub mod stream {
assert_not_impl!(Zip<UnpinStream, PinnedStream>: Unpin);
assert_not_impl!(Zip<PinnedStream, UnpinStream>: Unpin);

assert_not_impl!(futures_unordered::Iter<()>: Send);
assert_not_impl!(futures_unordered::Iter<()>: Sync);
assert_impl!(futures_unordered::Iter<()>: Send);
assert_impl!(futures_unordered::Iter<()>: Sync);
assert_impl!(futures_unordered::Iter<()>: Unpin);
// futures_unordered::Iter requires `Fut: Unpin`
// assert_not_impl!(futures_unordered::Iter<PhantomPinned>: Unpin);

assert_not_impl!(futures_unordered::IterMut<()>: Send);
assert_not_impl!(futures_unordered::IterMut<()>: Sync);
assert_impl!(futures_unordered::IterMut<()>: Send);
assert_impl!(futures_unordered::IterMut<()>: Sync);
assert_impl!(futures_unordered::IterMut<()>: Unpin);
// futures_unordered::IterMut requires `Fut: Unpin`
// assert_not_impl!(futures_unordered::IterMut<PhantomPinned>: Unpin);

assert_not_impl!(futures_unordered::IterPinMut<()>: Send);
assert_not_impl!(futures_unordered::IterPinMut<()>: Sync);
assert_impl!(futures_unordered::IterPinMut<()>: Send);
assert_impl!(futures_unordered::IterPinMut<()>: Sync);
assert_impl!(futures_unordered::IterPinMut<PhantomPinned>: Unpin);

assert_not_impl!(futures_unordered::IterPinRef<()>: Send);
assert_not_impl!(futures_unordered::IterPinRef<()>: Sync);
assert_impl!(futures_unordered::IterPinRef<()>: Send);
assert_impl!(futures_unordered::IterPinRef<()>: Sync);
assert_impl!(futures_unordered::IterPinRef<PhantomPinned>: Unpin);
}

Expand Down

0 comments on commit bbc403a

Please sign in to comment.