Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FuturesUnordered: Respect yielding from future #2551

Merged
merged 3 commits into from
Feb 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 18 additions & 35 deletions futures-util/src/stream/futures_unordered/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
use crate::task::AtomicWaker;
use alloc::sync::{Arc, Weak};
use core::cell::UnsafeCell;
use core::cmp;
use core::fmt::{self, Debug};
use core::iter::FromIterator;
use core::marker::PhantomData;
Expand All @@ -31,33 +30,6 @@ use self::task::Task;
mod ready_to_run_queue;
use self::ready_to_run_queue::{Dequeue, ReadyToRunQueue};

/// Constant used for a `FuturesUnordered` to determine how many times it is
/// allowed to poll underlying futures without yielding.
///
/// A single call to `poll_next` may potentially do a lot of work before
/// yielding. This happens in particular if the underlying futures are awoken
/// frequently but continue to return `Pending`. This is problematic if other
/// tasks are waiting on the executor, since they do not get to run. This value
/// caps the number of calls to `poll` on underlying futures a single call to
/// `poll_next` is allowed to make.
///
/// The value itself is chosen somewhat arbitrarily. It needs to be high enough
/// that amortize wakeup and scheduling costs, but low enough that we do not
/// starve other tasks for long.
///
/// See also https://github.com/rust-lang/futures-rs/issues/2047.
///
/// Note that using the length of the `FuturesUnordered` instead of this value
/// may cause problems if the number of futures is large.
/// See also https://github.com/rust-lang/futures-rs/pull/2527.
///
/// Additionally, polling the same future twice per iteration may cause another
/// problem. So, when using this value, it is necessary to limit the max value
/// based on the length of the `FuturesUnordered`.
/// (e.g., `cmp::min(self.len(), YIELD_EVERY)`)
/// See also https://github.com/rust-lang/futures-rs/pull/2333.
const YIELD_EVERY: usize = 32;

/// A set of futures which may complete in any order.
///
/// This structure is optimized to manage a large number of futures.
Expand Down Expand Up @@ -149,6 +121,7 @@ impl<Fut> FuturesUnordered<Fut> {
next_ready_to_run: AtomicPtr::new(ptr::null_mut()),
queued: AtomicBool::new(true),
ready_to_run_queue: Weak::new(),
woken: AtomicBool::new(false),
});
let stub_ptr = Arc::as_ptr(&stub);
let ready_to_run_queue = Arc::new(ReadyToRunQueue {
Expand Down Expand Up @@ -195,6 +168,7 @@ impl<Fut> FuturesUnordered<Fut> {
next_ready_to_run: AtomicPtr::new(ptr::null_mut()),
queued: AtomicBool::new(true),
ready_to_run_queue: Arc::downgrade(&self.ready_to_run_queue),
woken: AtomicBool::new(false),
});

// Reset the `is_terminated` flag if we've previously marked ourselves
Expand Down Expand Up @@ -411,12 +385,12 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
type Item = Fut::Output;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// See YIELD_EVERY docs for more.
let yield_every = cmp::min(self.len(), YIELD_EVERY);
let len = self.len();

// Keep track of how many child futures we have polled,
// in case we want to forcibly yield.
let mut polled = 0;
let mut yielded = 0;

// Ensure `parent` is correctly set.
self.ready_to_run_queue.waker.register(cx.waker());
Expand Down Expand Up @@ -527,7 +501,11 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
// the internal allocation, appropriately accessing fields and
// deallocating the task if need be.
let res = {
let waker = Task::waker_ref(bomb.task.as_ref().unwrap());
let task = bomb.task.as_ref().unwrap();
// We are only interested in whether the future is awoken before it
// finishes polling, so reset the flag here.
task.woken.store(false, Relaxed);
let waker = Task::waker_ref(task);
let mut cx = Context::from_waker(&waker);

// Safety: We won't move the future ever again
Expand All @@ -540,12 +518,17 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
match res {
Poll::Pending => {
let task = bomb.task.take().unwrap();
// If the future was awoken during polling, we assume
// the future wanted to explicitly yield.
yielded += task.woken.load(Relaxed) as usize;
bomb.queue.link(task);

if polled == yield_every {
// We have polled a large number of futures in a row without yielding.
// To ensure we do not starve other tasks waiting on the executor,
// we yield here, but immediately wake ourselves up to continue.
// If a future yields, we respect it and yield here.
// If all futures have been polled, we also yield here to
// avoid starving other tasks waiting on the executor.
// (polling the same future twice per iteration may cause
// the problem: https://github.com/rust-lang/futures-rs/pull/2333)
if yielded >= 2 || polled == len {
cx.waker().wake_by_ref();
return Poll::Pending;
}
Expand Down
9 changes: 8 additions & 1 deletion futures-util/src/stream/futures_unordered/task.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use alloc::sync::{Arc, Weak};
use core::cell::UnsafeCell;
use core::sync::atomic::Ordering::{self, SeqCst};
use core::sync::atomic::Ordering::{self, Relaxed, SeqCst};
use core::sync::atomic::{AtomicBool, AtomicPtr};

use super::abort::abort;
Expand Down Expand Up @@ -31,6 +31,11 @@ pub(super) struct Task<Fut> {

// Whether or not this task is currently in the ready to run queue
pub(super) queued: AtomicBool,

// Whether the future was awoken during polling
// It is possible for this flag to be set to true after the polling,
// but it will be ignored.
pub(super) woken: AtomicBool,
}

// `Task` can be sent across threads safely because it ensures that
Expand All @@ -48,6 +53,8 @@ impl<Fut> ArcWake for Task<Fut> {
None => return,
};

arc_self.woken.store(true, Relaxed);

// It's our job to enqueue this task it into the ready to run queue. To
// do this we set the `queued` flag, and if successful we then do the
// actual queueing operation, ensuring that we're only queued once.
Expand Down
2 changes: 1 addition & 1 deletion futures/tests/stream_futures_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ fn polled_only_once_at_most_per_iteration() {

let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 33]);
assert!(tasks.poll_next_unpin(cx).is_pending());
assert_eq!(32, tasks.iter().filter(|f| f.polled).count());
assert_eq!(33, tasks.iter().filter(|f| f.polled).count());

let mut tasks = FuturesUnordered::<F>::new();
assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx));
Expand Down