Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make run_until_stalled handle self-waking futures #2593

Merged
merged 5 commits into from
May 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 50 additions & 44 deletions futures-executor/src/local_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,9 @@ fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
})
}

fn poll_executor<T, F: FnMut(&mut Context<'_>) -> T>(mut f: F) -> T {
let _enter = enter().expect(
"cannot execute `LocalPool` executor from within \
another executor",
);

CURRENT_THREAD_NOTIFY.with(|thread_notify| {
let waker = waker_ref(thread_notify);
let mut cx = Context::from_waker(&waker);
f(&mut cx)
})
/// Check for a wakeup, but don't consume it.
fn woken() -> bool {
CURRENT_THREAD_NOTIFY.with(|thread_notify| thread_notify.unparked.load(Ordering::SeqCst))
}

impl LocalPool {
Expand Down Expand Up @@ -212,20 +204,26 @@ impl LocalPool {
/// further use of one of the pool's run or poll methods.
/// Though only one task will be completed, progress may be made on multiple tasks.
pub fn try_run_one(&mut self) -> bool {
poll_executor(|ctx| {
run_executor(|cx| {
loop {
let ret = self.poll_pool_once(ctx);

// return if we have executed a future
if let Poll::Ready(Some(_)) = ret {
return true;
self.drain_incoming();

match self.pool.poll_next_unpin(cx) {
// Success!
Poll::Ready(Some(())) => return Poll::Ready(true),
// The pool was empty.
Poll::Ready(None) => return Poll::Ready(false),
Poll::Pending => (),
}

// if there are no new incoming futures
// then there is no feature that can make progress
// and we can return without having completed a single future
if self.incoming.borrow().is_empty() {
return false;
if !self.incoming.borrow().is_empty() {
// New tasks were spawned; try again.
continue;
} else if woken() {
// The pool yielded to us, but there's more progress to be made.
return Poll::Pending;
} else {
return Poll::Ready(false);
}
}
})
Expand Down Expand Up @@ -257,44 +255,52 @@ impl LocalPool {
/// of the pool's run or poll methods. While the function is running, all tasks
/// in the pool will try to make progress.
pub fn run_until_stalled(&mut self) {
poll_executor(|ctx| {
let _ = self.poll_pool(ctx);
run_executor(|cx| match self.poll_pool(cx) {
// The pool is empty.
Poll::Ready(()) => Poll::Ready(()),
Poll::Pending => {
if woken() {
Poll::Pending
} else {
// We're stalled for now.
Poll::Ready(())
}
}
});
}

// Make maximal progress on the entire pool of spawned task, returning `Ready`
// if the pool is empty and `Pending` if no further progress can be made.
/// Poll `self.pool`, re-filling it with any newly-spawned tasks.
/// Repeat until either the pool is empty, or it returns `Pending`.
///
/// Returns `Ready` if the pool was empty, and `Pending` otherwise.
///
/// NOTE: the pool may call `wake`, so `Pending` doesn't necessarily
/// mean that the pool can't make progress.
fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
// state for the FuturesUnordered, which will never be used
loop {
let ret = self.poll_pool_once(cx);
self.drain_incoming();

// we queued up some new tasks; add them and poll again
let pool_ret = self.pool.poll_next_unpin(cx);

// We queued up some new tasks; add them and poll again.
if !self.incoming.borrow().is_empty() {
continue;
}

// no queued tasks; we may be done
match ret {
Poll::Pending => return Poll::Pending,
match pool_ret {
Poll::Ready(Some(())) => continue,
Poll::Ready(None) => return Poll::Ready(()),
_ => {}
Poll::Pending => return Poll::Pending,
}
}
}

// Try make minimal progress on the pool of spawned tasks
fn poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
// empty the incoming queue of newly-spawned tasks
{
let mut incoming = self.incoming.borrow_mut();
for task in incoming.drain(..) {
self.pool.push(task)
}
/// Empty the incoming queue of newly-spawned tasks.
fn drain_incoming(&mut self) {
let mut incoming = self.incoming.borrow_mut();
for task in incoming.drain(..) {
self.pool.push(task)
}

// try to execute the next ready future
self.pool.poll_next_unpin(cx)
}
}

Expand Down
64 changes: 63 additions & 1 deletion futures-executor/tests/local_pool.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use futures::channel::oneshot;
use futures::executor::LocalPool;
use futures::future::{self, lazy, poll_fn, Future};
use futures::task::{Context, LocalSpawn, Poll, Spawn, Waker};
use futures::task::{Context, LocalSpawn, LocalSpawnExt, Poll, Spawn, SpawnExt, Waker};
use std::cell::{Cell, RefCell};
use std::pin::Pin;
use std::rc::Rc;
Expand Down Expand Up @@ -435,3 +435,65 @@ fn park_unpark_independence() {

futures::executor::block_on(future)
}

struct SelfWaking {
wakeups_remaining: Rc<RefCell<usize>>,
}

impl Future for SelfWaking {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if *self.wakeups_remaining.borrow() != 0 {
*self.wakeups_remaining.borrow_mut() -= 1;
cx.waker().wake_by_ref();
}

Poll::Pending
}
}

/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593
///
/// The issue was that self-waking futures could cause `run_until_stalled`
/// to exit early, even when progress could still be made.
#[test]
fn self_waking_run_until_stalled() {
let wakeups_remaining = Rc::new(RefCell::new(10));

let mut pool = LocalPool::new();
let spawner = pool.spawner();
for _ in 0..3 {
let wakeups_remaining = Rc::clone(&wakeups_remaining);
spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap();
}

// This should keep polling until there are no more wakeups.
pool.run_until_stalled();

assert_eq!(*wakeups_remaining.borrow(), 0);
}

/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593
///
/// The issue was that self-waking futures could cause `try_run_one`
/// to exit early, even when progress could still be made.
#[test]
fn self_waking_try_run_one() {
let wakeups_remaining = Rc::new(RefCell::new(10));

let mut pool = LocalPool::new();
let spawner = pool.spawner();
for _ in 0..3 {
let wakeups_remaining = Rc::clone(&wakeups_remaining);
spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap();
}

spawner.spawn(future::ready(())).unwrap();

// The `ready` future should complete.
assert!(pool.try_run_one());

// The self-waking futures are each polled once.
assert_eq!(*wakeups_remaining.borrow(), 7);
}