From 6b71a6b824ef93d3845f5c4ec3bcc8ee3d242f62 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 30 Aug 2022 10:17:09 -0700 Subject: [PATCH] refac(maitake): combine `wait` module into `sync` (#303) currently, `maitake` has a `wait` module, which contains `WaitCell`, `WaitQueue`, `WaitMap`, and (as of #301) `Semaphore`. in addition, it has a separate `sync` module, which currently contains `Mutex` and nothing else (and may grow a `RwLock` eventually/soon). this feels a bit unfortunate --- the `wait` types are also synchronization primitives. this branch moves all of `maitake::wait` into `maitake::sync`. in addition,i fixed up the docs a little bit. closes #302 --- maitake/examples/tokio-console.rs | 2 +- maitake/src/lib.rs | 1 - maitake/src/scheduler/tests.rs | 2 +- maitake/src/sync.rs | 51 +++++++++++++++++-- maitake/src/sync/mutex.rs | 9 ++-- .../{tests/loom_mutex.rs => mutex/loom.rs} | 0 maitake/src/{wait => sync}/semaphore.rs | 18 +++---- maitake/src/{wait => sync}/semaphore/loom.rs | 0 maitake/src/{wait => sync}/semaphore/tests.rs | 2 +- maitake/src/sync/tests.rs | 2 - .../src/{wait/cell.rs => sync/wait_cell.rs} | 0 maitake/src/{wait/map.rs => sync/wait_map.rs} | 6 ++- .../src/{wait/map => sync/wait_map}/tests.rs | 0 .../src/{wait/queue.rs => sync/wait_queue.rs} | 24 ++++++--- .../{wait/queue => sync/wait_queue}/tests.rs | 0 maitake/src/wait.rs | 42 --------------- 16 files changed, 85 insertions(+), 74 deletions(-) rename maitake/src/sync/{tests/loom_mutex.rs => mutex/loom.rs} (100%) rename maitake/src/{wait => sync}/semaphore.rs (98%) rename maitake/src/{wait => sync}/semaphore/loom.rs (100%) rename maitake/src/{wait => sync}/semaphore/tests.rs (98%) delete mode 100644 maitake/src/sync/tests.rs rename maitake/src/{wait/cell.rs => sync/wait_cell.rs} (100%) rename maitake/src/{wait/map.rs => sync/wait_map.rs} (99%) rename maitake/src/{wait/map => sync/wait_map}/tests.rs (100%) rename maitake/src/{wait/queue.rs => sync/wait_queue.rs} (98%) rename maitake/src/{wait/queue => sync/wait_queue}/tests.rs (100%) delete mode 100644 maitake/src/wait.rs diff --git a/maitake/examples/tokio-console.rs b/maitake/examples/tokio-console.rs index adfbeae1..6d41e037 100644 --- a/maitake/examples/tokio-console.rs +++ b/maitake/examples/tokio-console.rs @@ -1,4 +1,4 @@ -use maitake::{scheduler::Scheduler, wait::WaitQueue}; +use maitake::{scheduler::Scheduler, sync::WaitQueue}; use std::{sync::Arc, thread, time::Duration}; fn main() { diff --git a/maitake/src/lib.rs b/maitake/src/lib.rs index ed3233e6..10d1aef8 100644 --- a/maitake/src/lib.rs +++ b/maitake/src/lib.rs @@ -17,4 +17,3 @@ pub mod future; pub mod scheduler; pub mod sync; pub mod task; -pub mod wait; diff --git a/maitake/src/scheduler/tests.rs b/maitake/src/scheduler/tests.rs index e8c5cb3a..45a1d60a 100644 --- a/maitake/src/scheduler/tests.rs +++ b/maitake/src/scheduler/tests.rs @@ -1,6 +1,6 @@ use super::*; use crate::loom::sync::atomic::{AtomicUsize, Ordering}; -use crate::{future, wait::cell::test_util::Chan}; +use crate::{future, sync::wait_cell::test_util::Chan}; #[cfg(all(feature = "alloc", not(loom)))] mod alloc { diff --git a/maitake/src/sync.rs b/maitake/src/sync.rs index 9724d9cf..16823433 100644 --- a/maitake/src/sync.rs +++ b/maitake/src/sync.rs @@ -1,14 +1,55 @@ -//! Synchronization primitives +//! Asynchronous synchronization primitives //! -//! This module contains the following asynchronous synchronization primitives: +//! This module contains the following synchronization primitives: //! -//! - [`Mutex`]: a fairly queued, asynchronous mutual exclusion lock. +//! - [`Mutex`]: a fairly queued, asynchronous [mutual exclusion lock], for +//! protecting shared data +//! - [`Semaphore`]: an asynchronous [counting semaphore], for limiting the +//! number of tasks which may run concurrently +//! - [`WaitCell`], a cell that stores a *single* waiting task's [`Waker`], so +//! that the task can be woken by another task, +//! - [`WaitQueue`], a queue of waiting tasks, which are woken in first-in, +//! first-out order +//! - [`WaitMap`], a set of waiting tasks associated with keys, in which a task +//! can be woken by its key +//! +//! [mutual exclusion lock]:https://en.wikipedia.org/wiki/Mutual_exclusion +//! [counting semaphore]: https://en.wikipedia.org/wiki/Semaphore_(programming) +//! [`Waker`]: core::task::Waker pub mod mutex; +pub mod semaphore; +pub(crate) mod wait_cell; +pub mod wait_map; +pub mod wait_queue; + #[cfg(feature = "alloc")] #[doc(inline)] pub use self::mutex::OwnedMutexGuard; #[doc(inline)] pub use self::mutex::{Mutex, MutexGuard}; +#[doc(inline)] +pub use self::semaphore::Semaphore; +pub use self::wait_cell::WaitCell; +#[doc(inline)] +pub use self::wait_map::WaitMap; +#[doc(inline)] +pub use self::wait_queue::WaitQueue; + +use core::task::Poll; + +/// An error indicating that a [`WaitCell`], [`WaitQueue`], [`WaitMap`], or +/// [`Semaphore`] was closed while attempting to register a waiter. +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub struct Closed(()); + +pub type WaitResult = Result; + +pub(in crate::sync) const fn closed() -> Poll> { + Poll::Ready(Err(Closed::new())) +} -#[cfg(test)] -mod tests; +impl Closed { + pub(in crate::sync) const fn new() -> Self { + Self(()) + } +} diff --git a/maitake/src/sync/mutex.rs b/maitake/src/sync/mutex.rs index 96b48e50..44ff2148 100644 --- a/maitake/src/sync/mutex.rs +++ b/maitake/src/sync/mutex.rs @@ -3,7 +3,7 @@ //! See the documentation on the [`Mutex`] type for details. use crate::{ loom::cell::{MutPtr, UnsafeCell}, - wait::queue::{self, WaitQueue}, + sync::wait_queue::{self, WaitQueue}, }; use core::{ future::Future, @@ -14,6 +14,9 @@ use core::{ use mycelium_util::{fmt, unreachable_unchecked}; use pin_project::pin_project; +#[cfg(all(test, loom))] +mod loom; + /// An asynchronous [mutual exclusion lock][mutex] for protecting shared data. /// /// The data can only be accessed through the [RAII guards] returned @@ -73,7 +76,7 @@ use pin_project::pin_project; /// [`mycelium_util::sync::spin::Mutex`]: https://mycelium.elizas.website/mycelium_util/sync/spin/struct.mutex /// [`futures-util`]: https://crates.io/crate/futures-util /// [`futures_util::lock::Mutex`]: https://docs.rs/futures-util/latest/futures_util/lock/struct.Mutex.html -/// [intrusive linked list]: crate::wait::WaitQueue#implementation-notes +/// [intrusive linked list]: crate::sync::WaitQueue#implementation-notes /// [poisoning]: https://doc.rust-lang.org/stable/std/sync/struct.Mutex.html#poisoning // for some reason, intra-doc links don't work in footnotes? /// [storage]: ../task/trait.Storage.html @@ -115,7 +118,7 @@ pub struct MutexGuard<'a, T: ?Sized> { #[pin_project] pub struct Lock<'a, T: ?Sized> { #[pin] - wait: queue::Wait<'a>, + wait: wait_queue::Wait<'a>, mutex: &'a Mutex, } diff --git a/maitake/src/sync/tests/loom_mutex.rs b/maitake/src/sync/mutex/loom.rs similarity index 100% rename from maitake/src/sync/tests/loom_mutex.rs rename to maitake/src/sync/mutex/loom.rs diff --git a/maitake/src/wait/semaphore.rs b/maitake/src/sync/semaphore.rs similarity index 98% rename from maitake/src/wait/semaphore.rs rename to maitake/src/sync/semaphore.rs index 68d35d06..80045cba 100644 --- a/maitake/src/wait/semaphore.rs +++ b/maitake/src/sync/semaphore.rs @@ -12,7 +12,7 @@ use crate::{ spin::{Mutex, MutexGuard}, }, }, - wait::{self, WaitResult}, + sync::{self, WaitResult}, }; use cordyceps::{ list::{self, List}, @@ -67,7 +67,7 @@ mod tests; /// /// ``` /// # use std as alloc; -/// use maitake::{scheduler::Scheduler, wait::Semaphore}; +/// use maitake::{scheduler::Scheduler, sync::Semaphore}; /// use alloc::sync::Arc; /// /// let scheduler = Scheduler::new(); @@ -105,7 +105,7 @@ mod tests; /// /// ``` /// # use std as alloc; -/// use maitake::{scheduler::Scheduler, wait::Semaphore}; +/// use maitake::{scheduler::Scheduler, sync::Semaphore}; /// use alloc::sync::Arc; /// /// // How many tasks will we be waiting for the completion of? @@ -325,7 +325,7 @@ impl Semaphore { /// were requested. If an [`Acquire`] future is dropped before it completes, /// the task will lose its place in the queue. /// - /// [`Closed`]: crate::wait::Closed + /// [`Closed`]: crate::sync::Closed /// [closed]: Semaphore::close pub fn acquire(&self, permits: usize) -> Acquire<'_> { Acquire { @@ -378,7 +378,7 @@ impl Semaphore { /// - `Err(`[`TryAcquireError::InsufficientPermits`]`)` if the semaphore had /// fewer than `permits` permits available. /// - /// [`Closed`]: crate::wait::Closed + /// [`Closed`]: crate::sync::Closed /// [closed]: Semaphore::close pub fn try_acquire(&self, permits: usize) -> Result, TryAcquireError> { trace!(permits, "Semaphore::try_acquire"); @@ -434,7 +434,7 @@ impl Semaphore { let mut waiters = loop { // semaphore has closed if sem_curr == Self::CLOSED { - return wait::closed(); + return sync::closed(); } // the total number of permits currently available to this waiter @@ -522,7 +522,7 @@ impl Semaphore { queued, "Semaphore::poll_acquire -> semaphore closed" ); - return wait::closed(); + return sync::closed(); } // add permits to the waiter, returning whether we added enough to wake @@ -835,7 +835,7 @@ feature! { /// completes, the task will lose its place in the queue. /// /// [`acquire`]: Semaphore::acquire - /// [`Closed`]: crate::wait::Closed + /// [`Closed`]: crate::sync::Closed /// [closed]: Semaphore::close pub fn acquire_owned(self: &Arc, permits: usize) -> AcquireOwned { AcquireOwned { @@ -865,7 +865,7 @@ feature! { /// /// /// [`try_acquire`]: Semaphore::try_acquire - /// [`Closed`]: crate::wait::Closed + /// [`Closed`]: crate::sync::Closed /// [closed]: Semaphore::close pub fn try_acquire_owned(self: &Arc, permits: usize) -> Result { trace!(permits, "Semaphore::try_acquire_owned"); diff --git a/maitake/src/wait/semaphore/loom.rs b/maitake/src/sync/semaphore/loom.rs similarity index 100% rename from maitake/src/wait/semaphore/loom.rs rename to maitake/src/sync/semaphore/loom.rs diff --git a/maitake/src/wait/semaphore/tests.rs b/maitake/src/sync/semaphore/tests.rs similarity index 98% rename from maitake/src/wait/semaphore/tests.rs rename to maitake/src/sync/semaphore/tests.rs index ee9663f8..654a09f2 100644 --- a/maitake/src/wait/semaphore/tests.rs +++ b/maitake/src/sync/semaphore/tests.rs @@ -14,7 +14,7 @@ fn permit_is_send_and_sync() { #[test] fn acquire_is_send_and_sync() { - assert_send_sync::>(); + assert_send_sync::>(); } #[cfg(feature = "alloc")] diff --git a/maitake/src/sync/tests.rs b/maitake/src/sync/tests.rs deleted file mode 100644 index 2126e5c1..00000000 --- a/maitake/src/sync/tests.rs +++ /dev/null @@ -1,2 +0,0 @@ -#[cfg(loom)] -mod loom_mutex; diff --git a/maitake/src/wait/cell.rs b/maitake/src/sync/wait_cell.rs similarity index 100% rename from maitake/src/wait/cell.rs rename to maitake/src/sync/wait_cell.rs diff --git a/maitake/src/wait/map.rs b/maitake/src/sync/wait_map.rs similarity index 99% rename from maitake/src/wait/map.rs rename to maitake/src/sync/wait_map.rs index 87bb9939..c65756a3 100644 --- a/maitake/src/wait/map.rs +++ b/maitake/src/sync/wait_map.rs @@ -1,3 +1,7 @@ +//! A map of [`Waker`]s associated with keys, so that a task can be woken by +//! key. +//! +//! See the documentation for the [`WaitMap`] type for details. use crate::loom::{ cell::UnsafeCell, sync::{ @@ -363,7 +367,7 @@ enum State { /// *Note*: This *must* correspond to all state bits being set, as it's set /// via a [`fetch_or`]. /// - /// [`Closed`]: crate::wait::Closed + /// [`Closed`]: crate::sync::Closed /// [`fetch_or`]: core::sync::atomic::AtomicUsize::fetch_or Closed = 0b11, } diff --git a/maitake/src/wait/map/tests.rs b/maitake/src/sync/wait_map/tests.rs similarity index 100% rename from maitake/src/wait/map/tests.rs rename to maitake/src/sync/wait_map/tests.rs diff --git a/maitake/src/wait/queue.rs b/maitake/src/sync/wait_queue.rs similarity index 98% rename from maitake/src/wait/queue.rs rename to maitake/src/sync/wait_queue.rs index de5a4d5b..5df9ced5 100644 --- a/maitake/src/wait/queue.rs +++ b/maitake/src/sync/wait_queue.rs @@ -1,3 +1,7 @@ +//! A queue of waiting tasks that can be woken in first-in, first-out order (or +//! all at once). +//! +//! See the [`WaitQueue`] type's documentation for details. use crate::{ loom::{ cell::UnsafeCell, @@ -6,7 +10,7 @@ use crate::{ spin::Mutex, }, }, - wait::{self, WaitResult}, + sync::{self, WaitResult}, }; use cordyceps::{ list::{self, List}, @@ -50,7 +54,7 @@ mod tests; /// /// ``` /// use std::sync::Arc; -/// use maitake::{scheduler::Scheduler, wait::WaitQueue}; +/// use maitake::{scheduler::Scheduler, sync::WaitQueue}; /// /// const TASKS: usize = 10; /// @@ -96,7 +100,7 @@ mod tests; /// /// ``` /// use std::sync::Arc; -/// use maitake::{scheduler::Scheduler, wait::WaitQueue}; +/// use maitake::{scheduler::Scheduler, sync::WaitQueue}; /// /// const TASKS: usize = 10; /// @@ -314,7 +318,7 @@ enum State { /// *Note*: This *must* correspond to all state bits being set, as it's set /// via a [`fetch_or`]. /// - /// [`Closed`]: crate::wait::Closed + /// [`Closed`]: crate::sync::Closed /// [`fetch_or`]: core::sync::atomic::AtomicUsize::fetch_or Closed = 0b11, } @@ -343,6 +347,8 @@ impl WaitQueue { /// Returns a new `WaitQueue` with a single stored wakeup. /// /// The first call to [`wait`] on this queue will immediately succeed. + /// + /// [`wait`]: Self::wait // TODO(eliza): should this be a public API? #[must_use] pub(crate) fn new_woken() -> Self { @@ -365,7 +371,7 @@ impl WaitQueue { /// If the queue is empty, a wakeup is stored in the `WaitQueue`, and the /// next call to [`wait`] will complete immediately. /// - /// [`wait`]: WaitQueue::wait + /// [`wait`]: Self::wait #[inline] pub fn wake(&self) { // snapshot the queue's current state. @@ -451,6 +457,8 @@ impl WaitQueue { /// This method is generally used when implementing higher-level /// synchronization primitives or resources: when an event makes a resource /// permanently unavailable, the queue can be closed. + /// + /// [`wait`]: Self::wait pub fn close(&self) { let state = self.state.fetch_or(State::Closed.into_usize(), SeqCst); let state = test_dbg!(QueueState::from_bits(state)); @@ -494,7 +502,7 @@ impl WaitQueue { } match state.get(QueueState::STATE) { - State::Closed => wait::closed(), + State::Closed => sync::closed(), _ if state.get(QueueState::WAKE_ALLS) > initial_wake_alls => Poll::Ready(Ok(())), State::Empty | State::Waiting => Poll::Pending, State::Woken => Poll::Ready(Ok(())), @@ -698,7 +706,7 @@ impl Waiter { Err(actual) => queue_state = actual, } } - State::Closed => return wait::closed(), + State::Closed => return sync::closed(), } } @@ -735,7 +743,7 @@ impl Waiter { } Wakeup::Closed => { this.state.set(WaitStateBits::STATE, WaitState::Woken); - wait::closed() + sync::closed() } Wakeup::Empty => unreachable!(), } diff --git a/maitake/src/wait/queue/tests.rs b/maitake/src/sync/wait_queue/tests.rs similarity index 100% rename from maitake/src/wait/queue/tests.rs rename to maitake/src/sync/wait_queue/tests.rs diff --git a/maitake/src/wait.rs b/maitake/src/wait.rs deleted file mode 100644 index b2dfad7e..00000000 --- a/maitake/src/wait.rs +++ /dev/null @@ -1,42 +0,0 @@ -//! Waiter cells and queues to allow tasks to wait for notifications. -//! -//! This module implements the following primitives for waiting: -//! -//! - [`WaitCell`], which stores a *single* waiting task -//! - [`WaitQueue`], a queue of waiting tasks, which are woken in first-in, -//! first-out order -//! - [`WaitMap`], a set of waiting tasks associated with keys, in which a task -//! can be woken by its key -//! - [`Semaphore`]: an asynchronous [counting semaphore], for limiting the -//! number of tasks which may run concurrently -pub(crate) mod cell; -pub mod map; -pub mod queue; -pub mod semaphore; - -pub use self::cell::WaitCell; -#[doc(inline)] -pub use self::map::WaitMap; -#[doc(inline)] -pub use self::queue::WaitQueue; -#[doc(inline)] -pub use self::semaphore::Semaphore; - -use core::task::Poll; - -/// An error indicating that a [`WaitCell`] or [`WaitQueue`] was closed while -/// attempting register a waiter. -#[derive(Copy, Clone, Debug, Eq, PartialEq)] -pub struct Closed(()); - -pub type WaitResult = Result; - -pub(in crate::wait) const fn closed() -> Poll> { - Poll::Ready(Err(Closed::new())) -} - -impl Closed { - pub(in crate::wait) const fn new() -> Self { - Self(()) - } -}