Skip to content

Commit

Permalink
FuturesUnordered: Limit max value of yield_every
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Dec 18, 2021
1 parent 6ffb4c5 commit fb271ed
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 16 deletions.
45 changes: 30 additions & 15 deletions futures-util/src/stream/futures_unordered/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use crate::task::AtomicWaker;
use alloc::sync::{Arc, Weak};
use core::cell::UnsafeCell;
use core::cmp;
use core::fmt::{self, Debug};
use core::iter::FromIterator;
use core::marker::PhantomData;
Expand All @@ -30,6 +31,33 @@ use self::task::Task;
mod ready_to_run_queue;
use self::ready_to_run_queue::{Dequeue, ReadyToRunQueue};

/// Constant used for a `FuturesUnordered` to determine how many times it is
/// allowed to poll underlying futures without yielding.
///
/// A single call to `poll_next` may potentially do a lot of work before
/// yielding. This happens in particular if the underlying futures are awoken
/// frequently but continue to return `Pending`. This is problematic if other
/// tasks are waiting on the executor, since they do not get to run. This value
/// caps the number of calls to `poll` on underlying futures a single call to
/// `poll_next` is allowed to make.
///
/// The value itself is chosen somewhat arbitrarily. It needs to be high enough
/// that amortize wakeup and scheduling costs, but low enough that we do not
/// starve other tasks for long.
///
/// See also https://github.com/rust-lang/futures-rs/issues/2047.
///
/// Note that using the length of the `FuturesUnordered` instead of this value
/// may cause problems if the number of futures is large.
/// See also https://github.com/rust-lang/futures-rs/pull/2527.
///
/// Additionally, polling the same future twice per iteration may cause another
/// problem. So, when using this value, it is necessary to limit the max value
/// based on the length of the `FuturesUnordered`.
/// (e.g., `cmp::min(self.len(), YIELD_EVERY)`)
/// See also https://github.com/rust-lang/futures-rs/pull/2333.
const YIELD_EVERY: usize = 32;

/// A set of futures which may complete in any order.
///
/// This structure is optimized to manage a large number of futures.
Expand Down Expand Up @@ -383,21 +411,8 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
type Item = Fut::Output;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Variable to determine how many times it is allowed to poll underlying
// futures without yielding.
//
// A single call to `poll_next` may potentially do a lot of work before
// yielding. This happens in particular if the underlying futures are awoken
// frequently but continue to return `Pending`. This is problematic if other
// tasks are waiting on the executor, since they do not get to run. This value
// caps the number of calls to `poll` on underlying futures a single call to
// `poll_next` is allowed to make.
//
// The value is the length of FuturesUnordered. This ensures that each
// future is polled only once at most per iteration.
//
// See also https://github.com/rust-lang/futures-rs/issues/2047.
let yield_every = self.len();
// See YIELD_EVERY docs for more.
let yield_every = cmp::min(self.len(), YIELD_EVERY);

// Keep track of how many child futures we have polled,
// in case we want to forcibly yield.
Expand Down
2 changes: 1 addition & 1 deletion futures/tests/stream_futures_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ fn polled_only_once_at_most_per_iteration() {

let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 33]);
assert!(tasks.poll_next_unpin(cx).is_pending());
assert_eq!(33, tasks.iter().filter(|f| f.polled).count());
assert_eq!(32, tasks.iter().filter(|f| f.polled).count());

let mut tasks = FuturesUnordered::<F>::new();
assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx));
Expand Down

0 comments on commit fb271ed

Please sign in to comment.