Skip to content

Commit

Permalink
Backport to 0.1: FuturesUnordered: Do not poll the same future twice …
Browse files Browse the repository at this point in the history
…per iteration

Same as #2333. The same issue exists in 0.1, so backporting it there
helps for code that is still using Futures 0.1 in some places.
  • Loading branch information
krallin committed Feb 23, 2021
1 parent 6db6642 commit 30347de
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 20 deletions.
37 changes: 18 additions & 19 deletions src/stream/futures_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,6 @@ use {task, Stream, Future, Poll, Async};
use executor::{Notify, UnsafeNotify, NotifyHandle};
use task_impl::{self, AtomicTask};

/// Constant used for a `FuturesUnordered` to determine how many times it is
/// allowed to poll underlying futures without yielding.
///
/// A single call to `poll_next` may potentially do a lot of work before
/// yielding. This happens in particular if the underlying futures are awoken
/// frequently but continue to return `Pending`. This is problematic if other
/// tasks are waiting on the executor, since they do not get to run. This value
/// caps the number of calls to `poll` on underlying futures a single call to
/// `poll_next` is allowed to make.
///
/// The value itself is chosen somewhat arbitrarily. It needs to be high enough
/// that amortize wakeup and scheduling costs, but low enough that we do not
/// starve other tasks for long.
///
/// See also https://github.com/rust-lang/futures-rs/issues/2047.
const YIELD_EVERY: usize = 32;

/// An unbounded set of futures.
///
/// This "combinator" also serves a special function in this library, providing
Expand Down Expand Up @@ -291,6 +274,22 @@ impl<T> Stream for FuturesUnordered<T>
type Error = T::Error;

fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
// Variable to determine how many times it is allowed to poll underlying
// futures without yielding.
//
// A single call to `poll_next` may potentially do a lot of work before
// yielding. This happens in particular if the underlying futures are awoken
// frequently but continue to return `Pending`. This is problematic if other
// tasks are waiting on the executor, since they do not get to run. This value
// caps the number of calls to `poll` on underlying futures a single call to
// `poll_next` is allowed to make.
//
// The value is the length of FuturesUnordered. This ensures that each
// future is polled only once at most per iteration.
//
// See also https://github.com/rust-lang/futures-rs/issues/2047.
let yield_every = self.len();

// Keep track of how many child futures we have polled,
// in case we want to forcibly yield.
let mut polled = 0;
Expand Down Expand Up @@ -353,7 +352,7 @@ impl<T> Stream for FuturesUnordered<T>
// * The future was extracted above (taken ownership). That way
// if it panics we're guaranteed that the future is
// dropped on this thread and doesn't accidentally get
// dropped on a different thread (bad).
// dropped on YIELD_EVERYa different thread (bad).
// * We unlink the node from our internal queue to preemptively
// assume it'll panic, in which case we'll want to discard it
// regardless.
Expand Down Expand Up @@ -398,7 +397,7 @@ impl<T> Stream for FuturesUnordered<T>
*node.future.get() = Some(future);
bomb.queue.link(node);

if polled == YIELD_EVERY {
if polled == yield_every {
// We have polled a large number of futures in a row without yielding.
// To ensure we do not starve other tasks waiting on the executor,
// we yield here, but immediately wake ourselves up to continue.
Expand Down
40 changes: 39 additions & 1 deletion tests/futures_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ extern crate futures;
use std::any::Any;

use futures::sync::oneshot;
use futures::stream::futures_unordered;
use std::iter::FromIterator;
use futures::stream::{futures_unordered, FuturesUnordered};
use futures::prelude::*;

mod support;
Expand Down Expand Up @@ -127,3 +128,40 @@ fn iter_mut_len() {
assert_eq!(iter_mut.len(), 0);
assert!(iter_mut.next().is_none());
}

#[test]
fn polled_only_once_at_most_per_iteration() {
#[derive(Debug, Clone, Copy, Default)]
struct F {
polled: bool,
}

impl Future for F {
type Item = ();
type Error = ();

fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
if self.polled {
panic!("polled twice")
} else {
self.polled = true;
Ok(Async::NotReady)
}
}
}


let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 10]);
let tasks = futures::executor::spawn(tasks);
assert!(tasks.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
assert_eq!(10, tasks.get_mut().iter_mut().filter(|f| f.polled).count());

let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 33]);
let tasks = futures::executor::spawn(tasks);
assert!(tasks.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
assert_eq!(33, tasks.get_mut().iter_mut().filter(|f| f.polled).count());

let mut tasks = FuturesUnordered::<F>::new();
let tasks = futures::executor::spawn(tasks);
assert!(tasks.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_ready());
}

0 comments on commit 30347de

Please sign in to comment.