From 48731766d20030a64b044e7dada3b925ace8d359 Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Wed, 13 May 2020 09:40:29 +0100 Subject: [PATCH] Backport to 0.1: Shared must relinquish control to the executor if repolled This backports #2136 to Futures 0.1. There isn't much to add on top of what's mentioned in #2136: the same bug exists in Futures 0.1, and it'll fail in the same way when polled in recent versions of Tokio (#2418). Backporting to 0.1 allows codebases that still have some Futures 0.1 code around to still use newer versions of Tokio. --- src/future/shared.rs | 80 ++++++++++++++++++-------------------------- tests/shared.rs | 30 +++++++++++++++++ 2 files changed, 63 insertions(+), 47 deletions(-) diff --git a/src/future/shared.rs b/src/future/shared.rs index f40c0be170..e6ec23dbe9 100644 --- a/src/future/shared.rs +++ b/src/future/shared.rs @@ -59,9 +59,8 @@ struct Notifier { const IDLE: usize = 0; const POLLING: usize = 1; -const REPOLL: usize = 2; -const COMPLETE: usize = 3; -const POISONED: usize = 4; +const COMPLETE: usize = 2; +const POISONED: usize = 3; pub fn new(future: F) -> Shared { Shared { @@ -133,7 +132,7 @@ impl Future for Shared IDLE => { // Lock acquired, fall through } - POLLING | REPOLL => { + POLLING => { // Another task is currently polling, at this point we just want // to ensure that our task handle is currently registered @@ -146,56 +145,45 @@ impl Future for Shared _ => unreachable!(), } - loop { - struct Reset<'a>(&'a AtomicUsize); + struct Reset<'a>(&'a AtomicUsize); - impl<'a> Drop for Reset<'a> { - fn drop(&mut self) { - use std::thread; + impl<'a> Drop for Reset<'a> { + fn drop(&mut self) { + use std::thread; - if thread::panicking() { - self.0.store(POISONED, SeqCst); - } + if thread::panicking() { + self.0.store(POISONED, SeqCst); } } + } - let _reset = Reset(&self.inner.notifier.state); - - // Poll the future - let res = unsafe { - (*self.inner.future.get()).as_mut().unwrap() - .poll_future_notify(&self.inner.notifier, 0) - }; - match res { - Ok(Async::NotReady) => { - // Not ready, try to release the handle - match self.inner.notifier.state.compare_and_swap(POLLING, IDLE, SeqCst) { - POLLING => { - // Success - return Ok(Async::NotReady); - } - REPOLL => { - // Gotta poll again! - let prev = self.inner.notifier.state.swap(POLLING, SeqCst); - assert_eq!(prev, REPOLL); - } - _ => unreachable!(), + let _reset = Reset(&self.inner.notifier.state); + + // Poll the future + let res = unsafe { + (*self.inner.future.get()).as_mut().unwrap() + .poll_future_notify(&self.inner.notifier, 0) + }; + match res { + Ok(Async::NotReady) => { + // Not ready, try to release the handle + match self.inner.notifier.state.compare_and_swap(POLLING, IDLE, SeqCst) { + POLLING => { + // Success + return Ok(Async::NotReady); } - + _ => unreachable!(), } - Ok(Async::Ready(i)) => { - unsafe { - (*self.inner.result.get()) = Some(Ok(SharedItem { item: Arc::new(i) })); - } - break; + } + Ok(Async::Ready(i)) => { + unsafe { + (*self.inner.result.get()) = Some(Ok(SharedItem { item: Arc::new(i) })); } - Err(e) => { - unsafe { - (*self.inner.result.get()) = Some(Err(SharedError { error: Arc::new(e) })); - } - - break; + } + Err(e) => { + unsafe { + (*self.inner.result.get()) = Some(Err(SharedError { error: Arc::new(e) })); } } } @@ -225,8 +213,6 @@ impl Drop for Shared where F: Future { impl Notify for Notifier { fn notify(&self, _id: usize) { - self.state.compare_and_swap(POLLING, REPOLL, SeqCst); - let waiters = mem::replace(&mut *self.waiters.lock().unwrap(), HashMap::new()); for (_, waiter) in waiters { diff --git a/tests/shared.rs b/tests/shared.rs index 99d2b381ea..e465d6acc3 100644 --- a/tests/shared.rs +++ b/tests/shared.rs @@ -202,3 +202,33 @@ fn recursive_poll_with_unpark() { drop(tx0); core.run(f3).unwrap(); } + +#[test] +fn shared_future_that_wakes_itself_until_pending_is_returned() { + use futures::Async; + use std::cell::Cell; + + let core = ::support::local_executor::Core::new(); + + let proceed = Cell::new(false); + let fut = futures::future::poll_fn(|| { + Ok::<_, ()>(if proceed.get() { + Async::Ready(()) + } else { + futures::task::current().notify(); + Async::NotReady + }) + }) + .shared() + .map(|_| ()) + .map_err(|_| ()); + + // The join future can only complete if the second future gets a chance to run after the first + // has returned pending + let second = futures::future::lazy(|| { + proceed.set(true); + Ok::<_, ()>(()) + }); + + core.run(fut.join(second)).unwrap(); +}