Skip to content

Commit

Permalink
refac(maitake): combine wait module into sync (#303)
Browse files Browse the repository at this point in the history
currently, `maitake` has a `wait` module, which contains `WaitCell`,
`WaitQueue`, `WaitMap`, and (as of #301) `Semaphore`. in addition, it
has a separate `sync` module, which currently contains `Mutex` and
nothing else (and may grow a `RwLock` eventually/soon). this feels a bit
unfortunate --- the `wait` types are also synchronization primitives.

this branch moves all of `maitake::wait` into `maitake::sync`. in
addition,i fixed up the docs a little bit.

closes #302
  • Loading branch information
hawkw authored Aug 30, 2022
1 parent 8a46328 commit 6b71a6b
Show file tree
Hide file tree
Showing 16 changed files with 85 additions and 74 deletions.
2 changes: 1 addition & 1 deletion maitake/examples/tokio-console.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use maitake::{scheduler::Scheduler, wait::WaitQueue};
use maitake::{scheduler::Scheduler, sync::WaitQueue};
use std::{sync::Arc, thread, time::Duration};

fn main() {
Expand Down
1 change: 0 additions & 1 deletion maitake/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,3 @@ pub mod future;
pub mod scheduler;
pub mod sync;
pub mod task;
pub mod wait;
2 changes: 1 addition & 1 deletion maitake/src/scheduler/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::*;
use crate::loom::sync::atomic::{AtomicUsize, Ordering};
use crate::{future, wait::cell::test_util::Chan};
use crate::{future, sync::wait_cell::test_util::Chan};

#[cfg(all(feature = "alloc", not(loom)))]
mod alloc {
Expand Down
51 changes: 46 additions & 5 deletions maitake/src/sync.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,55 @@
//! Synchronization primitives
//! Asynchronous synchronization primitives
//!
//! This module contains the following asynchronous synchronization primitives:
//! This module contains the following synchronization primitives:
//!
//! - [`Mutex`]: a fairly queued, asynchronous mutual exclusion lock.
//! - [`Mutex`]: a fairly queued, asynchronous [mutual exclusion lock], for
//! protecting shared data
//! - [`Semaphore`]: an asynchronous [counting semaphore], for limiting the
//! number of tasks which may run concurrently
//! - [`WaitCell`], a cell that stores a *single* waiting task's [`Waker`], so
//! that the task can be woken by another task,
//! - [`WaitQueue`], a queue of waiting tasks, which are woken in first-in,
//! first-out order
//! - [`WaitMap`], a set of waiting tasks associated with keys, in which a task
//! can be woken by its key
//!
//! [mutual exclusion lock]:https://en.wikipedia.org/wiki/Mutual_exclusion
//! [counting semaphore]: https://en.wikipedia.org/wiki/Semaphore_(programming)
//! [`Waker`]: core::task::Waker
pub mod mutex;
pub mod semaphore;
pub(crate) mod wait_cell;
pub mod wait_map;
pub mod wait_queue;

#[cfg(feature = "alloc")]
#[doc(inline)]
pub use self::mutex::OwnedMutexGuard;
#[doc(inline)]
pub use self::mutex::{Mutex, MutexGuard};
#[doc(inline)]
pub use self::semaphore::Semaphore;
pub use self::wait_cell::WaitCell;
#[doc(inline)]
pub use self::wait_map::WaitMap;
#[doc(inline)]
pub use self::wait_queue::WaitQueue;

use core::task::Poll;

/// An error indicating that a [`WaitCell`], [`WaitQueue`], [`WaitMap`], or
/// [`Semaphore`] was closed while attempting to register a waiter.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Closed(());

pub type WaitResult<T> = Result<T, Closed>;

pub(in crate::sync) const fn closed<T>() -> Poll<WaitResult<T>> {
Poll::Ready(Err(Closed::new()))
}

#[cfg(test)]
mod tests;
impl Closed {
pub(in crate::sync) const fn new() -> Self {
Self(())
}
}
9 changes: 6 additions & 3 deletions maitake/src/sync/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! See the documentation on the [`Mutex`] type for details.
use crate::{
loom::cell::{MutPtr, UnsafeCell},
wait::queue::{self, WaitQueue},
sync::wait_queue::{self, WaitQueue},
};
use core::{
future::Future,
Expand All @@ -14,6 +14,9 @@ use core::{
use mycelium_util::{fmt, unreachable_unchecked};
use pin_project::pin_project;

#[cfg(all(test, loom))]
mod loom;

/// An asynchronous [mutual exclusion lock][mutex] for protecting shared data.
///
/// The data can only be accessed through the [RAII guards] returned
Expand Down Expand Up @@ -73,7 +76,7 @@ use pin_project::pin_project;
/// [`mycelium_util::sync::spin::Mutex`]: https://mycelium.elizas.website/mycelium_util/sync/spin/struct.mutex
/// [`futures-util`]: https://crates.io/crate/futures-util
/// [`futures_util::lock::Mutex`]: https://docs.rs/futures-util/latest/futures_util/lock/struct.Mutex.html
/// [intrusive linked list]: crate::wait::WaitQueue#implementation-notes
/// [intrusive linked list]: crate::sync::WaitQueue#implementation-notes
/// [poisoning]: https://doc.rust-lang.org/stable/std/sync/struct.Mutex.html#poisoning
// for some reason, intra-doc links don't work in footnotes?
/// [storage]: ../task/trait.Storage.html
Expand Down Expand Up @@ -115,7 +118,7 @@ pub struct MutexGuard<'a, T: ?Sized> {
#[pin_project]
pub struct Lock<'a, T: ?Sized> {
#[pin]
wait: queue::Wait<'a>,
wait: wait_queue::Wait<'a>,
mutex: &'a Mutex<T>,
}

Expand Down
File renamed without changes.
18 changes: 9 additions & 9 deletions maitake/src/wait/semaphore.rs → maitake/src/sync/semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
spin::{Mutex, MutexGuard},
},
},
wait::{self, WaitResult},
sync::{self, WaitResult},
};
use cordyceps::{
list::{self, List},
Expand Down Expand Up @@ -67,7 +67,7 @@ mod tests;
///
/// ```
/// # use std as alloc;
/// use maitake::{scheduler::Scheduler, wait::Semaphore};
/// use maitake::{scheduler::Scheduler, sync::Semaphore};
/// use alloc::sync::Arc;
///
/// let scheduler = Scheduler::new();
Expand Down Expand Up @@ -105,7 +105,7 @@ mod tests;
///
/// ```
/// # use std as alloc;
/// use maitake::{scheduler::Scheduler, wait::Semaphore};
/// use maitake::{scheduler::Scheduler, sync::Semaphore};
/// use alloc::sync::Arc;
///
/// // How many tasks will we be waiting for the completion of?
Expand Down Expand Up @@ -325,7 +325,7 @@ impl Semaphore {
/// were requested. If an [`Acquire`] future is dropped before it completes,
/// the task will lose its place in the queue.
///
/// [`Closed`]: crate::wait::Closed
/// [`Closed`]: crate::sync::Closed
/// [closed]: Semaphore::close
pub fn acquire(&self, permits: usize) -> Acquire<'_> {
Acquire {
Expand Down Expand Up @@ -378,7 +378,7 @@ impl Semaphore {
/// - `Err(`[`TryAcquireError::InsufficientPermits`]`)` if the semaphore had
/// fewer than `permits` permits available.
///
/// [`Closed`]: crate::wait::Closed
/// [`Closed`]: crate::sync::Closed
/// [closed]: Semaphore::close
pub fn try_acquire(&self, permits: usize) -> Result<Permit<'_>, TryAcquireError> {
trace!(permits, "Semaphore::try_acquire");
Expand Down Expand Up @@ -434,7 +434,7 @@ impl Semaphore {
let mut waiters = loop {
// semaphore has closed
if sem_curr == Self::CLOSED {
return wait::closed();
return sync::closed();
}

// the total number of permits currently available to this waiter
Expand Down Expand Up @@ -522,7 +522,7 @@ impl Semaphore {
queued,
"Semaphore::poll_acquire -> semaphore closed"
);
return wait::closed();
return sync::closed();
}

// add permits to the waiter, returning whether we added enough to wake
Expand Down Expand Up @@ -835,7 +835,7 @@ feature! {
/// completes, the task will lose its place in the queue.
///
/// [`acquire`]: Semaphore::acquire
/// [`Closed`]: crate::wait::Closed
/// [`Closed`]: crate::sync::Closed
/// [closed]: Semaphore::close
pub fn acquire_owned(self: &Arc<Self>, permits: usize) -> AcquireOwned {
AcquireOwned {
Expand Down Expand Up @@ -865,7 +865,7 @@ feature! {
///
///
/// [`try_acquire`]: Semaphore::try_acquire
/// [`Closed`]: crate::wait::Closed
/// [`Closed`]: crate::sync::Closed
/// [closed]: Semaphore::close
pub fn try_acquire_owned(self: &Arc<Self>, permits: usize) -> Result<OwnedPermit, TryAcquireError> {
trace!(permits, "Semaphore::try_acquire_owned");
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ fn permit_is_send_and_sync() {

#[test]
fn acquire_is_send_and_sync() {
assert_send_sync::<crate::wait::semaphore::Acquire<'_>>();
assert_send_sync::<crate::sync::semaphore::Acquire<'_>>();
}

#[cfg(feature = "alloc")]
Expand Down
2 changes: 0 additions & 2 deletions maitake/src/sync/tests.rs

This file was deleted.

File renamed without changes.
6 changes: 5 additions & 1 deletion maitake/src/wait/map.rs → maitake/src/sync/wait_map.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
//! A map of [`Waker`]s associated with keys, so that a task can be woken by
//! key.
//!
//! See the documentation for the [`WaitMap`] type for details.
use crate::loom::{
cell::UnsafeCell,
sync::{
Expand Down Expand Up @@ -363,7 +367,7 @@ enum State {
/// *Note*: This *must* correspond to all state bits being set, as it's set
/// via a [`fetch_or`].
///
/// [`Closed`]: crate::wait::Closed
/// [`Closed`]: crate::sync::Closed
/// [`fetch_or`]: core::sync::atomic::AtomicUsize::fetch_or
Closed = 0b11,
}
Expand Down
File renamed without changes.
24 changes: 16 additions & 8 deletions maitake/src/wait/queue.rs → maitake/src/sync/wait_queue.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
//! A queue of waiting tasks that can be woken in first-in, first-out order (or
//! all at once).
//!
//! See the [`WaitQueue`] type's documentation for details.
use crate::{
loom::{
cell::UnsafeCell,
Expand All @@ -6,7 +10,7 @@ use crate::{
spin::Mutex,
},
},
wait::{self, WaitResult},
sync::{self, WaitResult},
};
use cordyceps::{
list::{self, List},
Expand Down Expand Up @@ -50,7 +54,7 @@ mod tests;
///
/// ```
/// use std::sync::Arc;
/// use maitake::{scheduler::Scheduler, wait::WaitQueue};
/// use maitake::{scheduler::Scheduler, sync::WaitQueue};
///
/// const TASKS: usize = 10;
///
Expand Down Expand Up @@ -96,7 +100,7 @@ mod tests;
///
/// ```
/// use std::sync::Arc;
/// use maitake::{scheduler::Scheduler, wait::WaitQueue};
/// use maitake::{scheduler::Scheduler, sync::WaitQueue};
///
/// const TASKS: usize = 10;
///
Expand Down Expand Up @@ -314,7 +318,7 @@ enum State {
/// *Note*: This *must* correspond to all state bits being set, as it's set
/// via a [`fetch_or`].
///
/// [`Closed`]: crate::wait::Closed
/// [`Closed`]: crate::sync::Closed
/// [`fetch_or`]: core::sync::atomic::AtomicUsize::fetch_or
Closed = 0b11,
}
Expand Down Expand Up @@ -343,6 +347,8 @@ impl WaitQueue {
/// Returns a new `WaitQueue` with a single stored wakeup.
///
/// The first call to [`wait`] on this queue will immediately succeed.
///
/// [`wait`]: Self::wait
// TODO(eliza): should this be a public API?
#[must_use]
pub(crate) fn new_woken() -> Self {
Expand All @@ -365,7 +371,7 @@ impl WaitQueue {
/// If the queue is empty, a wakeup is stored in the `WaitQueue`, and the
/// next call to [`wait`] will complete immediately.
///
/// [`wait`]: WaitQueue::wait
/// [`wait`]: Self::wait
#[inline]
pub fn wake(&self) {
// snapshot the queue's current state.
Expand Down Expand Up @@ -451,6 +457,8 @@ impl WaitQueue {
/// This method is generally used when implementing higher-level
/// synchronization primitives or resources: when an event makes a resource
/// permanently unavailable, the queue can be closed.
///
/// [`wait`]: Self::wait
pub fn close(&self) {
let state = self.state.fetch_or(State::Closed.into_usize(), SeqCst);
let state = test_dbg!(QueueState::from_bits(state));
Expand Down Expand Up @@ -494,7 +502,7 @@ impl WaitQueue {
}

match state.get(QueueState::STATE) {
State::Closed => wait::closed(),
State::Closed => sync::closed(),
_ if state.get(QueueState::WAKE_ALLS) > initial_wake_alls => Poll::Ready(Ok(())),
State::Empty | State::Waiting => Poll::Pending,
State::Woken => Poll::Ready(Ok(())),
Expand Down Expand Up @@ -698,7 +706,7 @@ impl Waiter {
Err(actual) => queue_state = actual,
}
}
State::Closed => return wait::closed(),
State::Closed => return sync::closed(),
}
}

Expand Down Expand Up @@ -735,7 +743,7 @@ impl Waiter {
}
Wakeup::Closed => {
this.state.set(WaitStateBits::STATE, WaitState::Woken);
wait::closed()
sync::closed()
}
Wakeup::Empty => unreachable!(),
}
Expand Down
File renamed without changes.
42 changes: 0 additions & 42 deletions maitake/src/wait.rs

This file was deleted.

0 comments on commit 6b71a6b

Please sign in to comment.