Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add FuturesUnordered::into_iter, make iter_pin_ref public #2423

Merged
merged 4 commits into from
May 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions futures-util/src/stream/futures_unordered/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,55 @@ pub struct IterPinRef<'a, Fut> {
/// Immutable iterator over all the futures in the unordered set.
pub struct Iter<'a, Fut: Unpin>(pub(super) IterPinRef<'a, Fut>);

#[derive(Debug)]
/// Owned iterator over all futures in the unordered set.
pub struct IntoIter<Fut: Unpin> {
pub(super) len: usize,
pub(super) inner: FuturesUnordered<Fut>,
}

impl<Fut: Unpin> Iterator for IntoIter<Fut> {
type Item = Fut;

fn next(&mut self) -> Option<Fut> {
// `head_all` can be accessed directly and we don't need to spin on
// `Task::next_all` since we have exclusive access to the set.
let task = self.inner.head_all.get_mut();

if (*task).is_null() {
return None;
}

unsafe {
// Moving out of the future is safe because it is `Unpin`
let future = (*(**task).future.get()).take().unwrap();

// Mutable access to a previously shared `FuturesUnordered` implies
// that the other threads already released the object before the
// current thread acquired it, so relaxed ordering can be used and
// valid `next_all` checks can be skipped.
let next = (**task).next_all.load(Relaxed);
*task = next;
self.len -= 1;
Some(future)
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
(self.len, Some(self.len))
}
}

impl<Fut: Unpin> ExactSizeIterator for IntoIter<Fut> {}

impl<'a, Fut> Iterator for IterPinMut<'a, Fut> {
type Item = Pin<&'a mut Fut>;

fn next(&mut self) -> Option<Pin<&'a mut Fut>> {
if self.task.is_null() {
return None;
}

unsafe {
let future = (*(*self.task).future.get()).as_mut().unwrap();

Expand Down Expand Up @@ -78,6 +120,7 @@ impl<'a, Fut> Iterator for IterPinRef<'a, Fut> {
if self.task.is_null() {
return None;
}

unsafe {
let future = (*(*self.task).future.get()).as_ref().unwrap();

Expand Down Expand Up @@ -120,3 +163,6 @@ unsafe impl<Fut: Sync> Sync for IterPinRef<'_, Fut> {}

unsafe impl<Fut: Send> Send for IterPinMut<'_, Fut> {}
unsafe impl<Fut: Sync> Sync for IterPinMut<'_, Fut> {}

unsafe impl<Fut: Send + Unpin> Send for IntoIter<Fut> {}
unsafe impl<Fut: Sync + Unpin> Sync for IntoIter<Fut> {}
39 changes: 36 additions & 3 deletions futures-util/src/stream/futures_unordered/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
mod abort;

mod iter;
pub use self::iter::{Iter, IterMut, IterPinMut, IterPinRef};
pub use self::iter::{IntoIter, Iter, IterMut, IterPinMut, IterPinRef};

mod task;
use self::task::Task;
Expand Down Expand Up @@ -194,10 +194,11 @@ impl<Fut> FuturesUnordered<Fut> {
}

/// Returns an iterator that allows inspecting each future in the set.
fn iter_pin_ref(self: Pin<&Self>) -> IterPinRef<'_, Fut> {
pub fn iter_pin_ref(self: Pin<&Self>) -> IterPinRef<'_, Fut> {
let (task, len) = self.atomic_load_head_and_len_all();
let pending_next_all = self.pending_next_all();

IterPinRef { task, len, pending_next_all: self.pending_next_all(), _marker: PhantomData }
IterPinRef { task, len, pending_next_all, _marker: PhantomData }
}

/// Returns an iterator that allows modifying each future in the set.
Expand Down Expand Up @@ -581,6 +582,38 @@ impl<Fut> Drop for FuturesUnordered<Fut> {
}
}

impl<'a, Fut: Unpin> IntoIterator for &'a FuturesUnordered<Fut> {
type Item = &'a Fut;
type IntoIter = Iter<'a, Fut>;

fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}

impl<'a, Fut: Unpin> IntoIterator for &'a mut FuturesUnordered<Fut> {
type Item = &'a mut Fut;
type IntoIter = IterMut<'a, Fut>;

fn into_iter(self) -> Self::IntoIter {
self.iter_mut()
}
}

impl<Fut: Unpin> IntoIterator for FuturesUnordered<Fut> {
type Item = Fut;
type IntoIter = IntoIter<Fut>;

fn into_iter(mut self) -> Self::IntoIter {
// `head_all` can be accessed directly and we don't need to spin on
// `Task::next_all` since we have exclusive access to the set.
let task = *self.head_all.get_mut();
let len = if task.is_null() { 0 } else { unsafe { *(*task).len_all.get() } };

IntoIter { len, inner: self }
}
}

impl<Fut> FromIterator<Fut> for FuturesUnordered<Fut> {
fn from_iter<I>(iter: I) -> Self
where
Expand Down
7 changes: 7 additions & 0 deletions futures/tests/auto_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1827,6 +1827,13 @@ pub mod stream {
assert_impl!(futures_unordered::IterPinRef<()>: Sync);
assert_not_impl!(futures_unordered::IterPinRef<*const ()>: Sync);
assert_impl!(futures_unordered::IterPinRef<PhantomPinned>: Unpin);

assert_impl!(futures_unordered::IntoIter<()>: Send);
assert_not_impl!(futures_unordered::IntoIter<*const ()>: Send);
assert_impl!(futures_unordered::IntoIter<()>: Sync);
assert_not_impl!(futures_unordered::IntoIter<*const ()>: Sync);
// The definition of futures_unordered::IntoIter has `Fut: Unpin` bounds.
// assert_not_impl!(futures_unordered::IntoIter<PhantomPinned>: Unpin);
}

/// Assert Send/Sync/Unpin for all public types in `futures::task`.
Expand Down
45 changes: 45 additions & 0 deletions futures/tests/stream_futures_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,51 @@ fn iter_len() {
assert!(iter.next().is_none());
}

#[test]
fn into_iter_cancel() {
let (a_tx, a_rx) = oneshot::channel::<i32>();
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();

let stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>();

let stream = stream
.into_iter()
.map(|mut rx| {
rx.close();
rx
})
.collect::<FuturesUnordered<_>>();

let mut iter = block_on_stream(stream);

assert!(a_tx.is_canceled());
assert!(b_tx.is_canceled());
assert!(c_tx.is_canceled());

assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
assert_eq!(iter.next(), None);
}

#[test]
fn into_iter_len() {
let stream = vec![future::pending::<()>(), future::pending::<()>(), future::pending::<()>()]
.into_iter()
.collect::<FuturesUnordered<_>>();

let mut into_iter = stream.into_iter();
assert_eq!(into_iter.len(), 3);
assert!(into_iter.next().is_some());
assert_eq!(into_iter.len(), 2);
assert!(into_iter.next().is_some());
assert_eq!(into_iter.len(), 1);
assert!(into_iter.next().is_some());
assert_eq!(into_iter.len(), 0);
assert!(into_iter.next().is_none());
}

#[test]
fn futures_not_moved_after_poll() {
// Future that will be ready after being polled twice,
Expand Down