Skip to content

Commit

Permalink
Make run_until_stalled handle self-waking futures (#2593)
Browse files Browse the repository at this point in the history
LocalPool::try_run_one and run_until_stalled now correctly re-try when a
future "yields" by calling wake and returning Pending.
  • Loading branch information
khollbach authored and taiki-e committed Aug 14, 2022
1 parent d11fd00 commit b7a4e59
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 45 deletions.
94 changes: 50 additions & 44 deletions futures-executor/src/local_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,9 @@ fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
})
}

fn poll_executor<T, F: FnMut(&mut Context<'_>) -> T>(mut f: F) -> T {
let _enter = enter().expect(
"cannot execute `LocalPool` executor from within \
another executor",
);

CURRENT_THREAD_NOTIFY.with(|thread_notify| {
let waker = waker_ref(thread_notify);
let mut cx = Context::from_waker(&waker);
f(&mut cx)
})
/// Check for a wakeup, but don't consume it.
fn woken() -> bool {
CURRENT_THREAD_NOTIFY.with(|thread_notify| thread_notify.unparked.load(Ordering::SeqCst))
}

impl LocalPool {
Expand Down Expand Up @@ -212,20 +204,26 @@ impl LocalPool {
/// further use of one of the pool's run or poll methods.
/// Though only one task will be completed, progress may be made on multiple tasks.
pub fn try_run_one(&mut self) -> bool {
poll_executor(|ctx| {
run_executor(|cx| {
loop {
let ret = self.poll_pool_once(ctx);

// return if we have executed a future
if let Poll::Ready(Some(_)) = ret {
return true;
self.drain_incoming();

match self.pool.poll_next_unpin(cx) {
// Success!
Poll::Ready(Some(())) => return Poll::Ready(true),
// The pool was empty.
Poll::Ready(None) => return Poll::Ready(false),
Poll::Pending => (),
}

// if there are no new incoming futures
// then there is no feature that can make progress
// and we can return without having completed a single future
if self.incoming.borrow().is_empty() {
return false;
if !self.incoming.borrow().is_empty() {
// New tasks were spawned; try again.
continue;
} else if woken() {
// The pool yielded to us, but there's more progress to be made.
return Poll::Pending;
} else {
return Poll::Ready(false);
}
}
})
Expand Down Expand Up @@ -257,44 +255,52 @@ impl LocalPool {
/// of the pool's run or poll methods. While the function is running, all tasks
/// in the pool will try to make progress.
pub fn run_until_stalled(&mut self) {
poll_executor(|ctx| {
let _ = self.poll_pool(ctx);
run_executor(|cx| match self.poll_pool(cx) {
// The pool is empty.
Poll::Ready(()) => Poll::Ready(()),
Poll::Pending => {
if woken() {
Poll::Pending
} else {
// We're stalled for now.
Poll::Ready(())
}
}
});
}

// Make maximal progress on the entire pool of spawned task, returning `Ready`
// if the pool is empty and `Pending` if no further progress can be made.
/// Poll `self.pool`, re-filling it with any newly-spawned tasks.
/// Repeat until either the pool is empty, or it returns `Pending`.
///
/// Returns `Ready` if the pool was empty, and `Pending` otherwise.
///
/// NOTE: the pool may call `wake`, so `Pending` doesn't necessarily
/// mean that the pool can't make progress.
fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
// state for the FuturesUnordered, which will never be used
loop {
let ret = self.poll_pool_once(cx);
self.drain_incoming();

// we queued up some new tasks; add them and poll again
let pool_ret = self.pool.poll_next_unpin(cx);

// We queued up some new tasks; add them and poll again.
if !self.incoming.borrow().is_empty() {
continue;
}

// no queued tasks; we may be done
match ret {
Poll::Pending => return Poll::Pending,
match pool_ret {
Poll::Ready(Some(())) => continue,
Poll::Ready(None) => return Poll::Ready(()),
_ => {}
Poll::Pending => return Poll::Pending,
}
}
}

// Try make minimal progress on the pool of spawned tasks
fn poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
// empty the incoming queue of newly-spawned tasks
{
let mut incoming = self.incoming.borrow_mut();
for task in incoming.drain(..) {
self.pool.push(task)
}
/// Empty the incoming queue of newly-spawned tasks.
fn drain_incoming(&mut self) {
let mut incoming = self.incoming.borrow_mut();
for task in incoming.drain(..) {
self.pool.push(task)
}

// try to execute the next ready future
self.pool.poll_next_unpin(cx)
}
}

Expand Down
64 changes: 63 additions & 1 deletion futures-executor/tests/local_pool.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use futures::channel::oneshot;
use futures::executor::LocalPool;
use futures::future::{self, lazy, poll_fn, Future};
use futures::task::{Context, LocalSpawn, Poll, Spawn, Waker};
use futures::task::{Context, LocalSpawn, LocalSpawnExt, Poll, Spawn, SpawnExt, Waker};
use std::cell::{Cell, RefCell};
use std::pin::Pin;
use std::rc::Rc;
Expand Down Expand Up @@ -435,3 +435,65 @@ fn park_unpark_independence() {

futures::executor::block_on(future)
}

struct SelfWaking {
wakeups_remaining: Rc<RefCell<usize>>,
}

impl Future for SelfWaking {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if *self.wakeups_remaining.borrow() != 0 {
*self.wakeups_remaining.borrow_mut() -= 1;
cx.waker().wake_by_ref();
}

Poll::Pending
}
}

/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593
///
/// The issue was that self-waking futures could cause `run_until_stalled`
/// to exit early, even when progress could still be made.
#[test]
fn self_waking_run_until_stalled() {
let wakeups_remaining = Rc::new(RefCell::new(10));

let mut pool = LocalPool::new();
let spawner = pool.spawner();
for _ in 0..3 {
let wakeups_remaining = Rc::clone(&wakeups_remaining);
spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap();
}

// This should keep polling until there are no more wakeups.
pool.run_until_stalled();

assert_eq!(*wakeups_remaining.borrow(), 0);
}

/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593
///
/// The issue was that self-waking futures could cause `try_run_one`
/// to exit early, even when progress could still be made.
#[test]
fn self_waking_try_run_one() {
let wakeups_remaining = Rc::new(RefCell::new(10));

let mut pool = LocalPool::new();
let spawner = pool.spawner();
for _ in 0..3 {
let wakeups_remaining = Rc::clone(&wakeups_remaining);
spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap();
}

spawner.spawn(future::ready(())).unwrap();

// The `ready` future should complete.
assert!(pool.try_run_one());

// The self-waking futures are each polled once.
assert_eq!(*wakeups_remaining.borrow(), 7);
}

0 comments on commit b7a4e59

Please sign in to comment.