diff --git a/tokio-sync/Cargo.toml b/tokio-sync/Cargo.toml index d70d3f08550..181ab8bed32 100644 --- a/tokio-sync/Cargo.toml +++ b/tokio-sync/Cargo.toml @@ -29,4 +29,4 @@ tokio-futures = { version = "0.2.0", path = "../tokio-futures" } env_logger = { version = "0.5", default-features = false } # tokio = { version = "0.2.0", path = "../tokio" } tokio-test = { version = "0.2.0", path = "../tokio-test" } -loom = { version = "0.1.1", features = ["futures"] } +loom = { git = "https://github.com/carllerche/loom", branch = "std-future2", features = ["futures"] } diff --git a/tokio-sync/src/oneshot.rs b/tokio-sync/src/oneshot.rs index 260d744cc5d..9413d6316d3 100644 --- a/tokio-sync/src/oneshot.rs +++ b/tokio-sync/src/oneshot.rs @@ -1,6 +1,8 @@ //! A channel for sending a single message between asynchronous tasks. -use crate::loom::{sync::atomic::AtomicUsize, sync::CausalCell, task::Waker}; +use crate::loom::{sync::atomic::AtomicUsize, sync::CausalCell}; + +use tokio_futures::ready; use std::fmt; use std::future::Future; @@ -9,8 +11,7 @@ use std::pin::Pin; use std::sync::atomic::Ordering::{self, AcqRel, Acquire}; use std::sync::Arc; use std::task::Poll::{Pending, Ready}; -use std::task::{Context, Poll}; -use tokio_futures::ready; +use std::task::{Context, Poll, Waker}; /// Sends a value to the associated `Receiver`. /// diff --git a/tokio-sync/tests/fuzz_oneshot.rs b/tokio-sync/tests/fuzz_oneshot.rs index 4ce2aefb6e2..214dad699e2 100644 --- a/tokio-sync/tests/fuzz_oneshot.rs +++ b/tokio-sync/tests/fuzz_oneshot.rs @@ -1,15 +1,16 @@ -#![cfg(feature = "broken")] #![deny(warnings, rust_2018_idioms)] #[path = "../src/oneshot.rs"] #[allow(warnings)] mod oneshot; -use futures::{self, Async, Future}; +// use futures::{self, Async, Future}; use loom; -use loom::futures::block_on; +use loom::futures::{block_on, poll_future}; use loom::thread; +use std::task::Poll::{Ready, Pending}; + #[test] fn smoke() { loom::fuzz(|| { @@ -34,16 +35,14 @@ fn changing_rx_task() { }); let rx = thread::spawn(move || { - let t1 = block_on(futures::future::poll_fn(|| Ok::<_, ()>(rx.poll().into()))).unwrap(); - - match t1 { - Ok(Async::Ready(value)) => { + match poll_future(&mut rx) { + Ready(Ok(value)) => { // ok assert_eq!(1, value); None } - Ok(Async::NotReady) => Some(rx), - Err(_) => unreachable!(), + Ready(Err(_)) => unimplemented!(), + Pending => Some(rx), } }) .join() @@ -57,6 +56,30 @@ fn changing_rx_task() { }); } +// TODO: Move this into `oneshot` proper. + +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +struct OnClose<'a> { + tx: &'a mut oneshot::Sender, +} + +impl<'a> OnClose<'a> { + fn new(tx: &'a mut oneshot::Sender) -> Self { + OnClose { tx } + } +} + +impl<'a> Future for OnClose<'a> { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + self.get_mut().tx.poll_close(cx) + } +} + #[test] fn changing_tx_task() { loom::fuzz(|| { @@ -67,15 +90,11 @@ fn changing_tx_task() { }); let tx = thread::spawn(move || { - let t1 = block_on(futures::future::poll_fn(|| { - Ok::<_, ()>(tx.poll_close().into()) - })) - .unwrap(); + let t1 = poll_future(&mut OnClose::new(&mut tx)); match t1 { - Ok(Async::Ready(())) => None, - Ok(Async::NotReady) => Some(tx), - Err(_) => unreachable!(), + Ready(()) => None, + Pending => Some(tx), } }) .join() @@ -83,7 +102,7 @@ fn changing_tx_task() { if let Some(mut tx) = tx { // Previous task parked, use a new task... - block_on(futures::future::poll_fn(move || tx.poll_close())).unwrap(); + block_on(OnClose::new(&mut tx)); } }); }