diff --git a/tokio-sync/Cargo.toml b/tokio-sync/Cargo.toml index d70d3f08550..aedd4c62659 100644 --- a/tokio-sync/Cargo.toml +++ b/tokio-sync/Cargo.toml @@ -21,12 +21,18 @@ Synchronization utilities. categories = ["asynchronous"] publish = false +[features] +async-traits = ["async-sink", "futures-core-preview"] + [dependencies] fnv = "1.0.6" -tokio-futures = { version = "0.2.0", path = "../tokio-futures" } +async-sink = { git = "https://github.com/tokio-rs/async", optional = true } +futures-core-preview = { version = "0.3.0-alpha.16", optional = true } [dev-dependencies] +async-util = { git = "https://github.com/tokio-rs/async" } env_logger = { version = "0.5", default-features = false } +pin-utils = "0.1.0-alpha.4" # tokio = { version = "0.2.0", path = "../tokio" } tokio-test = { version = "0.2.0", path = "../tokio-test" } -loom = { version = "0.1.1", features = ["futures"] } +loom = { git = "https://github.com/carllerche/loom", branch = "std-future2", features = ["futures"] } diff --git a/tokio-sync/src/lib.rs b/tokio-sync/src/lib.rs index f153c631b00..67903acecbc 100644 --- a/tokio-sync/src/lib.rs +++ b/tokio-sync/src/lib.rs @@ -20,18 +20,29 @@ macro_rules! debug { } } -/* +/// Unwrap a ready value or propagate `Poll::Pending`. +#[macro_export] +macro_rules! ready { + ($e:expr) => {{ + use std::task::Poll::{Pending, Ready}; + + match $e { + Ready(v) => v, + Pending => return Pending, + } + }}; +} + macro_rules! if_fuzz { ($($t:tt)*) => {{ if false { $($t)* } }} } -*/ -// pub mod lock; +pub mod lock; mod loom; -// pub mod mpsc; +pub mod mpsc; pub mod oneshot; -// pub mod semaphore; +pub mod semaphore; pub mod task; pub mod watch; diff --git a/tokio-sync/src/lock.rs b/tokio-sync/src/lock.rs index 3cfa1beb2f2..3b8ce36f92f 100644 --- a/tokio-sync/src/lock.rs +++ b/tokio-sync/src/lock.rs @@ -41,11 +41,13 @@ //! [`LockGuard`]: struct.LockGuard.html use crate::semaphore; -use futures::Async; + use std::cell::UnsafeCell; use std::fmt; use std::ops::{Deref, DerefMut}; use std::sync::Arc; +use std::task::Poll::Ready; +use std::task::{Context, Poll}; /// An asynchronous mutual exclusion primitive useful for protecting shared data /// @@ -103,14 +105,12 @@ impl Lock { /// Try to acquire the lock. /// /// If the lock is already held, the current task is notified when it is released. - pub fn poll_lock(&mut self) -> Async> { - if let Async::NotReady = self.permit.poll_acquire(&self.inner.s).unwrap_or_else(|_| { + pub fn poll_lock(&mut self, cx: &mut Context<'_>) -> Poll> { + ready!(self.permit.poll_acquire(cx, &self.inner.s)).unwrap_or_else(|_| { // The semaphore was closed. but, we never explicitly close it, and we have a // handle to it through the Arc, which means that this can never happen. unreachable!() - }) { - return Async::NotReady; - } + }); // We want to move the acquired permit into the guard, // and leave an unacquired one in self. @@ -118,7 +118,7 @@ impl Lock { inner: self.inner.clone(), permit: ::std::mem::replace(&mut self.permit, semaphore::Permit::new()), }; - Async::Ready(LockGuard(acquired)) + Ready(LockGuard(acquired)) } } diff --git a/tokio-sync/src/loom.rs b/tokio-sync/src/loom.rs index 82dde0aa87e..92a21e5ba3f 100644 --- a/tokio-sync/src/loom.rs +++ b/tokio-sync/src/loom.rs @@ -1,12 +1,10 @@ pub(crate) mod futures { - // pub(crate) use crate::task::AtomicTask; + pub(crate) use crate::task::AtomicWaker; } -pub(crate) use std::task; - pub(crate) mod sync { pub(crate) use std::sync::atomic; - // pub(crate) use std::sync::Arc; + pub(crate) use std::sync::Arc; use std::cell::UnsafeCell; @@ -33,8 +31,6 @@ pub(crate) mod sync { } } -/* pub(crate) fn yield_now() { ::std::sync::atomic::spin_loop_hint(); } -*/ diff --git a/tokio-sync/src/mpsc/bounded.rs b/tokio-sync/src/mpsc/bounded.rs index b2fce168be0..7f8cb90534b 100644 --- a/tokio-sync/src/mpsc/bounded.rs +++ b/tokio-sync/src/mpsc/bounded.rs @@ -1,6 +1,10 @@ use super::chan; -use futures::{Poll, Sink, StartSend, Stream}; + use std::fmt; +use std::task::{Context, Poll}; + +#[cfg(feature = "async-traits")] +use std::pin::Pin; /// Send values to the associated `Receiver`. /// @@ -127,6 +131,11 @@ impl Receiver { Receiver { chan } } + /// TODO: Dox + pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll> { + self.chan.recv(cx) + } + /// Closes the receiving half of a channel, without dropping it. /// /// This prevents any further messages from being sent on the channel while @@ -136,12 +145,12 @@ impl Receiver { } } -impl Stream for Receiver { +#[cfg(feature = "async-traits")] +impl futures_core::Stream for Receiver { type Item = T; - type Error = RecvError; - fn poll(&mut self) -> Poll, Self::Error> { - self.chan.recv().map_err(|_| RecvError(())) + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Receiver::poll_next(self.get_mut(), cx) } } @@ -165,13 +174,13 @@ impl Sender { /// /// This method returns: /// - /// - `Ok(Async::Ready(_))` if capacity is reserved for a single message. - /// - `Ok(Async::NotReady)` if the channel may not have capacity, in which + /// - `Poll::Ready(Ok(_))` if capacity is reserved for a single message. + /// - `Poll::Pending` if the channel may not have capacity, in which /// case the current task is queued to be notified once /// capacity is available; - /// - `Err(SendError)` if the receiver has been dropped. - pub fn poll_ready(&mut self) -> Poll<(), SendError> { - self.chan.poll_ready().map_err(|_| SendError(())) + /// - `Poll::Ready(Err(SendError))` if the receiver has been dropped. + pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.chan.poll_ready(cx).map_err(|_| SendError(())) } /// Attempts to send a message on this `Sender`, returning the message @@ -182,31 +191,29 @@ impl Sender { } } -impl Sink for Sender { - type SinkItem = T; - type SinkError = SendError; +#[cfg(feature = "async-traits")] +impl async_sink::Sink for Sender { + type Error = SendError; - fn start_send(&mut self, msg: T) -> StartSend { - use futures::Async::*; - use futures::AsyncSink; + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Sender::poll_ready(self.get_mut(), cx) + } - match self.poll_ready()? { - Ready(_) => { - self.try_send(msg).map_err(|_| SendError(()))?; - Ok(AsyncSink::Ready) - } - NotReady => Ok(AsyncSink::NotReady(msg)), - } + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { + self.as_mut() + .try_send(msg) + .map_err(|err| { + assert!(err.is_full(), "call `poll_ready` before sending"); + SendError(()) + }) } - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - use futures::Async::Ready; - Ok(Ready(())) + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } - fn close(&mut self) -> Poll<(), Self::SinkError> { - use futures::Async::Ready; - Ok(Ready(())) + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } } diff --git a/tokio-sync/src/mpsc/chan.rs b/tokio-sync/src/mpsc/chan.rs index fae3159aac0..93e6282405a 100644 --- a/tokio-sync/src/mpsc/chan.rs +++ b/tokio-sync/src/mpsc/chan.rs @@ -1,13 +1,14 @@ use super::list; use crate::loom::{ - futures::AtomicTask, + futures::AtomicWaker, sync::atomic::AtomicUsize, sync::{Arc, CausalCell}, }; -use futures::Poll; use std::fmt; use std::process; use std::sync::atomic::Ordering::{AcqRel, Relaxed}; +use std::task::Poll::{Pending, Ready}; +use std::task::{Context, Poll}; /// Channel sender pub(crate) struct Tx { @@ -61,7 +62,8 @@ pub(crate) trait Semaphore { fn add_permit(&self); - fn poll_acquire(&self, permit: &mut Self::Permit) -> Poll<(), ()>; + fn poll_acquire(&self, cx: &mut Context<'_>, permit: &mut Self::Permit) + -> Poll>; fn try_acquire(&self, permit: &mut Self::Permit) -> Result<(), TrySendError>; @@ -81,8 +83,8 @@ struct Chan { /// Coordinates access to channel's capacity. semaphore: S, - /// Receiver task. Notified when a value is pushed into the channel. - rx_task: AtomicTask, + /// Receiver waker. Notified when a value is pushed into the channel. + rx_waker: AtomicWaker, /// Tracks the number of outstanding sender handles. /// @@ -101,7 +103,7 @@ where fmt.debug_struct("Chan") .field("tx", &self.tx) .field("semaphore", &self.semaphore) - .field("rx_task", &self.rx_task) + .field("rx_waker", &self.rx_waker) .field("tx_count", &self.tx_count) .field("rx_fields", &"...") .finish() @@ -138,7 +140,7 @@ where let chan = Arc::new(Chan { tx, semaphore, - rx_task: AtomicTask::new(), + rx_waker: AtomicWaker::new(), tx_count: AtomicUsize::new(1), rx_fields: CausalCell::new(RxFields { list: rx, @@ -163,8 +165,8 @@ where } /// TODO: Docs - pub(crate) fn poll_ready(&mut self) -> Poll<(), ()> { - self.inner.semaphore.poll_acquire(&mut self.permit) + pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.semaphore.poll_acquire(cx, &mut self.permit) } /// Send a message and notify the receiver. @@ -177,7 +179,7 @@ where self.inner.tx.push(value); // Notify the rx task - self.inner.rx_task.notify(); + self.inner.rx_waker.wake(); // Release the permit self.inner.semaphore.forget(&mut self.permit); @@ -217,7 +219,7 @@ where self.inner.tx.close(); // Notify the receiver - self.inner.rx_task.notify(); + self.inner.rx_waker.wake(); } } @@ -246,9 +248,8 @@ where } /// Receive the next value - pub(crate) fn recv(&mut self) -> Poll, ()> { + pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll> { use super::block::Read::*; - use futures::Async::*; self.inner.rx_fields.with_mut(|rx_fields_ptr| { let rx_fields = unsafe { &mut *rx_fields_ptr }; @@ -258,7 +259,7 @@ where match rx_fields.list.pop(&self.inner.tx) { Some(Value(value)) => { self.inner.semaphore.add_permit(); - return Ok(Ready(Some(value))); + return Ready(Some(value)); } Some(Closed) => { // TODO: This check may not be required as it most @@ -268,7 +269,7 @@ where // which ensures that if dropping the tx handle is // visible, then all messages sent are also visible. assert!(self.inner.semaphore.is_idle()); - return Ok(Ready(None)); + return Ready(None); } None => {} // fall through } @@ -277,7 +278,7 @@ where try_recv!(); - self.inner.rx_task.register(); + self.inner.rx_waker.register_by_ref(cx.waker()); // It is possible that a value was pushed between attempting to read // and registering the task, so we have to check the channel a @@ -291,9 +292,9 @@ where ); if rx_fields.rx_closed && self.inner.semaphore.is_idle() { - Ok(Ready(None)) + Ready(None) } else { - Ok(NotReady) + Pending } }) } @@ -372,8 +373,8 @@ impl Semaphore for (crate::semaphore::Semaphore, usize) { self.0.available_permits() == self.1 } - fn poll_acquire(&self, permit: &mut Permit) -> Poll<(), ()> { - permit.poll_acquire(&self.0).map_err(|_| ()) + fn poll_acquire(&self, cx: &mut Context<'_>, permit: &mut Permit) -> Poll> { + permit.poll_acquire(cx, &self.0).map_err(|_| ()) } fn try_acquire(&self, permit: &mut Permit) -> Result<(), TrySendError> { @@ -415,9 +416,8 @@ impl Semaphore for AtomicUsize { self.load(Acquire) >> 1 == 0 } - fn poll_acquire(&self, permit: &mut ()) -> Poll<(), ()> { - use futures::Async::Ready; - self.try_acquire(permit).map(Ready).map_err(|_| ()) + fn poll_acquire(&self, _cx: &mut Context<'_>, permit: &mut ()) -> Poll> { + Ready(self.try_acquire(permit).map_err(|_| ())) } fn try_acquire(&self, _permit: &mut ()) -> Result<(), TrySendError> { diff --git a/tokio-sync/src/mpsc/unbounded.rs b/tokio-sync/src/mpsc/unbounded.rs index 58967c915fa..960bee41650 100644 --- a/tokio-sync/src/mpsc/unbounded.rs +++ b/tokio-sync/src/mpsc/unbounded.rs @@ -1,7 +1,11 @@ use super::chan; use crate::loom::sync::atomic::AtomicUsize; -use futures::{Poll, Sink, StartSend, Stream}; + use std::fmt; +use std::task::{Context, Poll}; + +#[cfg(feature = "async-traits")] +use std::pin::Pin; /// Send values to the associated `UnboundedReceiver`. /// @@ -83,6 +87,11 @@ impl UnboundedReceiver { UnboundedReceiver { chan } } + /// TODO: dox + pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll> { + self.chan.recv(cx) + } + /// Closes the receiving half of a channel, without dropping it. /// /// This prevents any further messages from being sent on the channel while @@ -92,12 +101,12 @@ impl UnboundedReceiver { } } -impl Stream for UnboundedReceiver { +#[cfg(feature = "async-traits")] +impl futures_core::Stream for UnboundedReceiver { type Item = T; - type Error = UnboundedRecvError; - fn poll(&mut self) -> Poll, Self::Error> { - self.chan.recv().map_err(|_| UnboundedRecvError(())) + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.chan.recv(cx) } } @@ -113,25 +122,24 @@ impl UnboundedSender { } } -impl Sink for UnboundedSender { - type SinkItem = T; - type SinkError = UnboundedSendError; +#[cfg(feature = "async-traits")] +impl async_sink::Sink for UnboundedSender { + type Error = UnboundedSendError; - fn start_send(&mut self, msg: T) -> StartSend { - use futures::AsyncSink; + fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } - self.try_send(msg).map_err(|_| UnboundedSendError(()))?; - Ok(AsyncSink::Ready) + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { + self.try_send(msg).map_err(|_| UnboundedSendError(())) } - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - use futures::Async::Ready; - Ok(Ready(())) + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } - fn close(&mut self) -> Poll<(), Self::SinkError> { - use futures::Async::Ready; - Ok(Ready(())) + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } } diff --git a/tokio-sync/src/oneshot.rs b/tokio-sync/src/oneshot.rs index 260d744cc5d..c38bb0cec6f 100644 --- a/tokio-sync/src/oneshot.rs +++ b/tokio-sync/src/oneshot.rs @@ -1,6 +1,6 @@ //! A channel for sending a single message between asynchronous tasks. -use crate::loom::{sync::atomic::AtomicUsize, sync::CausalCell, task::Waker}; +use crate::loom::{sync::atomic::AtomicUsize, sync::CausalCell}; use std::fmt; use std::future::Future; @@ -9,8 +9,7 @@ use std::pin::Pin; use std::sync::atomic::Ordering::{self, AcqRel, Acquire}; use std::sync::Arc; use std::task::Poll::{Pending, Ready}; -use std::task::{Context, Poll}; -use tokio_futures::ready; +use std::task::{Context, Poll, Waker}; /// Sends a value to the associated `Receiver`. /// diff --git a/tokio-sync/src/semaphore.rs b/tokio-sync/src/semaphore.rs index 43e89c37101..33fd2318b8f 100644 --- a/tokio-sync/src/semaphore.rs +++ b/tokio-sync/src/semaphore.rs @@ -6,21 +6,23 @@ //! Before accessing the shared resource, callers acquire a permit from the //! semaphore. Once the permit is acquired, the caller then enters the critical //! section. If no permits are available, then acquiring the semaphore returns -//! `NotReady`. The task is notified once a permit becomes available. +//! `Pending`. The task is woken once a permit becomes available. use crate::loom::{ - futures::AtomicTask, + futures::AtomicWaker, sync::{ atomic::{AtomicPtr, AtomicUsize}, CausalCell, }, yield_now, }; -use futures::Poll; + use std::fmt; use std::ptr::{self, NonNull}; use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed, Release}; use std::sync::Arc; +use std::task::Poll::{Pending, Ready}; +use std::task::{Context, Poll}; use std::usize; /// Futures-aware semaphore. @@ -80,8 +82,8 @@ struct WaiterNode { /// See `NodeState` for more details. state: AtomicUsize, - /// Task to notify when a permit is made available. - task: AtomicTask, + /// Task to wake when a permit is made available. + waker: AtomicWaker, /// Next pointer in the queue of waiting senders. next: AtomicPtr, @@ -174,9 +176,10 @@ impl Semaphore { } /// Poll for a permit - fn poll_permit(&self, mut permit: Option<&mut Permit>) -> Poll<(), AcquireError> { - use futures::Async::*; - + fn poll_permit( + &self, + mut permit: Option<(&mut Context<'_>, &mut Permit)>, + ) -> Poll> { // Load the current state let mut curr = SemState::load(&self.state, Acquire); @@ -205,7 +208,7 @@ impl Semaphore { if curr.is_closed() { undo_strong!(); - return Err(AcquireError::closed()); + return Ready(Err(AcquireError::closed())); } if !next.acquire_permit(&self.stub) { @@ -214,13 +217,13 @@ impl Semaphore { debug_assert!(curr.waiter().is_some()); if maybe_strong.is_none() { - if let Some(ref mut permit) = permit { + if let Some((ref mut cx, ref mut permit)) = permit { // Get the Sender's waiter node, or initialize one let waiter = permit .waiter .get_or_insert_with(|| Arc::new(WaiterNode::new())); - waiter.register(); + waiter.register(cx); debug!(" + poll_permit -- to_queued_waiting"); @@ -228,14 +231,14 @@ impl Semaphore { debug!(" + poll_permit; waiter already queued"); // The node is alrady queued, there is no further work // to do. - return Ok(NotReady); + return Pending; } maybe_strong = Some(WaiterNode::into_non_null(waiter.clone())); } else { // If no `waiter`, then the task is not registered and there // is no further work to do. - return Ok(NotReady); + return Pending; } } @@ -261,14 +264,14 @@ impl Semaphore { debug!(" + poll_permit -- waiter pushed"); - return Ok(NotReady); + return Pending; } None => { debug!(" + poll_permit -- permit acquired"); undo_strong!(); - return Ok(Ready(())); + return Ready(Ok(())); } } } @@ -571,42 +574,42 @@ impl Permit { /// Try to acquire the permit. If no permits are available, the current task /// is notified once a new permit becomes available. - pub fn poll_acquire(&mut self, semaphore: &Semaphore) -> Poll<(), AcquireError> { - use futures::Async::*; - + pub fn poll_acquire( + &mut self, + cx: &mut Context<'_>, + semaphore: &Semaphore, + ) -> Poll> { match self.state { PermitState::Idle => {} PermitState::Waiting => { let waiter = self.waiter.as_ref().unwrap(); - if waiter.acquire()? { + if waiter.acquire(cx)? { self.state = PermitState::Acquired; - return Ok(Ready(())); + return Ready(Ok(())); } else { - return Ok(NotReady); + return Pending; } } PermitState::Acquired => { - return Ok(Ready(())); + return Ready(Ok(())); } } - match semaphore.poll_permit(Some(self))? { + match semaphore.poll_permit(Some((cx, self)))? { Ready(v) => { self.state = PermitState::Acquired; - Ok(Ready(v)) + Ready(Ok(v)) } - NotReady => { + Pending => { self.state = PermitState::Waiting; - Ok(NotReady) + Pending } } } /// Try to acquire the permit. pub fn try_acquire(&mut self, semaphore: &Semaphore) -> Result<(), TryAcquireError> { - use futures::Async::*; - match self.state { PermitState::Idle => {} PermitState::Waiting => { @@ -629,7 +632,7 @@ impl Permit { self.state = PermitState::Acquired; Ok(()) } - NotReady => Err(TryAcquireError::no_permits()), + Pending => Err(TryAcquireError::no_permits()), } } @@ -748,17 +751,17 @@ impl WaiterNode { fn new() -> WaiterNode { WaiterNode { state: AtomicUsize::new(NodeState::new().to_usize()), - task: AtomicTask::new(), + waker: AtomicWaker::new(), next: AtomicPtr::new(ptr::null_mut()), } } - fn acquire(&self) -> Result { + fn acquire(&self, cx: &mut Context<'_>) -> Result { if self.acquire2()? { return Ok(true); } - self.task.register(); + self.waker.register_by_ref(cx.waker()); self.acquire2() } @@ -773,8 +776,8 @@ impl WaiterNode { } } - fn register(&self) { - self.task.register() + fn register(&self, cx: &mut Context<'_>) { + self.waker.register_by_ref(cx.waker()) } /// Returns `true` if the permit has been acquired @@ -860,7 +863,7 @@ impl WaiterNode { Ok(_) => match curr { QueuedWaiting => { debug!(" + notify -- task notified"); - self.task.notify(); + self.waker.wake(); return true; } other => { diff --git a/tokio-sync/src/task/atomic_waker.rs b/tokio-sync/src/task/atomic_waker.rs index 7228e78966e..6f741d3865c 100644 --- a/tokio-sync/src/task/atomic_waker.rs +++ b/tokio-sync/src/task/atomic_waker.rs @@ -1,6 +1,8 @@ -use crate::loom::{sync::atomic::AtomicUsize, sync::CausalCell, task::Waker}; +use crate::loom::{sync::atomic::AtomicUsize, sync::CausalCell}; + use std::fmt; use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; +use std::task::Waker; /// A synchronization primitive for task waking. /// diff --git a/tokio-sync/src/watch.rs b/tokio-sync/src/watch.rs index d90fdd9f299..ce275b4e7cf 100644 --- a/tokio-sync/src/watch.rs +++ b/tokio-sync/src/watch.rs @@ -55,7 +55,6 @@ use crate::task::AtomicWaker; -use core::pin::Pin; use core::task::Poll::{Pending, Ready}; use core::task::{Context, Poll}; use fnv::FnvHashMap; @@ -63,7 +62,9 @@ use std::ops; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak}; -use tokio_futures::{ready, Sink, Stream}; + +#[cfg(feature = "async-traits")] +use std::pin::Pin; /// Receives values from the associated `Sender`. /// @@ -239,7 +240,7 @@ impl Receiver { /// Attempts to receive the latest value sent via the channel. /// /// If a new, unobserved, value has been sent, a reference to it is - /// returned. If no new value has been sent, then `NotReady` is returned and + /// returned. If no new value has been sent, then `Pending` is returned and /// the current task is notified once a new value is sent. /// /// Only the **most recent** value is returned. If the receiver is falling @@ -269,7 +270,18 @@ impl Receiver { } } -impl Stream for Receiver { +impl Receiver { + /// Attempts to clone the latest value sent via the channel. + /// + /// This is equivalent to calling `Clone` on the value returned by `poll_ref`. + pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll> { + let item = ready!(self.poll_ref(cx)); + Ready(item.map(|v_ref| v_ref.clone())) + } +} + +#[cfg(feature = "async-traits")] +impl futures_core::Stream for Receiver { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -359,7 +371,8 @@ impl Sender { } } -impl Sink for Sender { +#[cfg(feature = "async-traits")] +impl async_sink::Sink for Sender { type Error = error::SendError; fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { diff --git a/tokio-sync/tests/atomic_task.rs b/tokio-sync/tests/atomic_task.rs deleted file mode 100644 index 90bc2733e35..00000000000 --- a/tokio-sync/tests/atomic_task.rs +++ /dev/null @@ -1,53 +0,0 @@ -#![cfg(feature = "broken")] -#![deny(warnings, rust_2018_idioms)] - -use futures::task::{self, Task}; -use tokio_mock_task::*; -use tokio_sync::task::AtomicTask; - -trait AssertSend: Send {} -trait AssertSync: Send {} - -impl AssertSend for AtomicTask {} -impl AssertSync for AtomicTask {} - -impl AssertSend for Task {} -impl AssertSync for Task {} - -#[test] -fn register_task() { - // AtomicTask::register_task should *always* register the - // arbitrary task. - - let atomic = AtomicTask::new(); - - let mut mock1 = MockTask::new(); - let mut mock2 = MockTask::new(); - - // Register once... - mock1.enter(|| atomic.register()); - - // Grab the actual 2nd task from the mock... - let task2 = mock2.enter(task::current); - - // Now register the 2nd task, even though in the context where - // the first task would be considered 'current'... - { - // Need a block to grab a reference, so that we only move - // task2 into the closure, not the AtomicTask... - let atomic = &atomic; - mock1.enter(move || { - atomic.register_task(task2); - }); - } - - // Just proving that they haven't been notified yet... - assert!(!mock1.is_notified(), "mock1 shouldn't be notified yet"); - assert!(!mock2.is_notified(), "mock2 shouldn't be notified yet"); - - // Now trigger the notify, and ensure it was task2 - atomic.notify(); - - assert!(!mock1.is_notified(), "mock1 shouldn't be notified"); - assert!(mock2.is_notified(), "mock2 should be notified"); -} diff --git a/tokio-sync/tests/atomic_waker.rs b/tokio-sync/tests/atomic_waker.rs new file mode 100644 index 00000000000..28a9f40ddcf --- /dev/null +++ b/tokio-sync/tests/atomic_waker.rs @@ -0,0 +1,37 @@ +#![deny(warnings, rust_2018_idioms)] + +use std::task::Waker; +use tokio_sync::task::AtomicWaker; +use tokio_test::task::MockTask; + +trait AssertSend: Send {} +trait AssertSync: Send {} + +impl AssertSend for AtomicWaker {} +impl AssertSync for AtomicWaker {} + +impl AssertSend for Waker {} +impl AssertSync for Waker {} + +#[test] +fn basic_usage() { + let waker = AtomicWaker::new(); + let mut task = MockTask::new(); + + task.enter(|cx| waker.register_by_ref(cx.waker())); + waker.wake(); + + assert!(task.is_woken()); +} + +#[test] +fn wake_without_register() { + let waker = AtomicWaker::new(); + waker.wake(); + + // Registering should not result in a notification + let mut task = MockTask::new(); + task.enter(|cx| waker.register_by_ref(cx.waker())); + + assert!(!task.is_woken()); +} diff --git a/tokio-sync/tests/errors.rs b/tokio-sync/tests/errors.rs index fd9effae9c4..2afafc1f6d8 100644 --- a/tokio-sync/tests/errors.rs +++ b/tokio-sync/tests/errors.rs @@ -1,4 +1,3 @@ -#![cfg(feature = "broken")] #![deny(warnings, rust_2018_idioms)] fn is_error() {} @@ -7,7 +6,6 @@ fn is_error() {} fn mpsc_error_bound() { use tokio_sync::mpsc::error; - is_error::(); is_error::(); is_error::>(); is_error::(); @@ -27,6 +25,5 @@ fn oneshot_error_bound() { fn watch_error_bound() { use tokio_sync::watch::error; - is_error::(); is_error::>(); } diff --git a/tokio-sync/tests/fuzz_atomic_task.rs b/tokio-sync/tests/fuzz_atomic_waker.rs similarity index 66% rename from tokio-sync/tests/fuzz_atomic_task.rs rename to tokio-sync/tests/fuzz_atomic_waker.rs index fc102f9372e..1a2ef1cc569 100644 --- a/tokio-sync/tests/fuzz_atomic_task.rs +++ b/tokio-sync/tests/fuzz_atomic_waker.rs @@ -1,4 +1,3 @@ -#![cfg(feature = "broken")] #![deny(warnings, rust_2018_idioms)] #[macro_use] @@ -7,19 +6,19 @@ extern crate loom; #[allow(dead_code)] #[path = "../src/task/atomic_waker.rs"] mod atomic_waker; +use crate::atomic_waker::AtomicWaker; -use crate::atomic_task::AtomicTask; -use futures::future::poll_fn; -use futures::Async; +use async_util::future::poll_fn; use loom::futures::block_on; use loom::sync::atomic::AtomicUsize; use loom::thread; use std::sync::atomic::Ordering::Relaxed; use std::sync::Arc; +use std::task::Poll::{Pending, Ready}; struct Chan { num: AtomicUsize, - task: AtomicTask, + task: AtomicWaker, } #[test] @@ -29,7 +28,7 @@ fn basic_notification() { loom::fuzz(|| { let chan = Arc::new(Chan { num: AtomicUsize::new(0), - task: AtomicTask::new(), + task: AtomicWaker::new(), }); for _ in 0..NUM_NOTIFY { @@ -37,19 +36,18 @@ fn basic_notification() { thread::spawn(move || { chan.num.fetch_add(1, Relaxed); - chan.task.notify(); + chan.task.wake(); }); } - block_on(poll_fn(move || { - chan.task.register(); + block_on(poll_fn(move |cx| { + chan.task.register_by_ref(cx.waker()); if NUM_NOTIFY == chan.num.load(Relaxed) { - return Ok(Async::Ready(())); + return Ready(()); } - Ok::<_, ()>(Async::NotReady) - })) - .unwrap(); + Pending + })); }); } diff --git a/tokio-sync/tests/fuzz_list.rs b/tokio-sync/tests/fuzz_list.rs index f180f8676ac..4edd21d7f55 100644 --- a/tokio-sync/tests/fuzz_list.rs +++ b/tokio-sync/tests/fuzz_list.rs @@ -1,4 +1,3 @@ -#![cfg(feature = "broken")] #![deny(warnings, rust_2018_idioms)] #[macro_use] diff --git a/tokio-sync/tests/fuzz_mpsc.rs b/tokio-sync/tests/fuzz_mpsc.rs index c90e214f879..4a0b6441d4f 100644 --- a/tokio-sync/tests/fuzz_mpsc.rs +++ b/tokio-sync/tests/fuzz_mpsc.rs @@ -1,4 +1,3 @@ -#![cfg(feature = "broken")] #![deny(warnings, rust_2018_idioms)] #[macro_use] @@ -18,7 +17,8 @@ mod mpsc; #[allow(warnings)] mod semaphore; -use futures::{future::poll_fn, Stream}; +// use futures::{future::poll_fn, Stream}; +use async_util::future::poll_fn; use loom::futures::block_on; use loom::thread; @@ -32,10 +32,10 @@ fn closing_tx() { drop(tx); }); - let v = block_on(poll_fn(|| rx.poll())).unwrap(); + let v = block_on(poll_fn(|cx| rx.poll_next(cx))); assert!(v.is_some()); - let v = block_on(poll_fn(|| rx.poll())).unwrap(); + let v = block_on(poll_fn(|cx| rx.poll_next(cx))); assert!(v.is_none()); }); } diff --git a/tokio-sync/tests/fuzz_oneshot.rs b/tokio-sync/tests/fuzz_oneshot.rs index 4ce2aefb6e2..9e0a1c40a2b 100644 --- a/tokio-sync/tests/fuzz_oneshot.rs +++ b/tokio-sync/tests/fuzz_oneshot.rs @@ -1,15 +1,29 @@ -#![cfg(feature = "broken")] #![deny(warnings, rust_2018_idioms)] +/// Unwrap a ready value or propagate `Async::Pending`. +#[macro_export] +macro_rules! ready { + ($e:expr) => {{ + use std::task::Poll::{Pending, Ready}; + + match $e { + Ready(v) => v, + Pending => return Pending, + } + }}; +} + #[path = "../src/oneshot.rs"] #[allow(warnings)] mod oneshot; -use futures::{self, Async, Future}; +// use futures::{self, Async, Future}; use loom; -use loom::futures::block_on; +use loom::futures::{block_on, poll_future}; use loom::thread; +use std::task::Poll::{Pending, Ready}; + #[test] fn smoke() { loom::fuzz(|| { @@ -34,16 +48,14 @@ fn changing_rx_task() { }); let rx = thread::spawn(move || { - let t1 = block_on(futures::future::poll_fn(|| Ok::<_, ()>(rx.poll().into()))).unwrap(); - - match t1 { - Ok(Async::Ready(value)) => { + match poll_future(&mut rx) { + Ready(Ok(value)) => { // ok assert_eq!(1, value); None } - Ok(Async::NotReady) => Some(rx), - Err(_) => unreachable!(), + Ready(Err(_)) => unimplemented!(), + Pending => Some(rx), } }) .join() @@ -57,6 +69,30 @@ fn changing_rx_task() { }); } +// TODO: Move this into `oneshot` proper. + +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +struct OnClose<'a> { + tx: &'a mut oneshot::Sender, +} + +impl<'a> OnClose<'a> { + fn new(tx: &'a mut oneshot::Sender) -> Self { + OnClose { tx } + } +} + +impl<'a> Future for OnClose<'a> { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + self.get_mut().tx.poll_close(cx) + } +} + #[test] fn changing_tx_task() { loom::fuzz(|| { @@ -67,15 +103,11 @@ fn changing_tx_task() { }); let tx = thread::spawn(move || { - let t1 = block_on(futures::future::poll_fn(|| { - Ok::<_, ()>(tx.poll_close().into()) - })) - .unwrap(); + let t1 = poll_future(&mut OnClose::new(&mut tx)); match t1 { - Ok(Async::Ready(())) => None, - Ok(Async::NotReady) => Some(tx), - Err(_) => unreachable!(), + Ready(()) => None, + Pending => Some(tx), } }) .join() @@ -83,7 +115,7 @@ fn changing_tx_task() { if let Some(mut tx) = tx { // Previous task parked, use a new task... - block_on(futures::future::poll_fn(move || tx.poll_close())).unwrap(); + block_on(OnClose::new(&mut tx)); } }); } diff --git a/tokio-sync/tests/fuzz_semaphore.rs b/tokio-sync/tests/fuzz_semaphore.rs index 3c60caa8265..ca2fda8b99d 100644 --- a/tokio-sync/tests/fuzz_semaphore.rs +++ b/tokio-sync/tests/fuzz_semaphore.rs @@ -1,4 +1,3 @@ -#![cfg(feature = "broken")] #![deny(warnings, rust_2018_idioms)] #[macro_use] @@ -9,12 +8,30 @@ extern crate loom; mod semaphore; use crate::semaphore::*; -use futures::{future, try_ready, Async, Future, Poll}; + +use async_util::future::poll_fn; use loom::futures::block_on; use loom::thread; +use std::future::Future; +use std::pin::Pin; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; use std::sync::Arc; +use std::task::Poll::Ready; +use std::task::{Context, Poll}; + +/// Unwrap a ready value or propagate `Poll::Pending`. +#[macro_export] +macro_rules! ready { + ($e:expr) => {{ + use std::task::Poll::{Pending, Ready}; + + match $e { + Ready(v) => v, + Pending => return Pending, + } + }}; +} #[test] fn basic_usage() { @@ -31,24 +48,22 @@ fn basic_usage() { } impl Future for Actor { - type Item = (); - type Error = (); + type Output = (); - fn poll(&mut self) -> Poll<(), ()> { - try_ready!(self - .waiter - .poll_acquire(&self.shared.semaphore) - .map_err(|_| ())); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + let me = &mut *self; - let actual = self.shared.active.fetch_add(1, SeqCst); + ready!(me.waiter.poll_acquire(cx, &me.shared.semaphore)).unwrap(); + + let actual = me.shared.active.fetch_add(1, SeqCst); assert!(actual <= NUM - 1); - let actual = self.shared.active.fetch_sub(1, SeqCst); + let actual = me.shared.active.fetch_sub(1, SeqCst); assert!(actual <= NUM); - self.waiter.release(&self.shared.semaphore); + me.waiter.release(&me.shared.semaphore); - Ok(Async::Ready(())) + Ready(()) } } @@ -65,16 +80,14 @@ fn basic_usage() { block_on(Actor { waiter: Permit::new(), shared, - }) - .unwrap(); + }); }); } block_on(Actor { waiter: Permit::new(), shared, - }) - .unwrap(); + }); }); } @@ -88,11 +101,7 @@ fn release() { thread::spawn(move || { let mut permit = Permit::new(); - block_on(future::lazy(|| { - permit.poll_acquire(&semaphore).unwrap(); - Ok::<_, ()>(()) - })) - .unwrap(); + block_on(poll_fn(|cx| permit.poll_acquire(cx, &semaphore))).unwrap(); permit.release(&semaphore); }); @@ -100,7 +109,7 @@ fn release() { let mut permit = Permit::new(); - block_on(future::poll_fn(|| permit.poll_acquire(&semaphore))).unwrap(); + block_on(poll_fn(|cx| permit.poll_acquire(cx, &semaphore))).unwrap(); permit.release(&semaphore); }); @@ -120,9 +129,10 @@ fn basic_closing() { let mut permit = Permit::new(); for _ in 0..2 { - block_on(future::poll_fn(|| { - permit.poll_acquire(&semaphore).map_err(|_| ()) + block_on(poll_fn(|cx| { + permit.poll_acquire(cx, &semaphore).map_err(|_| ()) }))?; + permit.release(&semaphore); } @@ -147,8 +157,8 @@ fn concurrent_close() { thread::spawn(move || { let mut permit = Permit::new(); - block_on(future::poll_fn(|| { - permit.poll_acquire(&semaphore).map_err(|_| ()) + block_on(poll_fn(|cx| { + permit.poll_acquire(cx, &semaphore).map_err(|_| ()) }))?; permit.release(&semaphore); diff --git a/tokio-sync/tests/lock.rs b/tokio-sync/tests/lock.rs index 19677576a25..33cebef53d9 100644 --- a/tokio-sync/tests/lock.rs +++ b/tokio-sync/tests/lock.rs @@ -1,66 +1,50 @@ -#![cfg(feature = "broken")] #![deny(warnings, rust_2018_idioms)] -use futures; -use tokio_mock_task::*; use tokio_sync::lock::Lock; - -macro_rules! assert_ready { - ($e:expr) => {{ - match $e { - futures::Async::Ready(v) => v, - futures::Async::NotReady => panic!("not ready"), - } - }}; -} - -macro_rules! assert_not_ready { - ($e:expr) => {{ - match $e { - futures::Async::NotReady => {} - futures::Async::Ready(v) => panic!("ready; value = {:?}", v), - } - }}; -} +use tokio_test::task::MockTask; +use tokio_test::{assert_pending, assert_ready}; #[test] fn straight_execution() { + let mut task = MockTask::new(); let mut l = Lock::new(100); // We can immediately acquire the lock and take the value - let mut g = assert_ready!(l.poll_lock()); - assert_eq!(&*g, &100); - *g = 99; - drop(g); - - let mut g = assert_ready!(l.poll_lock()); - assert_eq!(&*g, &99); - *g = 98; - drop(g); - - let mut g = assert_ready!(l.poll_lock()); - assert_eq!(&*g, &98); - - // We can continue to access the guard even if the lock is dropped - drop(l); - *g = 97; - assert_eq!(&*g, &97); + task.enter(|cx| { + let mut g = assert_ready!(l.poll_lock(cx)); + assert_eq!(&*g, &100); + *g = 99; + drop(g); + + let mut g = assert_ready!(l.poll_lock(cx)); + assert_eq!(&*g, &99); + *g = 98; + drop(g); + + let mut g = assert_ready!(l.poll_lock(cx)); + assert_eq!(&*g, &98); + + // We can continue to access the guard even if the lock is dropped + drop(l); + *g = 97; + assert_eq!(&*g, &97); + }); } #[test] fn readiness() { - let mut task = MockTask::new(); + let mut t1 = MockTask::new(); + let mut t2 = MockTask::new(); let mut l = Lock::new(100); - let g = assert_ready!(l.poll_lock()); + + let g = assert_ready!(t1.enter(|cx| l.poll_lock(cx))); // We can't now acquire the lease since it's already held in g - task.enter(|| { - assert_not_ready!(l.poll_lock()); - }); + assert_pending!(t2.enter(|cx| l.poll_lock(cx))); // But once g unlocks, we can acquire it drop(g); - assert!(task.is_notified()); - assert_ready!(l.poll_lock()); + assert!(t2.is_woken()); + assert_ready!(t2.enter(|cx| l.poll_lock(cx))); } diff --git a/tokio-sync/tests/mpsc.rs b/tokio-sync/tests/mpsc.rs index 66a8dda8b0e..17dd3e64814 100644 --- a/tokio-sync/tests/mpsc.rs +++ b/tokio-sync/tests/mpsc.rs @@ -1,99 +1,111 @@ -#![cfg(feature = "broken")] #![deny(warnings, rust_2018_idioms)] -use futures; -use futures::prelude::*; -use std::sync::Arc; -use std::thread; -use tokio_mock_task::*; use tokio_sync::mpsc; +use tokio_test::task::MockTask; +use tokio_test::{ + assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok, +}; + +use std::sync::Arc; trait AssertSend: Send {} impl AssertSend for mpsc::Sender {} impl AssertSend for mpsc::Receiver {} -macro_rules! assert_ready { - ($e:expr) => {{ - match $e { - Ok(futures::Async::Ready(v)) => v, - Ok(_) => panic!("not ready"), - Err(e) => panic!("error = {:?}", e), - } - }}; -} - -macro_rules! assert_not_ready { - ($e:expr) => {{ - match $e { - Ok(futures::Async::NotReady) => {} - Ok(futures::Async::Ready(v)) => panic!("ready; value = {:?}", v), - Err(e) => panic!("error = {:?}", e), - } - }}; -} - #[test] fn send_recv_with_buffer() { + let mut t1 = MockTask::new(); + let mut t2 = MockTask::new(); + let (mut tx, mut rx) = mpsc::channel::(16); // Using poll_ready / try_send - assert_ready!(tx.poll_ready()); + assert_ready_ok!(t1.enter(|cx| tx.poll_ready(cx))); tx.try_send(1).unwrap(); // Without poll_ready tx.try_send(2).unwrap(); - // Sink API - assert!(tx.start_send(3).unwrap().is_ready()); - assert_ready!(tx.poll_complete()); - assert_ready!(tx.close()); - drop(tx); - let val = assert_ready!(rx.poll()); + let val = assert_ready!(t2.enter(|cx| rx.poll_next(cx))); assert_eq!(val, Some(1)); - let val = assert_ready!(rx.poll()); + let val = assert_ready!(t2.enter(|cx| rx.poll_next(cx))); assert_eq!(val, Some(2)); - let val = assert_ready!(rx.poll()); - assert_eq!(val, Some(3)); - - let val = assert_ready!(rx.poll()); + let val = assert_ready!(t2.enter(|cx| rx.poll_next(cx))); assert!(val.is_none()); } +#[test] +#[cfg(feature = "async-traits")] +fn send_sink_recv_with_buffer() { + use async_sink::Sink; + use futures_core::Stream; + use pin_utils::pin_mut; + + let mut t1 = MockTask::new(); + + let (tx, rx) = mpsc::channel::(16); + + t1.enter(|cx| { + pin_mut!(tx); + + assert_ready_ok!(tx.as_mut().poll_ready(cx)); + assert_ok!(tx.as_mut().start_send(1)); + + assert_ready_ok!(tx.as_mut().poll_ready(cx)); + assert_ok!(tx.as_mut().start_send(2)); + + assert_ready_ok!(tx.as_mut().poll_flush(cx)); + assert_ready_ok!(tx.as_mut().poll_close(cx)); + }); + + t1.enter(|cx| { + pin_mut!(rx); + + let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx)); + assert_eq!(val, Some(1)); + + let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx)); + assert_eq!(val, Some(2)); + + let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx)); + assert!(val.is_none()); + }); +} + #[test] fn start_send_past_cap() { + let mut t1 = MockTask::new(); + let mut t2 = MockTask::new(); + let mut t3 = MockTask::new(); + let (mut tx1, mut rx) = mpsc::channel(1); let mut tx2 = tx1.clone(); - let mut task1 = MockTask::new(); - let mut task2 = MockTask::new(); + assert_ok!(tx1.try_send(())); - let res = tx1.start_send(()).unwrap(); - assert!(res.is_ready()); - - task1.enter(|| { - let res = tx1.start_send(()).unwrap(); - assert!(!res.is_ready()); + t1.enter(|cx| { + assert_pending!(tx1.poll_ready(cx)); }); - task2.enter(|| { - assert_not_ready!(tx2.poll_ready()); + t2.enter(|cx| { + assert_pending!(tx2.poll_ready(cx)); }); drop(tx1); - let val = assert_ready!(rx.poll()); + let val = t3.enter(|cx| assert_ready!(rx.poll_next(cx))); assert!(val.is_some()); - assert!(task2.is_notified()); - assert!(!task1.is_notified()); + assert!(t2.is_woken()); + assert!(!t1.is_woken()); drop(tx2); - let val = assert_ready!(rx.poll()); + let val = t3.enter(|cx| assert_ready!(rx.poll_next(cx))); assert!(val.is_none()); } @@ -105,33 +117,69 @@ fn buffer_gteq_one() { #[test] fn send_recv_unbounded() { + let mut t1 = MockTask::new(); + let (mut tx, mut rx) = mpsc::unbounded_channel::(); // Using `try_send` - tx.try_send(1).unwrap(); + assert_ok!(tx.try_send(1)); + assert_ok!(tx.try_send(2)); - // Using `Sink` API - assert!(tx.start_send(2).unwrap().is_ready()); - assert_ready!(tx.poll_complete()); - - let val = assert_ready!(rx.poll()); + let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx))); assert_eq!(val, Some(1)); - let val = assert_ready!(rx.poll()); + let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx))); assert_eq!(val, Some(2)); - assert_ready!(tx.poll_complete()); - assert_ready!(tx.close()); - drop(tx); - let val = assert_ready!(rx.poll()); + let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx))); assert!(val.is_none()); } +#[test] +#[cfg(feature = "async-traits")] +fn sink_send_recv_unbounded() { + use async_sink::Sink; + use futures_core::Stream; + use pin_utils::pin_mut; + + let mut t1 = MockTask::new(); + + let (tx, rx) = mpsc::unbounded_channel::(); + + t1.enter(|cx| { + pin_mut!(tx); + + assert_ready_ok!(tx.as_mut().poll_ready(cx)); + assert_ok!(tx.as_mut().start_send(1)); + + assert_ready_ok!(tx.as_mut().poll_ready(cx)); + assert_ok!(tx.as_mut().start_send(2)); + + assert_ready_ok!(tx.as_mut().poll_flush(cx)); + assert_ready_ok!(tx.as_mut().poll_close(cx)); + }); + + t1.enter(|cx| { + pin_mut!(rx); + + let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx)); + assert_eq!(val, Some(1)); + + let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx)); + assert_eq!(val, Some(2)); + + let val = assert_ready!(Stream::poll_next(rx.as_mut(), cx)); + assert!(val.is_none()); + }); +} + #[test] fn no_t_bounds_buffer() { struct NoImpls; + + let mut t1 = MockTask::new(); let (tx, mut rx) = mpsc::channel(100); // sender should be Debug even though T isn't Debug @@ -140,12 +188,16 @@ fn no_t_bounds_buffer() { println!("{:?}", rx); // and sender should be Clone even though T isn't Clone assert!(tx.clone().try_send(NoImpls).is_ok()); - assert!(assert_ready!(rx.poll()).is_some()); + + let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx))); + assert!(val.is_some()); } #[test] fn no_t_bounds_unbounded() { struct NoImpls; + + let mut t1 = MockTask::new(); let (tx, mut rx) = mpsc::unbounded_channel(); // sender should be Debug even though T isn't Debug @@ -154,186 +206,188 @@ fn no_t_bounds_unbounded() { println!("{:?}", rx); // and sender should be Clone even though T isn't Clone assert!(tx.clone().try_send(NoImpls).is_ok()); - assert!(assert_ready!(rx.poll()).is_some()); + + let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx))); + assert!(val.is_some()); } #[test] fn send_recv_buffer_limited() { + let mut t1 = MockTask::new(); + let mut t2 = MockTask::new(); + let (mut tx, mut rx) = mpsc::channel::(1); - let mut task = MockTask::new(); // Run on a task context - task.enter(|| { - assert!(tx.poll_complete().unwrap().is_ready()); - assert!(tx.poll_ready().unwrap().is_ready()); + t1.enter(|cx| { + assert_ready_ok!(tx.poll_ready(cx)); // Send first message - let res = tx.start_send(1).unwrap(); - assert!(is_ready(&res)); - assert!(tx.poll_ready().unwrap().is_not_ready()); + assert_ok!(tx.try_send(1)); - // Send second message - let res = tx.start_send(2).unwrap(); - assert!(!is_ready(&res)); - - // Take the value - assert_eq!(rx.poll().unwrap(), Async::Ready(Some(1))); - assert!(tx.poll_ready().unwrap().is_ready()); + // Not ready + assert_pending!(tx.poll_ready(cx)); - let res = tx.start_send(2).unwrap(); - assert!(is_ready(&res)); - assert!(tx.poll_ready().unwrap().is_not_ready()); + // Send second message + assert_err!(tx.try_send(1337)); + }); + t2.enter(|cx| { // Take the value - assert_eq!(rx.poll().unwrap(), Async::Ready(Some(2))); - assert!(tx.poll_ready().unwrap().is_ready()); + let val = assert_ready!(rx.poll_next(cx)); + assert_eq!(Some(1), val); }); -} -#[test] -fn send_shared_recv() { - let (tx1, rx) = mpsc::channel::(16); - let tx2 = tx1.clone(); - let mut rx = rx.wait(); + assert!(t1.is_woken()); - tx1.send(1).wait().unwrap(); - assert_eq!(rx.next().unwrap().unwrap(), 1); + t1.enter(|cx| { + assert_ready_ok!(tx.poll_ready(cx)); - tx2.send(2).wait().unwrap(); - assert_eq!(rx.next().unwrap().unwrap(), 2); -} + assert_ok!(tx.try_send(2)); -#[test] -fn send_recv_threads() { - let (tx, rx) = mpsc::channel::(16); - let mut rx = rx.wait(); + // Not ready + assert_pending!(tx.poll_ready(cx)); + }); - thread::spawn(move || { - tx.send(1).wait().unwrap(); + t2.enter(|cx| { + // Take the value + let val = assert_ready!(rx.poll_next(cx)); + assert_eq!(Some(2), val); }); - assert_eq!(rx.next().unwrap().unwrap(), 1); + t1.enter(|cx| { + assert_ready_ok!(tx.poll_ready(cx)); + }); } #[test] fn recv_close_gets_none_idle() { + let mut t1 = MockTask::new(); + let (mut tx, mut rx) = mpsc::channel::(10); - let mut task = MockTask::new(); rx.close(); - task.enter(|| { - let val = assert_ready!(rx.poll()); + t1.enter(|cx| { + let val = assert_ready!(rx.poll_next(cx)); assert!(val.is_none()); - assert!(tx.poll_ready().is_err()); + assert_ready_err!(tx.poll_ready(cx)); }); } #[test] fn recv_close_gets_none_reserved() { + let mut t1 = MockTask::new(); + let mut t2 = MockTask::new(); + let mut t3 = MockTask::new(); + let (mut tx1, mut rx) = mpsc::channel::(1); let mut tx2 = tx1.clone(); - assert_ready!(tx1.poll_ready()); - - let mut task = MockTask::new(); + assert_ready_ok!(t1.enter(|cx| tx1.poll_ready(cx))); - task.enter(|| { - assert_not_ready!(tx2.poll_ready()); + t2.enter(|cx| { + assert_pending!(tx2.poll_ready(cx)); }); rx.close(); - assert!(task.is_notified()); + assert!(t2.is_woken()); - task.enter(|| { - assert!(tx2.poll_ready().is_err()); - assert_not_ready!(rx.poll()); + t2.enter(|cx| { + assert_ready_err!(tx2.poll_ready(cx)); }); - assert!(!task.is_notified()); + t3.enter(|cx| assert_pending!(rx.poll_next(cx))); + + assert!(!t1.is_woken()); + assert!(!t2.is_woken()); - assert!(tx1.try_send(123).is_ok()); + assert_ok!(tx1.try_send(123)); - assert!(task.is_notified()); + assert!(t3.is_woken()); - task.enter(|| { - let v = assert_ready!(rx.poll()); + t3.enter(|cx| { + let v = assert_ready!(rx.poll_next(cx)); assert_eq!(v, Some(123)); - let v = assert_ready!(rx.poll()); + let v = assert_ready!(rx.poll_next(cx)); assert!(v.is_none()); }); } #[test] fn tx_close_gets_none() { + let mut t1 = MockTask::new(); + let (_, mut rx) = mpsc::channel::(10); - let mut task = MockTask::new(); // Run on a task context - task.enter(|| { - let v = assert_ready!(rx.poll()); + t1.enter(|cx| { + let v = assert_ready!(rx.poll_next(cx)); assert!(v.is_none()); }); } -fn is_ready(res: &AsyncSink) -> bool { - match *res { - AsyncSink::Ready => true, - _ => false, - } -} - #[test] fn try_send_fail() { - let (mut tx, rx) = mpsc::channel(1); - let mut rx = rx.wait(); + let mut t1 = MockTask::new(); + + let (mut tx, mut rx) = mpsc::channel(1); tx.try_send("hello").unwrap(); // This should fail - assert!(tx.try_send("fail").unwrap_err().is_full()); + let err = assert_err!(tx.try_send("fail")); + assert!(err.is_full()); - assert_eq!(rx.next().unwrap().unwrap(), "hello"); + let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx))); + assert_eq!(val, Some("hello")); - tx.try_send("goodbye").unwrap(); + assert_ok!(tx.try_send("goodbye")); drop(tx); - assert_eq!(rx.next().unwrap().unwrap(), "goodbye"); - assert!(rx.next().is_none()); + let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx))); + assert_eq!(val, Some("goodbye")); + + let val = assert_ready!(t1.enter(|cx| rx.poll_next(cx))); + assert!(val.is_none()); } #[test] fn drop_tx_with_permit_releases_permit() { + let mut t1 = MockTask::new(); + let mut t2 = MockTask::new(); + // poll_ready reserves capacity, ensure that the capacity is released if tx // is dropped w/o sending a value. let (mut tx1, _rx) = mpsc::channel::(1); let mut tx2 = tx1.clone(); - let mut task = MockTask::new(); - assert_ready!(tx1.poll_ready()); + assert_ready_ok!(t1.enter(|cx| tx1.poll_ready(cx))); - task.enter(|| { - assert_not_ready!(tx2.poll_ready()); + t2.enter(|cx| { + assert_pending!(tx2.poll_ready(cx)); }); drop(tx1); - assert!(task.is_notified()); + assert!(t2.is_woken()); - assert_ready!(tx2.poll_ready()); + assert_ready_ok!(t2.enter(|cx| tx2.poll_ready(cx))); } #[test] fn dropping_rx_closes_channel() { + let mut t1 = MockTask::new(); + let (mut tx, rx) = mpsc::channel(100); let msg = Arc::new(()); - tx.try_send(msg.clone()).unwrap(); + assert_ok!(tx.try_send(msg.clone())); drop(rx); - assert!(tx.poll_ready().is_err()); + assert_ready_err!(t1.enter(|cx| tx.poll_ready(cx))); assert_eq!(1, Arc::strong_count(&msg)); } @@ -346,7 +400,11 @@ fn dropping_rx_closes_channel_for_try() { tx.try_send(msg.clone()).unwrap(); drop(rx); - assert!(tx.try_send(msg.clone()).unwrap_err().is_closed()); + + { + let err = assert_err!(tx.try_send(msg.clone())); + assert!(err.is_closed()); + } assert_eq!(1, Arc::strong_count(&msg)); } diff --git a/tokio-sync/tests/semaphore.rs b/tokio-sync/tests/semaphore.rs index 8e40af8ae77..a19f261a402 100644 --- a/tokio-sync/tests/semaphore.rs +++ b/tokio-sync/tests/semaphore.rs @@ -1,32 +1,13 @@ -#![cfg(feature = "broken")] #![deny(warnings, rust_2018_idioms)] -use futures; -use tokio_mock_task::*; use tokio_sync::semaphore::{Permit, Semaphore}; - -macro_rules! assert_ready { - ($e:expr) => {{ - match $e { - Ok(futures::Async::Ready(v)) => v, - Ok(_) => panic!("not ready"), - Err(e) => panic!("error = {:?}", e), - } - }}; -} - -macro_rules! assert_not_ready { - ($e:expr) => {{ - match $e { - Ok(futures::Async::NotReady) => {} - Ok(futures::Async::Ready(v)) => panic!("ready; value = {:?}", v), - Err(e) => panic!("error = {:?}", e), - } - }}; -} +use tokio_test::task::MockTask; +use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok}; #[test] fn available_permits() { + let mut t1 = MockTask::new(); + let s = Semaphore::new(100); assert_eq!(s.available_permits(), 100); @@ -34,39 +15,39 @@ fn available_permits() { let mut permit = Permit::new(); assert!(!permit.is_acquired()); - assert_ready!(permit.poll_acquire(&s)); + assert_ready_ok!(t1.enter(|cx| permit.poll_acquire(cx, &s))); assert_eq!(s.available_permits(), 99); assert!(permit.is_acquired()); // Polling again on the same waiter does not claim a new permit - assert_ready!(permit.poll_acquire(&s)); + assert_ready_ok!(t1.enter(|cx| permit.poll_acquire(cx, &s))); assert_eq!(s.available_permits(), 99); assert!(permit.is_acquired()); } #[test] fn unavailable_permits() { + let mut t1 = MockTask::new(); + let mut t2 = MockTask::new(); let s = Semaphore::new(1); let mut permit_1 = Permit::new(); let mut permit_2 = Permit::new(); // Acquire the first permit - assert_ready!(permit_1.poll_acquire(&s)); + assert_ready_ok!(t1.enter(|cx| permit_1.poll_acquire(cx, &s))); assert_eq!(s.available_permits(), 0); - let mut task = MockTask::new(); - - task.enter(|| { + t2.enter(|cx| { // Try to acquire the second permit - assert_not_ready!(permit_2.poll_acquire(&s)); + assert_pending!(permit_2.poll_acquire(cx, &s)); }); permit_1.release(&s); assert_eq!(s.available_permits(), 0); - assert!(task.is_notified()); - assert_ready!(permit_2.poll_acquire(&s)); + assert!(t2.is_woken()); + assert_ready_ok!(t2.enter(|cx| permit_2.poll_acquire(cx, &s))); permit_2.release(&s); assert_eq!(s.available_permits(), 1); @@ -74,21 +55,22 @@ fn unavailable_permits() { #[test] fn zero_permits() { + let mut t1 = MockTask::new(); + let s = Semaphore::new(0); assert_eq!(s.available_permits(), 0); let mut permit = Permit::new(); - let mut task = MockTask::new(); // Try to acquire the permit - task.enter(|| { - assert_not_ready!(permit.poll_acquire(&s)); + t1.enter(|cx| { + assert_pending!(permit.poll_acquire(cx, &s)); }); s.add_permits(1); - assert!(task.is_notified()); - assert_ready!(permit.poll_acquire(&s)); + assert!(t1.is_woken()); + assert_ready_ok!(t1.enter(|cx| permit.poll_acquire(cx, &s))); } #[test] @@ -100,6 +82,8 @@ fn validates_max_permits() { #[test] fn close_semaphore_prevents_acquire() { + let mut t1 = MockTask::new(); + let s = Semaphore::new(1); s.close(); @@ -107,29 +91,32 @@ fn close_semaphore_prevents_acquire() { let mut permit = Permit::new(); - assert!(permit.poll_acquire(&s).is_err()); + assert_ready_err!(t1.enter(|cx| permit.poll_acquire(cx, &s))); assert_eq!(1, s.available_permits()); } #[test] fn close_semaphore_notifies_permit1() { - let s = Semaphore::new(0); + let mut t1 = MockTask::new(); + let s = Semaphore::new(0); let mut permit = Permit::new(); - let mut task = MockTask::new(); - task.enter(|| { - assert_not_ready!(permit.poll_acquire(&s)); - }); + assert_pending!(t1.enter(|cx| permit.poll_acquire(cx, &s))); s.close(); - assert!(task.is_notified()); - assert!(permit.poll_acquire(&s).is_err()); + assert!(t1.is_woken()); + assert_ready_err!(t1.enter(|cx| permit.poll_acquire(cx, &s))); } #[test] fn close_semaphore_notifies_permit2() { + let mut t1 = MockTask::new(); + let mut t2 = MockTask::new(); + let mut t3 = MockTask::new(); + let mut t4 = MockTask::new(); + let s = Semaphore::new(2); let mut permit1 = Permit::new(); @@ -138,27 +125,19 @@ fn close_semaphore_notifies_permit2() { let mut permit4 = Permit::new(); // Acquire a couple of permits - assert_ready!(permit1.poll_acquire(&s)); - assert_ready!(permit2.poll_acquire(&s)); + assert_ready_ok!(t1.enter(|cx| permit1.poll_acquire(cx, &s))); + assert_ready_ok!(t2.enter(|cx| permit2.poll_acquire(cx, &s))); - let mut task1 = MockTask::new(); - let mut task2 = MockTask::new(); - - task1.enter(|| { - assert_not_ready!(permit3.poll_acquire(&s)); - }); - - task2.enter(|| { - assert_not_ready!(permit4.poll_acquire(&s)); - }); + assert_pending!(t3.enter(|cx| permit3.poll_acquire(cx, &s))); + assert_pending!(t4.enter(|cx| permit4.poll_acquire(cx, &s))); s.close(); - assert!(task1.is_notified()); - assert!(task2.is_notified()); + assert!(t3.is_woken()); + assert!(t4.is_woken()); - assert!(permit3.poll_acquire(&s).is_err()); - assert!(permit4.poll_acquire(&s).is_err()); + assert_ready_err!(t3.enter(|cx| permit3.poll_acquire(cx, &s))); + assert_ready_err!(t4.enter(|cx| permit4.poll_acquire(cx, &s))); assert_eq!(0, s.available_permits()); @@ -166,7 +145,7 @@ fn close_semaphore_notifies_permit2() { assert_eq!(1, s.available_permits()); - assert!(permit1.poll_acquire(&s).is_err()); + assert_ready_err!(t1.enter(|cx| permit1.poll_acquire(cx, &s))); permit2.release(&s); diff --git a/tokio-sync/tests/watch.rs b/tokio-sync/tests/watch.rs index ec94516332f..10a6a822617 100644 --- a/tokio-sync/tests/watch.rs +++ b/tokio-sync/tests/watch.rs @@ -1,10 +1,10 @@ -#![cfg(feature = "broken")] #![deny(warnings, rust_2018_idioms)] -use futures; -use tokio_mock_task::*; use tokio_sync::watch; +use tokio_test::task::MockTask; +use tokio_test::{assert_pending, assert_ready}; +/* macro_rules! assert_ready { ($e:expr) => {{ match $e { @@ -24,143 +24,179 @@ macro_rules! assert_not_ready { } }}; } +*/ #[test] -fn single_rx() { - let (mut tx, mut rx) = watch::channel("one"); +fn single_rx_poll_ref() { + let (tx, mut rx) = watch::channel("one"); let mut task = MockTask::new(); - task.enter(|| { - let v = assert_ready!(rx.poll_ref()).unwrap(); - assert_eq!(*v, "one"); + task.enter(|cx| { + { + let v = assert_ready!(rx.poll_ref(cx)).unwrap(); + assert_eq!(*v, "one"); + } + assert_pending!(rx.poll_ref(cx)); }); - task.enter(|| assert_not_ready!(rx.poll_ref())); + tx.broadcast("two").unwrap(); - assert!(!task.is_notified()); + assert!(task.is_woken()); - tx.broadcast("two").unwrap(); + task.enter(|cx| { + { + let v = assert_ready!(rx.poll_ref(cx)).unwrap(); + assert_eq!(*v, "two"); + } + assert_pending!(rx.poll_ref(cx)); + }); + + drop(tx); + + assert!(task.is_woken()); - assert!(task.is_notified()); + task.enter(|cx| { + let res = assert_ready!(rx.poll_ref(cx)); + assert!(res.is_none()); + }); +} - task.enter(|| { - let v = assert_ready!(rx.poll_ref()).unwrap(); - assert_eq!(*v, "two"); +#[test] +fn single_rx_poll_next() { + let (tx, mut rx) = watch::channel("one"); + let mut task = MockTask::new(); + + task.enter(|cx| { + let v = assert_ready!(rx.poll_next(cx)).unwrap(); + assert_eq!(v, "one"); + assert_pending!(rx.poll_ref(cx)); }); - task.enter(|| assert_not_ready!(rx.poll_ref())); + tx.broadcast("two").unwrap(); + + assert!(task.is_woken()); + + task.enter(|cx| { + let v = assert_ready!(rx.poll_next(cx)).unwrap(); + assert_eq!(v, "two"); + assert_pending!(rx.poll_ref(cx)); + }); drop(tx); - assert!(task.is_notified()); + assert!(task.is_woken()); - task.enter(|| { - let res = assert_ready!(rx.poll_ref()); + task.enter(|cx| { + let res = assert_ready!(rx.poll_next(cx)); assert!(res.is_none()); }); } #[test] +#[cfg(feature = "async-traits")] fn stream_impl() { - use futures::Stream; + use futures_core::Stream; + use pin_utils::pin_mut; - let (mut tx, mut rx) = watch::channel("one"); + let (tx, rx) = watch::channel("one"); let mut task = MockTask::new(); - task.enter(|| { - let v = assert_ready!(rx.poll()).unwrap(); - assert_eq!(v, "one"); - }); - - task.enter(|| assert_not_ready!(rx.poll())); + pin_mut!(rx); - assert!(!task.is_notified()); + task.enter(|cx| { + { + let v = assert_ready!(Stream::poll_next(rx.as_mut(), cx)).unwrap(); + assert_eq!(v, "one"); + } + assert_pending!(rx.poll_ref(cx)); + }); tx.broadcast("two").unwrap(); - assert!(task.is_notified()); + assert!(task.is_woken()); - task.enter(|| { - let v = assert_ready!(rx.poll()).unwrap(); - assert_eq!(v, "two"); + task.enter(|cx| { + { + let v = assert_ready!(Stream::poll_next(rx.as_mut(), cx)).unwrap(); + assert_eq!(v, "two"); + } + assert_pending!(rx.poll_ref(cx)); }); - task.enter(|| assert_not_ready!(rx.poll())); - drop(tx); - assert!(task.is_notified()); + assert!(task.is_woken()); - task.enter(|| { - let res = assert_ready!(rx.poll()); + task.enter(|cx| { + let res = assert_ready!(Stream::poll_next(rx, cx)); assert!(res.is_none()); }); } #[test] fn multi_rx() { - let (mut tx, mut rx1) = watch::channel("one"); + let (tx, mut rx1) = watch::channel("one"); let mut rx2 = rx1.clone(); let mut task1 = MockTask::new(); let mut task2 = MockTask::new(); - task1.enter(|| { - let res = assert_ready!(rx1.poll_ref()); + task1.enter(|cx| { + let res = assert_ready!(rx1.poll_ref(cx)); assert_eq!(*res.unwrap(), "one"); }); - task2.enter(|| { - let res = assert_ready!(rx2.poll_ref()); + task2.enter(|cx| { + let res = assert_ready!(rx2.poll_ref(cx)); assert_eq!(*res.unwrap(), "one"); }); tx.broadcast("two").unwrap(); - assert!(task1.is_notified()); - assert!(task2.is_notified()); + assert!(task1.is_woken()); + assert!(task2.is_woken()); - task1.enter(|| { - let res = assert_ready!(rx1.poll_ref()); + task1.enter(|cx| { + let res = assert_ready!(rx1.poll_ref(cx)); assert_eq!(*res.unwrap(), "two"); }); tx.broadcast("three").unwrap(); - assert!(task1.is_notified()); - assert!(task2.is_notified()); + assert!(task1.is_woken()); + assert!(task2.is_woken()); - task1.enter(|| { - let res = assert_ready!(rx1.poll_ref()); + task1.enter(|cx| { + let res = assert_ready!(rx1.poll_ref(cx)); assert_eq!(*res.unwrap(), "three"); }); - task2.enter(|| { - let res = assert_ready!(rx2.poll_ref()); + task2.enter(|cx| { + let res = assert_ready!(rx2.poll_ref(cx)); assert_eq!(*res.unwrap(), "three"); }); tx.broadcast("four").unwrap(); - task1.enter(|| { - let res = assert_ready!(rx1.poll_ref()); + task1.enter(|cx| { + let res = assert_ready!(rx1.poll_ref(cx)); assert_eq!(*res.unwrap(), "four"); }); drop(tx); - task1.enter(|| { - let res = assert_ready!(rx1.poll_ref()); + task1.enter(|cx| { + let res = assert_ready!(rx1.poll_ref(cx)); assert!(res.is_none()); }); - task2.enter(|| { - let res = assert_ready!(rx2.poll_ref()); + task2.enter(|cx| { + let res = assert_ready!(rx2.poll_ref(cx)); assert_eq!(*res.unwrap(), "four"); }); - task2.enter(|| { - let res = assert_ready!(rx2.poll_ref()); + task2.enter(|cx| { + let res = assert_ready!(rx2.poll_ref(cx)); assert!(res.is_none()); }); } @@ -174,45 +210,47 @@ fn rx_observes_final_value() { drop(tx); - task.enter(|| { - let res = assert_ready!(rx.poll_ref()); + task.enter(|cx| { + let res = assert_ready!(rx.poll_ref(cx)); assert!(res.is_some()); assert_eq!(*res.unwrap(), "one"); }); - task.enter(|| { - let res = assert_ready!(rx.poll_ref()); + task.enter(|cx| { + let res = assert_ready!(rx.poll_ref(cx)); assert!(res.is_none()); }); // Sending a value - let (mut tx, mut rx) = watch::channel("one"); + let (tx, mut rx) = watch::channel("one"); let mut task = MockTask::new(); tx.broadcast("two").unwrap(); - task.enter(|| { - let res = assert_ready!(rx.poll_ref()); - assert!(res.is_some()); - assert_eq!(*res.unwrap(), "two"); - }); + task.enter(|cx| { + { + let res = assert_ready!(rx.poll_ref(cx)); + assert!(res.is_some()); + assert_eq!(*res.unwrap(), "two"); + } - task.enter(|| assert_not_ready!(rx.poll_ref())); + assert_pending!(rx.poll_ref(cx)); + }); tx.broadcast("three").unwrap(); drop(tx); - assert!(task.is_notified()); + assert!(task.is_woken()); - task.enter(|| { - let res = assert_ready!(rx.poll_ref()); + task.enter(|cx| { + let res = assert_ready!(rx.poll_ref(cx)); assert!(res.is_some()); assert_eq!(*res.unwrap(), "three"); }); - task.enter(|| { - let res = assert_ready!(rx.poll_ref()); + task.enter(|cx| { + let res = assert_ready!(rx.poll_ref(cx)); assert!(res.is_none()); }); } @@ -222,13 +260,13 @@ fn poll_close() { let (mut tx, rx) = watch::channel("one"); let mut task = MockTask::new(); - task.enter(|| assert_not_ready!(tx.poll_close())); + assert_pending!(task.enter(|cx| tx.poll_close(cx))); drop(rx); - assert!(task.is_notified()); + assert!(task.is_woken()); - task.enter(|| assert_ready!(tx.poll_close())); + assert_ready!(task.enter(|cx| tx.poll_close(cx))); assert!(tx.broadcast("two").is_err()); }