From bfc4414a959b1a7cfeaf184431aba2b4b1dca93c Mon Sep 17 00:00:00 2001 From: Carl Lerche <me@carllerche.com> Date: Sun, 2 Feb 2020 21:57:32 -0800 Subject: [PATCH 01/12] sync: adds Notify for basic task notification `Notify` provides a synchronization primitive similar to thread park / unpark, except for tasks. --- tokio/src/loom/std/mod.rs | 1 + tokio/src/macros/pin.rs | 10 +- tokio/src/sync/mod.rs | 12 + tokio/src/sync/notify.rs | 496 ++++++++++++++++++++++++++++ tokio/src/sync/tests/loom_notify.rs | 88 +++++ tokio/src/sync/tests/mod.rs | 1 + tokio/src/util/linked_list.rs | 483 +++++++++++++++++++++++++++ tokio/src/util/mod.rs | 2 + tokio/tests/sync_notify.rs | 102 ++++++ 9 files changed, 1194 insertions(+), 1 deletion(-) create mode 100644 tokio/src/sync/notify.rs create mode 100644 tokio/src/sync/tests/loom_notify.rs create mode 100644 tokio/src/util/linked_list.rs create mode 100644 tokio/tests/sync_notify.rs diff --git a/tokio/src/loom/std/mod.rs b/tokio/src/loom/std/mod.rs index d5e057e51e8..e4bae357b5f 100644 --- a/tokio/src/loom/std/mod.rs +++ b/tokio/src/loom/std/mod.rs @@ -64,6 +64,7 @@ pub(crate) mod sync { pub(crate) use crate::loom::std::atomic_u64::AtomicU64; pub(crate) use crate::loom::std::atomic_usize::AtomicUsize; + pub(crate) use std::sync::atomic::AtomicU8; pub(crate) use std::sync::atomic::{fence, AtomicPtr}; pub(crate) use std::sync::atomic::{spin_loop_hint, AtomicBool}; } diff --git a/tokio/src/macros/pin.rs b/tokio/src/macros/pin.rs index e8511d3f3dc..f3dc66e710c 100644 --- a/tokio/src/macros/pin.rs +++ b/tokio/src/macros/pin.rs @@ -108,5 +108,13 @@ macro_rules! pin { let mut $x = unsafe { $crate::macros::support::Pin::new_unchecked(&mut $x) }; - )* } + )* }; + ($( + let $x:ident = $init:expr; + )*) => { + $( + let $x = $init; + crate::pin!($x); + )* + } } diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index ff72c521360..befdcae8eb9 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -406,9 +406,18 @@ //! * [`Mutex`][Mutex] Mutual Exclusion mechanism, which ensures that at most //! one thread at a time is able to access some data. //! +//! * [`Notify`][Notify] Basic task notification. `Notify` supports notifying a +//! receiving task without sending data. In this case, the task wakes up and +//! resumes processing. +//! //! * [`RwLock`][RwLock] Provides a mutual exclusion mechanism which allows //! multiple readers at the same time, while allowing only one writer at a //! time. In some cases, this can be more efficient than a mutex. +//! +//! * [`Semaphore`][Semaphore] Limits the amount of concurrency. A semaphore +//! holds a number of permits, which tasks may request in order to enter a +//! critical section. Semaphores are useful for implementing limiting of +//! bounding of any kind. cfg_sync! { mod barrier; @@ -421,6 +430,9 @@ cfg_sync! { mod mutex; pub use mutex::{Mutex, MutexGuard}; + mod notify; + pub use notify::Notify; + pub mod oneshot; pub(crate) mod semaphore_ll; diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs new file mode 100644 index 00000000000..b0b25c425ec --- /dev/null +++ b/tokio/src/sync/notify.rs @@ -0,0 +1,496 @@ +use crate::loom::sync::atomic::AtomicU8; +use crate::loom::sync::Mutex; +use crate::util::linked_list::{self, LinkedList}; + +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::Ordering::SeqCst; +use std::task::{Context, Poll, Waker}; + +/// Notify a single task to wake up. +/// +/// `Notify` provides a basic mechanism to notify a single task of an event. +/// `Notify` itself does not carry any data. Instead, it is to be used to signal +/// another task to perform an operation. +/// +/// The synchronization details of `Notify` are similar to +/// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`] +/// value contains a single permit. [`Notify::recv`] waits for the permit to be +/// made available, consumes the permit, and resumes. [`Notify::notify_one`] +/// sets the permit, waking a pending task if there is one. +/// +/// `Notify` can be thought of as a [`Semaphore`] starting with 0 permits. +/// [`Notify::recv`] waits for a permit to become available and +/// [`Notify::notify_one`] sets a permit **if there currently are no available +/// permits**. +/// +/// If `notify_one` is called **before** `recv()`, then the next call to +/// `recv()` will complete immediately, consuming the permit. Any subsequent +/// calls to `recv()` will wait for a new permit. +/// +/// If `notify_one` is called **multiple** times before `recv()`, only a +/// **single** permit is stored. The next call to `recv()` will complete +/// immediately, but the one after will wait for a new permit. +/// +/// # Examples +/// +/// Basic usage. +/// +/// ``` +/// use tokio::sync::Notify; +/// use std::sync::Arc; +/// +/// #[tokio::main] +/// async fn main() { +/// let notify = Arc::new(Notify::new()); +/// let notify2 = notify.clone(); +/// +/// tokio::spawn(async move { +/// notify2.recv().await; +/// println!("received a notification"); +/// }); +/// +/// notify.notify_one(); +/// } +/// ``` +/// +/// Unbound mpsc channel. +/// +/// ``` +/// use tokio::sync::Notify; +/// use std::collections::VecDeque; +/// use std::sync::Mutex; +/// +/// struct Channel<T> { +/// values: Mutex<VecDeque<T>>, +/// notify: Notify, +/// } +/// +/// impl<T> Channel<T> { +/// pub fn send(&self, value: T) { +/// self.values.lock().unwrap() +/// .push_back(value); +/// +/// // Notify the consumer a value is available +/// self.notify.notify_one(); +/// } +/// +/// pub async fn recv(&self) -> T { +/// loop { +/// // Drain values +/// if let Some(value) = self.values.lock().unwrap().pop_front() { +/// return value; +/// } +/// +/// // Wait for values to be available +/// self.notify.recv().await; +/// } +/// } +/// } +/// +/// ``` +/// +/// [park]: std::thread::park +/// [unpark]: std::thread::Thread::unpark +/// [`Notify::recv`]: Notify::recv() +/// [`Notify::notify_one`]: Notify::notify_one() +/// [`Semaphore`]: crate::sync::Semaphore +#[derive(Debug)] +pub struct Notify { + state: AtomicU8, + waiters: Mutex<LinkedList<Waiter>>, +} + +#[derive(Debug)] +struct Waiter { + /// Waiting task's waker + waker: Option<Waker>, + + /// `true` if the notification has been assigned to this waiter. + notified: bool, +} + +#[derive(Debug)] +struct RecvFuture<'a> { + /// The `Notify` being received on. + notify: &'a Notify, + + /// The current state of the receiving process. + state: RecvState, + + /// Entry in the waiter `LinkedList`. + waiter: linked_list::Entry<Waiter>, +} + +#[derive(Debug)] +enum RecvState { + Init, + Waiting, + Done, +} + +/// Initial "idle" state +const EMPTY: u8 = 0; + +/// One or more threads are currently waiting to be notified. +const WAITING: u8 = 1; + +/// Pending notification +const NOTIFIED: u8 = 2; + +impl Notify { + /// Create a new `Notify`, initialized without a permit. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::Notify; + /// + /// let notify = Notify::new(); + /// ``` + pub fn new() -> Notify { + Notify { + state: AtomicU8::new(0), + waiters: Mutex::new(LinkedList::new()), + } + } + + /// Wait for a notification. + /// + /// Each `Notify` value holds a single permit. If a permit is available from + /// an earlier call to `notify_one`, then `recv` will complete immediately, + /// consuming that permit. Otherwise, `recv()` waits for a permit to be made + /// available by the next call to `notify_one` + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::Notify; + /// use std::sync::Arc; + /// + /// #[tokio::main] + /// async fn main() { + /// let notify = Arc::new(Notify::new()); + /// let notify2 = notify.clone(); + /// + /// tokio::spawn(async move { + /// notify2.recv().await; + /// println!("received a notification"); + /// }); + /// + /// notify.notify_one(); + /// } + /// ``` + pub async fn recv(&self) { + RecvFuture { + notify: self, + state: RecvState::Init, + waiter: linked_list::Entry::new(Waiter { + waker: None, + notified: false, + }), + } + .await + } + + /// Notifies a waiting task + /// + /// If a task is currently waiting, that task is notified. Otherwise, a + /// permit is stored in this `Notify` value and the **next** call to + /// `recv()` will complete immediately consuming the permit made available + /// by this call to `notify_one()`. + /// + /// At most one permit may be stored by `Notify`. Many sequential calls to + /// `notify_one` will result in a single permit being stored. The next call + /// to `recv()` will complete immediately, but the one after that will wait. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::Notify; + /// use std::sync::Arc; + /// + /// #[tokio::main] + /// async fn main() { + /// let notify = Arc::new(Notify::new()); + /// let notify2 = notify.clone(); + /// + /// tokio::spawn(async move { + /// notify2.recv().await; + /// println!("received a notification"); + /// }); + /// + /// notify.notify_one(); + /// } + /// ``` + pub fn notify_one(&self) { + // Load the current state + let mut curr = self.state.load(SeqCst); + + // If the state is `EMPTY`, transition to `NOTIFIED` and return. + loop { + match curr { + EMPTY | NOTIFIED => { + // The compare-exchange from `NOTIFIED` -> `NOTIFIED` is + // intended a happens-before synchronization must happen + // between this atomic operation and a task calling + // `recv()`. + let res = self.state.compare_exchange(curr, NOTIFIED, SeqCst, SeqCst); + + match res { + // No waiters, no further work to do + Ok(_) => return, + Err(actual) => { + curr = actual; + } + } + } + _ => break, + } + } + + // There are waiters, the lock must be acquired to notify. + let mut waiters = self.waiters.lock().unwrap(); + + // The state must be reloaded while the lock is held. The state may only + // transition out of WAITING while the lock is held. + curr = self.state.load(SeqCst); + + if let Some(waker) = notify_locked(&mut waiters, &self.state, curr) { + drop(waiters); + waker.wake(); + } + } +} + +fn notify_locked( + waiters: &mut LinkedList<Waiter>, + state: &AtomicU8, + mut curr: u8, +) -> Option<Waker> { + loop { + match curr { + EMPTY | NOTIFIED => { + let res = state.compare_exchange(curr, NOTIFIED, SeqCst, SeqCst); + + match res { + Ok(_) => return None, + Err(actual) => { + curr = actual; + } + } + } + WAITING => { + // At this point, it is guaranteed that the state will not + // concurrently change as holding the lock is required to + // transition **out** of `WAITING`. + // + // Get a pending waiter + let mut waiter = waiters.pop_back().unwrap(); + + assert!(!waiter.notified); + + waiter.notified = true; + let waker = waiter.waker.take(); + + if waiters.is_empty() { + // As this the **final** waiter in the list, the state + // must be transitioned to `EMPTY`. As transitioning + // **from** `WAITING` requires the lock to be held, a + // `store` is sufficient. + state.store(EMPTY, SeqCst); + } + + return waker; + } + _ => unreachable!(), + } + } +} + +// ===== impl RecvFuture ===== + +impl RecvFuture<'_> { + fn project( + self: Pin<&mut Self>, + ) -> ( + &Notify, + &mut RecvState, + Pin<&mut linked_list::Entry<Waiter>>, + ) { + unsafe { + // Safety: both `notify` and `state` are `Unpin`. + + is_unpin::<&Notify>(); + is_unpin::<AtomicU8>(); + + let me = self.get_unchecked_mut(); + ( + &me.notify, + &mut me.state, + Pin::new_unchecked(&mut me.waiter), + ) + } + } +} + +impl Future for RecvFuture<'_> { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + use RecvState::*; + + let (notify, state, mut waiter) = self.project(); + + 'outer: loop { + match *state { + Init => { + // Optimistically try acquiring a pending notification + let res = notify + .state + .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst); + + if res.is_ok() { + // Acquired the notification + *state = Done; + continue 'outer; + } + + // Acquire the lock and attempt to transition to the waiting + // state. + let mut waiters = notify.waiters.lock().unwrap(); + + // Reload the state with the lock held + let mut curr = notify.state.load(SeqCst); + + // Transition the state to WAITING. + loop { + match curr { + EMPTY => { + // Transition to WAITING + let res = notify + .state + .compare_exchange(EMPTY, WAITING, SeqCst, SeqCst); + + if let Err(actual) = res { + curr = actual; + continue; + } + } + WAITING => {} + NOTIFIED => { + // Try consuming the notification + let res = notify + .state + .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst); + + match res { + Ok(_) => { + // Acquired the notification + *state = Done; + continue 'outer; + } + Err(actual) => { + curr = actual; + continue; + } + } + } + _ => unreachable!(), + } + + // The state has been transitioned to waiting + break; + } + + // Safety: called while locked. + unsafe { + (*waiter.as_mut().get()).waker = Some(cx.waker().clone()); + + // Insert the waiter into the linked list + waiters.push_front(waiter.as_mut()); + } + + *state = Waiting; + } + Waiting => { + let mut waiters = notify.waiters.lock().unwrap(); + + { + // Safety: called while locked + let w = unsafe { &mut *waiter.as_mut().get() }; + + if w.notified { + w.waker = None; + w.notified = false; + + // Remove the entry from the linked list. The entry + // may only be in the linked list while the state is + // `Waiting`. + // + // Safety: called while the lock is held + unsafe { + waiters.remove(waiter.as_mut()); + } + + *state = Done; + } else { + if !w.waker.as_ref().unwrap().will_wake(cx.waker()) { + w.waker = Some(cx.waker().clone()); + } + + return Poll::Pending; + } + } + } + Done => { + return Poll::Ready(()); + } + } + } + } +} + +impl Drop for RecvFuture<'_> { + fn drop(&mut self) { + use RecvState::*; + + // Safety: The type only transitions to a "Waiting" state when pinned. + let (notify, state, mut waiter) = unsafe { Pin::new_unchecked(self).project() }; + + // This is where we ensure safety. The `RecvFuture` is being dropped, + // which means we must ensure that the waiter entry is no longer stored + // in the linked list. + if let Waiting = *state { + let mut notify_state = WAITING; + let mut waiters = notify.waiters.lock().unwrap(); + + // remove the entry from the list + // + // safety: the waiter is only added to `waiters` by virtue of it + // being the only `LinkedList` available to the type. + unsafe { waiters.remove(waiter.as_mut()) }; + + if waiters.is_empty() { + notify_state = EMPTY; + notify.state.store(EMPTY, SeqCst); + } + + // See if the node was notified but not received. In this case, the + // notification must be sent to another waiter. + // + // Safety: with the entry removed from the linked list, there can be + // no concurrent access to the entry + let notified = unsafe { (*waiter.as_mut().get()).notified }; + + if notified { + if let Some(waker) = notify_locked(&mut waiters, ¬ify.state, notify_state) { + drop(waiters); + waker.wake(); + } + } + } + } +} + +fn is_unpin<T: Unpin>() {} diff --git a/tokio/src/sync/tests/loom_notify.rs b/tokio/src/sync/tests/loom_notify.rs new file mode 100644 index 00000000000..23fc2f35aa7 --- /dev/null +++ b/tokio/src/sync/tests/loom_notify.rs @@ -0,0 +1,88 @@ +use crate::sync::Notify; + +use loom::future::block_on; +use loom::sync::Arc; +use loom::thread; + +#[test] +fn notify_one() { + loom::model(|| { + let tx = Arc::new(Notify::new()); + let rx = tx.clone(); + + let th = thread::spawn(move || { + block_on(async { + rx.recv().await; + }); + }); + + tx.notify_one(); + th.join().unwrap(); + }); +} + +#[test] +fn notify_multi() { + loom::model(|| { + let notify = Arc::new(Notify::new()); + + let mut ths = vec![]; + + for _ in 0..2 { + let notify = notify.clone(); + + ths.push(thread::spawn(move || { + block_on(async { + notify.recv().await; + notify.notify_one(); + }) + })); + } + + notify.notify_one(); + + for th in ths.drain(..) { + th.join().unwrap(); + } + + block_on(async { + notify.recv().await; + }); + }); +} + +#[test] +fn notify_drop() { + use crate::future::poll_fn; + use std::future::Future; + use std::task::Poll; + + loom::model(|| { + let notify = Arc::new(Notify::new()); + let rx1 = notify.clone(); + let rx2 = notify.clone(); + + let th1 = thread::spawn(move || { + let mut recv = Box::pin(async { + rx1.recv().await; + rx1.notify_one(); + }); + + block_on(poll_fn(|cx| { + let _ = recv.as_mut().poll(cx); + Poll::Ready(()) + })); + }); + + let th2 = thread::spawn(move || { + block_on(async { + rx2.recv().await; + }); + }); + + notify.notify_one(); + + th1.join().unwrap(); + th2.join().unwrap(); + }); +} diff --git a/tokio/src/sync/tests/mod.rs b/tokio/src/sync/tests/mod.rs index 2ee140cbf18..7225ce9c58c 100644 --- a/tokio/src/sync/tests/mod.rs +++ b/tokio/src/sync/tests/mod.rs @@ -8,6 +8,7 @@ cfg_loom! { mod loom_broadcast; mod loom_list; mod loom_mpsc; + mod loom_notify; mod loom_oneshot; mod loom_semaphore_ll; } diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs new file mode 100644 index 00000000000..1d50bdc4899 --- /dev/null +++ b/tokio/src/util/linked_list.rs @@ -0,0 +1,483 @@ +//! An intrusive double linked list of data +//! +//! The data structure supports tracking pinned nodes. Most of the data +//! structure's APIs are `unsafe` as they require the caller to ensure the +//! specified node is actually + +use core::cell::UnsafeCell; +use core::marker::PhantomPinned; +use core::pin::Pin; +use core::ptr::NonNull; + +/// An intrusive linked list of nodes, where each node carries associated data +/// of type `T`. +#[derive(Debug)] +pub(crate) struct LinkedList<T> { + head: Option<NonNull<Entry<T>>>, + tail: Option<NonNull<Entry<T>>>, +} + +unsafe impl<T: Send> Send for LinkedList<T> {} +unsafe impl<T: Sync> Sync for LinkedList<T> {} + +/// A node which carries data of type `T` and is stored in an intrusive list. +#[derive(Debug)] +pub(crate) struct Entry<T> { + /// The previous node in the list. null if there is no previous node. + prev: Option<NonNull<Entry<T>>>, + + /// The next node in the list. null if there is no previous node. + next: Option<NonNull<Entry<T>>>, + + /// The data which is associated to this list item + data: UnsafeCell<T>, + + /// Prevents `Entry`s from being `Unpin`. They may never be moved, since + /// the list semantics require addresses to be stable. + _pin: PhantomPinned, +} + +unsafe impl<T: Send> Send for Entry<T> {} +unsafe impl<T: Sync> Sync for Entry<T> {} + +impl<T> LinkedList<T> { + /// Creates an empty linked list + pub(crate) fn new() -> Self { + LinkedList { + head: None, + tail: None, + } + } + + /// Adds an item to the back of the linked list. + /// + /// # Safety + /// + /// The function is only safe as long as valid pointers are stored inside + /// the linked list. + pub(crate) unsafe fn push_front(&mut self, entry: Pin<&mut Entry<T>>) { + let mut entry: NonNull<Entry<T>> = entry.get_unchecked_mut().into(); + + entry.as_mut().next = self.head; + entry.as_mut().prev = None; + + if let Some(head) = &mut self.head { + head.as_mut().prev = Some(entry); + } + + self.head = Some(entry); + + if self.tail.is_none() { + self.tail = Some(entry); + } + } + + /// Removes the first element and returns it, or `None` if the list is empty. + /// + /// The function is safe as the lifetime of the entry is bound to `&mut + /// self`. + pub(crate) fn pop_back(&mut self) -> Option<Pin<&mut T>> { + unsafe { + let mut last = match self.tail { + Some(tail) => { + self.tail = tail.as_ref().prev; + tail + } + None => return None, + }; + + if let Some(mut prev) = last.as_mut().prev { + prev.as_mut().next = None; + } else { + self.head = None + } + + last.as_mut().prev = None; + last.as_mut().next = None; + + let val = &mut *last.as_mut().data.get(); + + Some(Pin::new_unchecked(val)) + } + } + + /// Returns whether the linked list doesn not contain any node + pub(crate) fn is_empty(&self) -> bool { + if !self.head.is_none() { + return false; + } + + assert!(self.tail.is_none()); + true + } + + /// Removes the given item from the linked list. + /// + /// # Safety + /// + /// The caller **must** ensure that `entry` is currently contained by + /// `self`. + pub(crate) unsafe fn remove(&mut self, entry: Pin<&mut Entry<T>>) -> bool { + let mut entry = NonNull::from(entry.get_unchecked_mut()); + + if let Some(mut prev) = entry.as_mut().prev { + debug_assert_eq!(prev.as_ref().next, Some(entry)); + prev.as_mut().next = entry.as_ref().next; + } else { + if self.head != Some(entry) { + return false; + } + + self.head = entry.as_ref().next; + } + + if let Some(mut next) = entry.as_mut().next { + debug_assert_eq!(next.as_ref().prev, Some(entry)); + next.as_mut().prev = entry.as_ref().prev; + } else { + // This might be the last item in the list + if self.tail != Some(entry) { + return false; + } + + self.tail = entry.as_ref().prev; + } + + entry.as_mut().next = None; + entry.as_mut().prev = None; + + true + } +} + +impl<T> Entry<T> { + /// Creates a new node with the associated data + pub(crate) fn new(data: T) -> Entry<T> { + Entry { + prev: None, + next: None, + data: UnsafeCell::new(data), + _pin: PhantomPinned, + } + } + + /// Get a raw pointer to the inner data + pub(crate) fn get(&self) -> *mut T { + self.data.get() + } +} + +#[cfg(test)] +#[cfg(not(loom))] +mod tests { + use super::*; + + fn collect_list<T: Copy>(list: &mut LinkedList<T>) -> Vec<T> { + let mut ret = vec![]; + + while let Some(v) = list.pop_back() { + ret.push(*v); + } + + ret + } + + unsafe fn push_all(list: &mut LinkedList<i32>, entries: &mut [Pin<&mut Entry<i32>>]) { + for entry in entries.iter_mut() { + list.push_front(entry.as_mut()); + } + } + + macro_rules! assert_clean { + ($e:ident) => {{ + assert!($e.next.is_none()); + assert!($e.prev.is_none()); + }}; + } + + macro_rules! assert_ptr_eq { + ($a:expr, $b:expr) => {{ + // Deal with mapping a Pin<&mut T> -> Option<NonNull<T>> + assert_eq!(Some($a.as_mut().get_unchecked_mut().into()), $b) + }}; + } + + #[test] + fn push_and_drain() { + pin! { + let a = Entry::new(5); + let b = Entry::new(7); + let c = Entry::new(31); + } + + let mut list = LinkedList::new(); + assert!(list.is_empty()); + + unsafe { + list.push_front(a); + assert!(!list.is_empty()); + list.push_front(b); + list.push_front(c); + } + + let items: Vec<i32> = collect_list(&mut list); + assert_eq!([5, 7, 31].to_vec(), items); + + assert!(list.is_empty()); + } + + #[test] + fn push_pop_push_pop() { + pin! { + let a = Entry::new(5); + let b = Entry::new(7); + } + + let mut list = LinkedList::new(); + + unsafe { + list.push_front(a); + } + + let v = list.pop_back().unwrap(); + assert_eq!(5, *v); + assert!(list.is_empty()); + + unsafe { + list.push_front(b); + } + + let v = list.pop_back().unwrap(); + assert_eq!(7, *v); + + assert!(list.is_empty()); + assert!(list.pop_back().is_none()); + } + + #[test] + fn remove_by_address() { + pin! { + let a = Entry::new(5); + let b = Entry::new(7); + let c = Entry::new(31); + } + + unsafe { + // Remove first + let mut list = LinkedList::new(); + + push_all(&mut list, &mut [c.as_mut(), b.as_mut(), a.as_mut()]); + assert!(list.remove(a.as_mut())); + assert_clean!(a); + // `a` should be no longer there and can't be removed twice + assert!(!list.remove(a.as_mut())); + assert!(!list.is_empty()); + + assert!(list.remove(b.as_mut())); + assert_clean!(b); + // `b` should be no longer there and can't be removed twice + assert!(!list.remove(b.as_mut())); + assert!(!list.is_empty()); + + assert!(list.remove(c.as_mut())); + assert_clean!(c); + // `b` should be no longer there and can't be removed twice + assert!(!list.remove(c.as_mut())); + assert!(list.is_empty()); + } + + unsafe { + // Remove middle + let mut list = LinkedList::new(); + + push_all(&mut list, &mut [c.as_mut(), b.as_mut(), a.as_mut()]); + + assert!(list.remove(a.as_mut())); + assert_clean!(a); + + assert_ptr_eq!(b, list.head); + assert_ptr_eq!(c, b.next); + assert_ptr_eq!(b, c.prev); + + let items = collect_list(&mut list); + assert_eq!([31, 7].to_vec(), items); + } + + unsafe { + // Remove middle + let mut list = LinkedList::new(); + + push_all(&mut list, &mut [c.as_mut(), b.as_mut(), a.as_mut()]); + + assert!(list.remove(b.as_mut())); + assert_clean!(b); + + assert_ptr_eq!(c, a.next); + assert_ptr_eq!(a, c.prev); + + let items = collect_list(&mut list); + assert_eq!([31, 5].to_vec(), items); + } + + unsafe { + // Remove last + // Remove middle + let mut list = LinkedList::new(); + + push_all(&mut list, &mut [c.as_mut(), b.as_mut(), a.as_mut()]); + + assert!(list.remove(c.as_mut())); + assert_clean!(c); + + assert!(b.next.is_none()); + assert_ptr_eq!(b, list.tail); + + let items = collect_list(&mut list); + assert_eq!([7, 5].to_vec(), items); + } + + unsafe { + // Remove first of two + let mut list = LinkedList::new(); + + push_all(&mut list, &mut [b.as_mut(), a.as_mut()]); + + assert!(list.remove(a.as_mut())); + + assert_clean!(a); + + // a should be no longer there and can't be removed twice + assert!(!list.remove(a.as_mut())); + + assert_ptr_eq!(b, list.head); + assert_ptr_eq!(b, list.tail); + + assert!(b.next.is_none()); + assert!(b.prev.is_none()); + + let items = collect_list(&mut list); + assert_eq!([7].to_vec(), items); + } + + unsafe { + // Remove last of two + let mut list = LinkedList::new(); + + push_all(&mut list, &mut [b.as_mut(), a.as_mut()]); + + assert!(list.remove(b.as_mut())); + + assert_clean!(b); + + assert_ptr_eq!(a, list.head); + assert_ptr_eq!(a, list.tail); + + assert!(a.next.is_none()); + assert!(a.prev.is_none()); + + let items = collect_list(&mut list); + assert_eq!([5].to_vec(), items); + } + + unsafe { + // Remove last item + let mut list = LinkedList::new(); + + push_all(&mut list, &mut [a.as_mut()]); + + assert!(list.remove(a.as_mut())); + assert_clean!(a); + + assert!(list.head.is_none()); + assert!(list.tail.is_none()); + let items = collect_list(&mut list); + assert!(items.is_empty()); + } + + unsafe { + // Remove missing + let mut list = LinkedList::new(); + + list.push_front(b.as_mut()); + list.push_front(a.as_mut()); + + assert!(!list.remove(c.as_mut())); + } + } + + proptest::proptest! { + #[test] + fn fuzz_linked_list(ops: Vec<usize>) { + run_fuzz(ops); + } + } + + fn run_fuzz(ops: Vec<usize>) { + use std::collections::VecDeque; + + #[derive(Debug)] + enum Op { + Push, + Pop, + Remove(usize), + } + + let ops = ops + .iter() + .map(|i| match i % 3 { + 0 => Op::Push, + 1 => Op::Pop, + 2 => Op::Remove(i / 3), + _ => unreachable!(), + }) + .collect::<Vec<_>>(); + + let mut next = 0; + let mut ll = LinkedList::new(); + let mut entries = VecDeque::new(); + let mut reference = VecDeque::new(); + + for op in ops { + match op { + Op::Push => { + let v = next; + next += 1; + + reference.push_front(v); + entries.push_front(Box::pin(Entry::new(v))); + + unsafe { + ll.push_front(entries.front_mut().unwrap().as_mut()); + } + } + Op::Pop => { + if reference.is_empty() { + assert!(ll.is_empty()); + continue; + } + + let v = reference.pop_back(); + assert_eq!(v, ll.pop_back().map(|v| *v)); + entries.pop_back(); + } + Op::Remove(n) => { + if reference.is_empty() { + assert!(ll.is_empty()); + continue; + } + + let idx = n % reference.len(); + + unsafe { + assert!(ll.remove(entries[idx].as_mut())); + } + + let v = reference.remove(idx).unwrap(); + assert_eq!(v, unsafe { *entries[idx].get() }); + + entries.remove(idx); + } + } + } + } +} diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index cd5b151d137..ad8e1b0d097 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -3,6 +3,8 @@ cfg_io_driver! { pub(crate) mod slab; } +pub(crate) mod linked_list; + #[cfg(any(feature = "rt-threaded", feature = "macros", feature = "stream"))] mod rand; diff --git a/tokio/tests/sync_notify.rs b/tokio/tests/sync_notify.rs new file mode 100644 index 00000000000..6f0e4cfdb9e --- /dev/null +++ b/tokio/tests/sync_notify.rs @@ -0,0 +1,102 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::sync::Notify; +use tokio_test::task::spawn; +use tokio_test::*; + +trait AssertSend: Send + Sync {} +impl AssertSend for Notify {} + +#[test] +fn notify_recv_one() { + let notify = Notify::new(); + let mut recv = spawn(async { notify.recv().await }); + + notify.notify_one(); + assert_ready!(recv.poll()); +} + +#[test] +fn recv_one_notify() { + let notify = Notify::new(); + let mut recv = spawn(async { notify.recv().await }); + + assert_pending!(recv.poll()); + + notify.notify_one(); + assert!(recv.is_woken()); + assert_ready!(recv.poll()); +} + +#[test] +fn recv_multi_notify() { + let notify = Notify::new(); + let mut recv1 = spawn(async { notify.recv().await }); + let mut recv2 = spawn(async { notify.recv().await }); + + assert_pending!(recv1.poll()); + assert_pending!(recv2.poll()); + + notify.notify_one(); + assert!(recv1.is_woken()); + assert!(!recv2.is_woken()); + + assert_ready!(recv1.poll()); + assert_pending!(recv2.poll()); +} + +#[test] +fn notify_recv_multi() { + let notify = Notify::new(); + + notify.notify_one(); + + let mut recv1 = spawn(async { notify.recv().await }); + let mut recv2 = spawn(async { notify.recv().await }); + + assert_ready!(recv1.poll()); + assert_pending!(recv2.poll()); + + notify.notify_one(); + + assert!(recv2.is_woken()); + assert_ready!(recv2.poll()); +} + +#[test] +fn recv_drop_recv_notify() { + let notify = Notify::new(); + let mut recv1 = spawn(async { notify.recv().await }); + let mut recv2 = spawn(async { notify.recv().await }); + + assert_pending!(recv1.poll()); + + drop(recv1); + + assert_pending!(recv2.poll()); + + notify.notify_one(); + assert!(recv2.is_woken()); + assert_ready!(recv2.poll()); +} + +#[test] +fn recv_multi_notify_drop_one() { + let notify = Notify::new(); + let mut recv1 = spawn(async { notify.recv().await }); + let mut recv2 = spawn(async { notify.recv().await }); + + assert_pending!(recv1.poll()); + assert_pending!(recv2.poll()); + + notify.notify_one(); + + assert!(recv1.is_woken()); + assert!(!recv2.is_woken()); + + drop(recv1); + + assert!(recv2.is_woken()); + assert_ready!(recv2.poll()); +} From 18906454273d77cc0faa2c736b9e0079eee3f583 Mon Sep 17 00:00:00 2001 From: Carl Lerche <me@carllerche.com> Date: Mon, 3 Feb 2020 09:05:04 -0800 Subject: [PATCH 02/12] fix warnings --- tokio/src/sync/notify.rs | 36 +++++++++++++++++------------------ tokio/src/util/linked_list.rs | 2 +- tokio/src/util/mod.rs | 4 +++- 3 files changed, 22 insertions(+), 20 deletions(-) diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index b0b25c425ec..e761bb5b2d3 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -87,7 +87,6 @@ use std::task::{Context, Poll, Waker}; /// } /// } /// } -/// /// ``` /// /// [park]: std::thread::park @@ -228,24 +227,19 @@ impl Notify { let mut curr = self.state.load(SeqCst); // If the state is `EMPTY`, transition to `NOTIFIED` and return. - loop { - match curr { - EMPTY | NOTIFIED => { - // The compare-exchange from `NOTIFIED` -> `NOTIFIED` is - // intended a happens-before synchronization must happen - // between this atomic operation and a task calling - // `recv()`. - let res = self.state.compare_exchange(curr, NOTIFIED, SeqCst, SeqCst); - - match res { - // No waiters, no further work to do - Ok(_) => return, - Err(actual) => { - curr = actual; - } - } + while let EMPTY | NOTIFIED = curr { + // The compare-exchange from `NOTIFIED` -> `NOTIFIED` is + // intended a happens-before synchronization must happen + // between this atomic operation and a task calling + // `recv()`. + let res = self.state.compare_exchange(curr, NOTIFIED, SeqCst, SeqCst); + + match res { + // No waiters, no further work to do + Ok(_) => return, + Err(actual) => { + curr = actual; } - _ => break, } } @@ -263,6 +257,12 @@ impl Notify { } } +impl Default for Notify { + fn default() -> Notify { + Notify::new() + } +} + fn notify_locked( waiters: &mut LinkedList<Waiter>, state: &AtomicU8, diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index 1d50bdc4899..b2486069317 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -103,7 +103,7 @@ impl<T> LinkedList<T> { /// Returns whether the linked list doesn not contain any node pub(crate) fn is_empty(&self) -> bool { - if !self.head.is_none() { + if self.head.is_some() { return false; } diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index ad8e1b0d097..2761f7252fd 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -3,7 +3,9 @@ cfg_io_driver! { pub(crate) mod slab; } -pub(crate) mod linked_list; +cfg_sync! { + pub(crate) mod linked_list; +} #[cfg(any(feature = "rt-threaded", feature = "macros", feature = "stream"))] mod rand; From c913235915002152e173099c8912df839594d867 Mon Sep 17 00:00:00 2001 From: Carl Lerche <me@carllerche.com> Date: Tue, 25 Feb 2020 21:41:59 -0800 Subject: [PATCH 03/12] apply feedback --- tokio/src/sync/notify.rs | 91 ++++++++++++++++++----------------- tokio/src/util/linked_list.rs | 11 ++--- 2 files changed, 51 insertions(+), 51 deletions(-) diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index e761bb5b2d3..4cc1bad8373 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -13,17 +13,17 @@ use std::task::{Context, Poll, Waker}; /// `Notify` itself does not carry any data. Instead, it is to be used to signal /// another task to perform an operation. /// +/// `Notify` can be thought of as a [`Semaphore`] starting with 0 permits. +/// [`Notify::recv`] waits for a permit to become available and +/// [`Notify::notify_one`] sets a permit **if there currently are no available +/// permits**. +/// /// The synchronization details of `Notify` are similar to /// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`] /// value contains a single permit. [`Notify::recv`] waits for the permit to be /// made available, consumes the permit, and resumes. [`Notify::notify_one`] /// sets the permit, waking a pending task if there is one. /// -/// `Notify` can be thought of as a [`Semaphore`] starting with 0 permits. -/// [`Notify::recv`] waits for a permit to become available and -/// [`Notify::notify_one`] sets a permit **if there currently are no available -/// permits**. -/// /// If `notify_one` is called **before** `recv()`, then the next call to /// `recv()` will complete immediately, consuming the permit. Any subsequent /// calls to `recv()` will wait for a new permit. @@ -47,9 +47,10 @@ use std::task::{Context, Poll, Waker}; /// /// tokio::spawn(async move { /// notify2.recv().await; -/// println!("received a notification"); +/// println!("received notification"); /// }); /// +/// println!("sending notification"); /// notify.notify_one(); /// } /// ``` @@ -228,10 +229,9 @@ impl Notify { // If the state is `EMPTY`, transition to `NOTIFIED` and return. while let EMPTY | NOTIFIED = curr { - // The compare-exchange from `NOTIFIED` -> `NOTIFIED` is - // intended a happens-before synchronization must happen - // between this atomic operation and a task calling - // `recv()`. + // The compare-exchange from `NOTIFIED` -> `NOTIFIED` is intended. A + // happens-before synchronization must happen between this atomic + // operation and a task calling `recv()`. let res = self.state.compare_exchange(curr, NOTIFIED, SeqCst, SeqCst); match res { @@ -266,7 +266,7 @@ impl Default for Notify { fn notify_locked( waiters: &mut LinkedList<Waiter>, state: &AtomicU8, - mut curr: u8, + curr: u8, ) -> Option<Waker> { loop { match curr { @@ -276,7 +276,9 @@ fn notify_locked( match res { Ok(_) => return None, Err(actual) => { - curr = actual; + assert!(actual == EMPTY || actual == NOTIFIED); + state.store(NOTIFIED, SeqCst); + return None; } } } @@ -342,7 +344,7 @@ impl Future for RecvFuture<'_> { let (notify, state, mut waiter) = self.project(); - 'outer: loop { + loop { match *state { Init => { // Optimistically try acquiring a pending notification @@ -353,7 +355,7 @@ impl Future for RecvFuture<'_> { if res.is_ok() { // Acquired the notification *state = Done; - continue 'outer; + return Poll::Ready(()); } // Acquire the lock and attempt to transition to the waiting @@ -373,11 +375,13 @@ impl Future for RecvFuture<'_> { .compare_exchange(EMPTY, WAITING, SeqCst, SeqCst); if let Err(actual) = res { + assert_eq!(actual, NOTIFIED); curr = actual; - continue; + } else { + break; } } - WAITING => {} + WAITING => break, NOTIFIED => { // Try consuming the notification let res = notify @@ -388,19 +392,16 @@ impl Future for RecvFuture<'_> { Ok(_) => { // Acquired the notification *state = Done; - continue 'outer; + return Poll::Ready(()); } Err(actual) => { + assert_eq!(actual, EMPTY); curr = actual; - continue; } } } _ => unreachable!(), } - - // The state has been transitioned to waiting - break; } // Safety: called while locked. @@ -414,34 +415,38 @@ impl Future for RecvFuture<'_> { *state = Waiting; } Waiting => { + // Currently in the "Waiting" state, implying the caller has + // a waiter stored in the waiter list (guarded by + // `notify.waiters`). In order to access the waker fields, + // we must hold the lock. + let mut waiters = notify.waiters.lock().unwrap(); - { - // Safety: called while locked - let w = unsafe { &mut *waiter.as_mut().get() }; - - if w.notified { - w.waker = None; - w.notified = false; - - // Remove the entry from the linked list. The entry - // may only be in the linked list while the state is - // `Waiting`. - // - // Safety: called while the lock is held - unsafe { - waiters.remove(waiter.as_mut()); - } + // Safety: called while locked + let w = unsafe { &mut *waiter.as_mut().get() }; - *state = Done; - } else { - if !w.waker.as_ref().unwrap().will_wake(cx.waker()) { - w.waker = Some(cx.waker().clone()); - } + if w.notified { + // Our waker has been notified. Reset the fields and + // remove it from the list. + w.waker = None; + w.notified = false; - return Poll::Pending; + *state = Done; + } else { + // Update the waker, if necessary. + if !w.waker.as_ref().unwrap().will_wake(cx.waker()) { + w.waker = Some(cx.waker().clone()); } + + return Poll::Pending; } + + // Explicit drop of the lock to indicate the scope that the + // lock is held. Because holding the lock is required to + // ensure safe access to fields not held within the lock, it + // is helpful to visualize the scope of the critical + // section. + drop(waiters); } Done => { return Poll::Ready(()); diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index b2486069317..2e4d8d2d126 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -2,7 +2,7 @@ //! //! The data structure supports tracking pinned nodes. Most of the data //! structure's APIs are `unsafe` as they require the caller to ensure the -//! specified node is actually +//! specified node is actually contained by the list. use core::cell::UnsafeCell; use core::marker::PhantomPinned; @@ -78,13 +78,8 @@ impl<T> LinkedList<T> { /// self`. pub(crate) fn pop_back(&mut self) -> Option<Pin<&mut T>> { unsafe { - let mut last = match self.tail { - Some(tail) => { - self.tail = tail.as_ref().prev; - tail - } - None => return None, - }; + let mut last = self.tail?; + self.tail = last.as_ref().prev; if let Some(mut prev) = last.as_mut().prev { prev.as_mut().next = None; From d569f52b9945b7c00f5f0cc8d5e5a9898c4e2750 Mon Sep 17 00:00:00 2001 From: Carl Lerche <me@carllerche.com> Date: Tue, 25 Feb 2020 22:07:40 -0800 Subject: [PATCH 04/12] add some comments --- tokio/src/sync/notify.rs | 20 ++++++++++++++++++++ tokio/src/sync/tests/loom_notify.rs | 12 +++++++----- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 4cc1bad8373..8a934e0c024 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -470,6 +470,20 @@ impl Drop for RecvFuture<'_> { let mut notify_state = WAITING; let mut waiters = notify.waiters.lock().unwrap(); + // `Notify.state` may be in any of the three states (Empty, Waiting, + // Notified). It doesn't actually matter what the atomic is set to + // at this point. We hold the lock and will ensure the atomic is in + // the correct state once th elock is dropped. + // + // Because the atomic state is not checked, at first glance, it may + // seem like this routine does not handle the case where the + // receiver is notified but has not yet observed the notification. + // If this happens, no matter how many notifications happen between + // this receiver being notified and the receive future dropping, all + // we need to do is ensure that one notification is returned back to + // the `Notify`. This is done by calling `notify_locked` if `self` + // has the `notified` flag set. + // remove the entry from the list // // safety: the waiter is only added to `waiters` by virtue of it @@ -478,6 +492,12 @@ impl Drop for RecvFuture<'_> { if waiters.is_empty() { notify_state = EMPTY; + // If the state *should* be `NOTIFIED`, the call to + // `notify_locked` below will end up doing the + // `store(NOTIFIED)`. If a concurrent receiver races and + // observes the incorrect `EMPTY` state, it will then obtain the + // lock and block until `notify.state` is in the correct final + // state. notify.state.store(EMPTY, SeqCst); } diff --git a/tokio/src/sync/tests/loom_notify.rs b/tokio/src/sync/tests/loom_notify.rs index 23fc2f35aa7..129acba764f 100644 --- a/tokio/src/sync/tests/loom_notify.rs +++ b/tokio/src/sync/tests/loom_notify.rs @@ -63,13 +63,12 @@ fn notify_drop() { let rx2 = notify.clone(); let th1 = thread::spawn(move || { - let mut recv = Box::pin(async { - rx1.recv().await; - rx1.notify_one(); - }); + let mut recv = Box::pin(rx1.recv()); block_on(poll_fn(|cx| { - let _ = recv.as_mut().poll(cx); + if recv.as_mut().poll(cx).is_ready() { + rx1.notify_one(); + } Poll::Ready(()) })); }); @@ -77,6 +76,9 @@ fn notify_drop() { let th2 = thread::spawn(move || { block_on(async { rx2.recv().await; + // Trigger second notification + rx2.notify_one(); + rx2.recv().await; }); }); From bc7c93c2ff7067354e9d6019774456b0421d5b35 Mon Sep 17 00:00:00 2001 From: Carl Lerche <me@carllerche.com> Date: Tue, 25 Feb 2020 22:34:10 -0800 Subject: [PATCH 05/12] fmt --- tokio/src/sync/notify.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 8a934e0c024..7f9a99165f4 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -263,11 +263,7 @@ impl Default for Notify { } } -fn notify_locked( - waiters: &mut LinkedList<Waiter>, - state: &AtomicU8, - curr: u8, -) -> Option<Waker> { +fn notify_locked(waiters: &mut LinkedList<Waiter>, state: &AtomicU8, curr: u8) -> Option<Waker> { loop { match curr { EMPTY | NOTIFIED => { From f781e661494d04e14b431a9b9704f8e4654af7b5 Mon Sep 17 00:00:00 2001 From: Carl Lerche <me@carllerche.com> Date: Tue, 25 Feb 2020 22:35:15 -0800 Subject: [PATCH 06/12] rm unused mut --- tokio/src/sync/notify.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 7f9a99165f4..6d54b0179a5 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -416,7 +416,7 @@ impl Future for RecvFuture<'_> { // `notify.waiters`). In order to access the waker fields, // we must hold the lock. - let mut waiters = notify.waiters.lock().unwrap(); + let waiters = notify.waiters.lock().unwrap(); // Safety: called while locked let w = unsafe { &mut *waiter.as_mut().get() }; From f1796f85bed827c9866e02ad3b257e45ce883697 Mon Sep 17 00:00:00 2001 From: Carl Lerche <me@carllerche.com> Date: Wed, 26 Feb 2020 10:17:37 -0800 Subject: [PATCH 07/12] rename `recv()` -> `notified()` --- tokio/src/sync/notify.rs | 106 +++++++++++++++++++------------------ tokio/tests/sync_notify.rs | 98 +++++++++++++++++----------------- 2 files changed, 103 insertions(+), 101 deletions(-) diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 6d54b0179a5..08ba4032bab 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -14,23 +14,22 @@ use std::task::{Context, Poll, Waker}; /// another task to perform an operation. /// /// `Notify` can be thought of as a [`Semaphore`] starting with 0 permits. -/// [`Notify::recv`] waits for a permit to become available and -/// [`Notify::notify_one`] sets a permit **if there currently are no available -/// permits**. +/// [`notfied().await`] waits for a permit to become available and [`notify()`] +/// sets a permit **if there currently are no available permits**. /// /// The synchronization details of `Notify` are similar to /// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`] -/// value contains a single permit. [`Notify::recv`] waits for the permit to be -/// made available, consumes the permit, and resumes. [`Notify::notify_one`] +/// value contains a single permit. [`Notify::notfied`] waits for the permit to +/// be made available, consumes the permit, and resumes. [`Notify::notify_one`] /// sets the permit, waking a pending task if there is one. /// -/// If `notify_one` is called **before** `recv()`, then the next call to -/// `recv()` will complete immediately, consuming the permit. Any subsequent -/// calls to `recv()` will wait for a new permit. +/// If `notify()` is called **before** `notfied().await`, then the next call to +/// `notified().await` will complete immediately, consuming the permit. Any +/// subsequent calls to `notified().await` will wait for a new permit. /// -/// If `notify_one` is called **multiple** times before `recv()`, only a -/// **single** permit is stored. The next call to `recv()` will complete -/// immediately, but the one after will wait for a new permit. +/// If `notify()` is called **multiple** times before `notified().await`, only a +/// **single** permit is stored. The next call to `notified().await` will +/// complete immediately, but the one after will wait for a new permit. /// /// # Examples /// @@ -46,12 +45,12 @@ use std::task::{Context, Poll, Waker}; /// let notify2 = notify.clone(); /// /// tokio::spawn(async move { -/// notify2.recv().await; +/// notify2.notified().await; /// println!("received notification"); /// }); /// /// println!("sending notification"); -/// notify.notify_one(); +/// notify.notify(); /// } /// ``` /// @@ -59,6 +58,7 @@ use std::task::{Context, Poll, Waker}; /// /// ``` /// use tokio::sync::Notify; +/// /// use std::collections::VecDeque; /// use std::sync::Mutex; /// @@ -73,7 +73,7 @@ use std::task::{Context, Poll, Waker}; /// .push_back(value); /// /// // Notify the consumer a value is available -/// self.notify.notify_one(); +/// self.notify.notify(); /// } /// /// pub async fn recv(&self) -> T { @@ -84,7 +84,7 @@ use std::task::{Context, Poll, Waker}; /// } /// /// // Wait for values to be available -/// self.notify.recv().await; +/// self.notify.notified().await; /// } /// } /// } @@ -92,8 +92,8 @@ use std::task::{Context, Poll, Waker}; /// /// [park]: std::thread::park /// [unpark]: std::thread::Thread::unpark -/// [`Notify::recv`]: Notify::recv() -/// [`Notify::notify_one`]: Notify::notify_one() +/// [`notified().await`]: Notify::notified() +/// [`notify`]: Notify::notify() /// [`Semaphore`]: crate::sync::Semaphore #[derive(Debug)] pub struct Notify { @@ -110,20 +110,21 @@ struct Waiter { notified: bool, } +/// Future returned from `notified()` #[derive(Debug)] -struct RecvFuture<'a> { +struct Notified<'a> { /// The `Notify` being received on. notify: &'a Notify, /// The current state of the receiving process. - state: RecvState, + state: State, /// Entry in the waiter `LinkedList`. waiter: linked_list::Entry<Waiter>, } #[derive(Debug)] -enum RecvState { +enum State { Init, Waiting, Done, @@ -158,9 +159,11 @@ impl Notify { /// Wait for a notification. /// /// Each `Notify` value holds a single permit. If a permit is available from - /// an earlier call to `notify_one`, then `recv` will complete immediately, - /// consuming that permit. Otherwise, `recv()` waits for a permit to be made - /// available by the next call to `notify_one` + /// an earlier call to [`notify()`], then `notified().await` will complete + /// immediately, consuming that permit. Otherwise, `notified().await` waits + /// for a permit to be made available by the next call to `notify()` + /// + /// [`notify()`]: Notify::notify /// /// # Examples /// @@ -174,17 +177,18 @@ impl Notify { /// let notify2 = notify.clone(); /// /// tokio::spawn(async move { - /// notify2.recv().await; - /// println!("received a notification"); + /// notify2.notified().await; + /// println!("received notification"); /// }); /// - /// notify.notify_one(); + /// println!("sending notification"); + /// notify.notify(); /// } /// ``` - pub async fn recv(&self) { - RecvFuture { + pub async fn notified(&self) { + Notified { notify: self, - state: RecvState::Init, + state: State::Init, waiter: linked_list::Entry::new(Waiter { waker: None, notified: false, @@ -197,12 +201,13 @@ impl Notify { /// /// If a task is currently waiting, that task is notified. Otherwise, a /// permit is stored in this `Notify` value and the **next** call to - /// `recv()` will complete immediately consuming the permit made available - /// by this call to `notify_one()`. + /// `notified().await` will complete immediately consuming the permit made + /// available by this call to `notify()`. /// /// At most one permit may be stored by `Notify`. Many sequential calls to - /// `notify_one` will result in a single permit being stored. The next call - /// to `recv()` will complete immediately, but the one after that will wait. + /// `notify` will result in a single permit being stored. The next call to + /// `notified().await` will complete immediately, but the one after that + /// will wait. /// /// # Examples /// @@ -216,14 +221,15 @@ impl Notify { /// let notify2 = notify.clone(); /// /// tokio::spawn(async move { - /// notify2.recv().await; - /// println!("received a notification"); + /// notify2.notified().await; + /// println!("received notification"); /// }); /// - /// notify.notify_one(); + /// println!("sending notification"); + /// notify.notify(); /// } /// ``` - pub fn notify_one(&self) { + pub fn notify(&self) { // Load the current state let mut curr = self.state.load(SeqCst); @@ -231,7 +237,7 @@ impl Notify { while let EMPTY | NOTIFIED = curr { // The compare-exchange from `NOTIFIED` -> `NOTIFIED` is intended. A // happens-before synchronization must happen between this atomic - // operation and a task calling `recv()`. + // operation and a task calling `notified().await`. let res = self.state.compare_exchange(curr, NOTIFIED, SeqCst, SeqCst); match res { @@ -306,16 +312,12 @@ fn notify_locked(waiters: &mut LinkedList<Waiter>, state: &AtomicU8, curr: u8) - } } -// ===== impl RecvFuture ===== +// ===== impl Notified ===== -impl RecvFuture<'_> { +impl Notified<'_> { fn project( self: Pin<&mut Self>, - ) -> ( - &Notify, - &mut RecvState, - Pin<&mut linked_list::Entry<Waiter>>, - ) { + ) -> (&Notify, &mut State, Pin<&mut linked_list::Entry<Waiter>>) { unsafe { // Safety: both `notify` and `state` are `Unpin`. @@ -332,11 +334,11 @@ impl RecvFuture<'_> { } } -impl Future for RecvFuture<'_> { +impl Future for Notified<'_> { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - use RecvState::*; + use State::*; let (notify, state, mut waiter) = self.project(); @@ -452,16 +454,16 @@ impl Future for RecvFuture<'_> { } } -impl Drop for RecvFuture<'_> { +impl Drop for Notified<'_> { fn drop(&mut self) { - use RecvState::*; + use State::*; // Safety: The type only transitions to a "Waiting" state when pinned. let (notify, state, mut waiter) = unsafe { Pin::new_unchecked(self).project() }; - // This is where we ensure safety. The `RecvFuture` is being dropped, - // which means we must ensure that the waiter entry is no longer stored - // in the linked list. + // This is where we ensure safety. The `Notified` value is being + // dropped, which means we must ensure that the waiter entry is no + // longer stored in the linked list. if let Waiting = *state { let mut notify_state = WAITING; let mut waiters = notify.waiters.lock().unwrap(); diff --git a/tokio/tests/sync_notify.rs b/tokio/tests/sync_notify.rs index 6f0e4cfdb9e..be39ce32dfd 100644 --- a/tokio/tests/sync_notify.rs +++ b/tokio/tests/sync_notify.rs @@ -9,94 +9,94 @@ trait AssertSend: Send + Sync {} impl AssertSend for Notify {} #[test] -fn notify_recv_one() { +fn notify_notified_one() { let notify = Notify::new(); - let mut recv = spawn(async { notify.recv().await }); + let mut notified = spawn(async { notify.notified().await }); - notify.notify_one(); - assert_ready!(recv.poll()); + notify.notify(); + assert_ready!(notified.poll()); } #[test] -fn recv_one_notify() { +fn notified_one_notify() { let notify = Notify::new(); - let mut recv = spawn(async { notify.recv().await }); + let mut notified = spawn(async { notify.notified().await }); - assert_pending!(recv.poll()); + assert_pending!(notified.poll()); - notify.notify_one(); - assert!(recv.is_woken()); - assert_ready!(recv.poll()); + notify.notify(); + assert!(notified.is_woken()); + assert_ready!(notified.poll()); } #[test] -fn recv_multi_notify() { +fn notified_multi_notify() { let notify = Notify::new(); - let mut recv1 = spawn(async { notify.recv().await }); - let mut recv2 = spawn(async { notify.recv().await }); + let mut notified1 = spawn(async { notify.notified().await }); + let mut notified2 = spawn(async { notify.notified().await }); - assert_pending!(recv1.poll()); - assert_pending!(recv2.poll()); + assert_pending!(notified1.poll()); + assert_pending!(notified2.poll()); - notify.notify_one(); - assert!(recv1.is_woken()); - assert!(!recv2.is_woken()); + notify.notify(); + assert!(notified1.is_woken()); + assert!(!notified2.is_woken()); - assert_ready!(recv1.poll()); - assert_pending!(recv2.poll()); + assert_ready!(notified1.poll()); + assert_pending!(notified2.poll()); } #[test] -fn notify_recv_multi() { +fn notify_notified_multi() { let notify = Notify::new(); - notify.notify_one(); + notify.notify(); - let mut recv1 = spawn(async { notify.recv().await }); - let mut recv2 = spawn(async { notify.recv().await }); + let mut notified1 = spawn(async { notify.notified().await }); + let mut notified2 = spawn(async { notify.notified().await }); - assert_ready!(recv1.poll()); - assert_pending!(recv2.poll()); + assert_ready!(notified1.poll()); + assert_pending!(notified2.poll()); - notify.notify_one(); + notify.notify(); - assert!(recv2.is_woken()); - assert_ready!(recv2.poll()); + assert!(notified2.is_woken()); + assert_ready!(notified2.poll()); } #[test] -fn recv_drop_recv_notify() { +fn notified_drop_notified_notify() { let notify = Notify::new(); - let mut recv1 = spawn(async { notify.recv().await }); - let mut recv2 = spawn(async { notify.recv().await }); + let mut notified1 = spawn(async { notify.notified().await }); + let mut notified2 = spawn(async { notify.notified().await }); - assert_pending!(recv1.poll()); + assert_pending!(notified1.poll()); - drop(recv1); + drop(notified1); - assert_pending!(recv2.poll()); + assert_pending!(notified2.poll()); - notify.notify_one(); - assert!(recv2.is_woken()); - assert_ready!(recv2.poll()); + notify.notify(); + assert!(notified2.is_woken()); + assert_ready!(notified2.poll()); } #[test] -fn recv_multi_notify_drop_one() { +fn notified_multi_notify_drop_one() { let notify = Notify::new(); - let mut recv1 = spawn(async { notify.recv().await }); - let mut recv2 = spawn(async { notify.recv().await }); + let mut notified1 = spawn(async { notify.notified().await }); + let mut notified2 = spawn(async { notify.notified().await }); - assert_pending!(recv1.poll()); - assert_pending!(recv2.poll()); + assert_pending!(notified1.poll()); + assert_pending!(notified2.poll()); - notify.notify_one(); + notify.notify(); - assert!(recv1.is_woken()); - assert!(!recv2.is_woken()); + assert!(notified1.is_woken()); + assert!(!notified2.is_woken()); - drop(recv1); + drop(notified1); - assert!(recv2.is_woken()); - assert_ready!(recv2.poll()); + assert!(notified2.is_woken()); + assert_ready!(notified2.poll()); } From 9ba90ebbd2e92801e4559c3354043412a11885f8 Mon Sep 17 00:00:00 2001 From: Carl Lerche <me@carllerche.com> Date: Wed, 26 Feb 2020 10:30:43 -0800 Subject: [PATCH 08/12] more rename --- tokio/src/sync/notify.rs | 8 ++++---- tokio/src/sync/tests/loom_notify.rs | 24 ++++++++++++------------ 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 08ba4032bab..21de39b3650 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -19,9 +19,9 @@ use std::task::{Context, Poll, Waker}; /// /// The synchronization details of `Notify` are similar to /// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`] -/// value contains a single permit. [`Notify::notfied`] waits for the permit to -/// be made available, consumes the permit, and resumes. [`Notify::notify_one`] -/// sets the permit, waking a pending task if there is one. +/// value contains a single permit. [`notfied().await`] waits for the permit to +/// be made available, consumes the permit, and resumes. [`notify()`] sets the +/// permit, waking a pending task if there is one. /// /// If `notify()` is called **before** `notfied().await`, then the next call to /// `notified().await` will complete immediately, consuming the permit. Any @@ -93,7 +93,7 @@ use std::task::{Context, Poll, Waker}; /// [park]: std::thread::park /// [unpark]: std::thread::Thread::unpark /// [`notified().await`]: Notify::notified() -/// [`notify`]: Notify::notify() +/// [`notify()`]: Notify::notify() /// [`Semaphore`]: crate::sync::Semaphore #[derive(Debug)] pub struct Notify { diff --git a/tokio/src/sync/tests/loom_notify.rs b/tokio/src/sync/tests/loom_notify.rs index 129acba764f..60981d4669a 100644 --- a/tokio/src/sync/tests/loom_notify.rs +++ b/tokio/src/sync/tests/loom_notify.rs @@ -12,11 +12,11 @@ fn notify_one() { let th = thread::spawn(move || { block_on(async { - rx.recv().await; + rx.notified().await; }); }); - tx.notify_one(); + tx.notify(); th.join().unwrap(); }); } @@ -33,20 +33,20 @@ fn notify_multi() { ths.push(thread::spawn(move || { block_on(async { - notify.recv().await; - notify.notify_one(); + notify.notified().await; + notify.notify(); }) })); } - notify.notify_one(); + notify.notify(); for th in ths.drain(..) { th.join().unwrap(); } block_on(async { - notify.recv().await; + notify.notified().await; }); }); } @@ -63,11 +63,11 @@ fn notify_drop() { let rx2 = notify.clone(); let th1 = thread::spawn(move || { - let mut recv = Box::pin(rx1.recv()); + let mut recv = Box::pin(rx1.notified()); block_on(poll_fn(|cx| { if recv.as_mut().poll(cx).is_ready() { - rx1.notify_one(); + rx1.notify(); } Poll::Ready(()) })); @@ -75,14 +75,14 @@ fn notify_drop() { let th2 = thread::spawn(move || { block_on(async { - rx2.recv().await; + rx2.notified().await; // Trigger second notification - rx2.notify_one(); - rx2.recv().await; + rx2.notify(); + rx2.notified().await; }); }); - notify.notify_one(); + notify.notify(); th1.join().unwrap(); th2.join().unwrap(); From c1a3a1f457ad7df17201fb43b343c99f9a84ee27 Mon Sep 17 00:00:00 2001 From: Carl Lerche <me@carllerche.com> Date: Wed, 26 Feb 2020 10:59:01 -0800 Subject: [PATCH 09/12] fix failed merge --- tokio/src/macros/pin.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/macros/pin.rs b/tokio/src/macros/pin.rs index 193dcdfc71d..5b677610a02 100644 --- a/tokio/src/macros/pin.rs +++ b/tokio/src/macros/pin.rs @@ -140,5 +140,5 @@ macro_rules! pin { let $x = $init; crate::pin!($x); )* - } + }; } From 1eac395972716fba169af115ec273db540520011 Mon Sep 17 00:00:00 2001 From: Carl Lerche <me@carllerche.com> Date: Wed, 26 Feb 2020 10:59:21 -0800 Subject: [PATCH 10/12] Update tokio/src/sync/notify.rs Co-Authored-By: Eliza Weisman <eliza@buoyant.io> --- tokio/src/sync/notify.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 21de39b3650..756ef870eef 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -14,7 +14,7 @@ use std::task::{Context, Poll, Waker}; /// another task to perform an operation. /// /// `Notify` can be thought of as a [`Semaphore`] starting with 0 permits. -/// [`notfied().await`] waits for a permit to become available and [`notify()`] +/// [`notified().await`] waits for a permit to become available, and [`notify()`] /// sets a permit **if there currently are no available permits**. /// /// The synchronization details of `Notify` are similar to From b470800fdf7bdf45799c3271906f37e62a97afeb Mon Sep 17 00:00:00 2001 From: Carl Lerche <me@carllerche.com> Date: Wed, 26 Feb 2020 11:00:42 -0800 Subject: [PATCH 11/12] Update tokio/src/sync/notify.rs Co-Authored-By: Eliza Weisman <eliza@buoyant.io> --- tokio/src/sync/notify.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 756ef870eef..2d274e996cd 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -161,7 +161,7 @@ impl Notify { /// Each `Notify` value holds a single permit. If a permit is available from /// an earlier call to [`notify()`], then `notified().await` will complete /// immediately, consuming that permit. Otherwise, `notified().await` waits - /// for a permit to be made available by the next call to `notify()` + /// for a permit to be made available by the next call to `notify()`. /// /// [`notify()`]: Notify::notify /// From 321e09e4b39eee93103e3457ba13cd7c912a3595 Mon Sep 17 00:00:00 2001 From: Carl Lerche <me@carllerche.com> Date: Wed, 26 Feb 2020 11:11:41 -0800 Subject: [PATCH 12/12] tweaks --- tokio/src/sync/notify.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 756ef870eef..42173c936f7 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -201,7 +201,7 @@ impl Notify { /// /// If a task is currently waiting, that task is notified. Otherwise, a /// permit is stored in this `Notify` value and the **next** call to - /// `notified().await` will complete immediately consuming the permit made + /// [`notified().await`] will complete immediately consuming the permit made /// available by this call to `notify()`. /// /// At most one permit may be stored by `Notify`. Many sequential calls to @@ -209,6 +209,8 @@ impl Notify { /// `notified().await` will complete immediately, but the one after that /// will wait. /// + /// [`notified().await`]: Notify::notified() + /// /// # Examples /// /// ``` @@ -315,6 +317,8 @@ fn notify_locked(waiters: &mut LinkedList<Waiter>, state: &AtomicU8, curr: u8) - // ===== impl Notified ===== impl Notified<'_> { + /// A custom `project` implementation is used in place of `pin-project-lite` + /// as a custom drop implementation is needed. fn project( self: Pin<&mut Self>, ) -> (&Notify, &mut State, Pin<&mut linked_list::Entry<Waiter>>) {