From 3a277438b9638266dff1ee520461ea08412ec141 Mon Sep 17 00:00:00 2001 From: Jacob Rothstein Date: Thu, 28 Mar 2024 15:24:35 -0700 Subject: [PATCH] feat: Add a loom implementation for event-listener --- .github/workflows/ci.yml | 11 ++ Cargo.toml | 6 +- src/lib.rs | 96 +++++++++++++++++- src/std.rs | 69 ++++++------- tests/loom.rs | 212 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 352 insertions(+), 42 deletions(-) create mode 100644 tests/loom.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9998336..a0113c2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -114,3 +114,14 @@ jobs: - uses: rustsec/audit-check@master with: token: ${{ secrets.GITHUB_TOKEN }} + + loom: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Install Rust + run: rustup update stable + - name: Loom tests + run: RUSTFLAGS="--cfg=loom" cargo test --release --test loom --features loom + + diff --git a/Cargo.toml b/Cargo.toml index 185d6e0..ff118b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,15 +18,19 @@ exclude = ["/.*"] default = ["std"] std = ["concurrent-queue/std", "parking"] portable-atomic = ["portable-atomic-util", "portable_atomic_crate"] +loom = ["concurrent-queue/loom", "parking?/loom", "dep:loom"] [dependencies] -concurrent-queue = { version = "2.2.0", default-features = false } +concurrent-queue = { version = "2.4.0", default-features = false } pin-project-lite = "0.2.12" portable-atomic-util = { version = "0.1.4", default-features = false, optional = true, features = ["alloc"] } [target.'cfg(not(target_family = "wasm"))'.dependencies] parking = { version = "2.0.0", optional = true } +[target.'cfg(loom)'.dependencies] +loom = { version = "0.7", optional = true } + [dependencies.portable_atomic_crate] package = "portable-atomic" version = "1.2.0" diff --git a/src/lib.rs b/src/lib.rs index e6123a9..9395e6a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -105,7 +105,12 @@ use { }; use sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; -use sync::{Arc, WithMut}; +use sync::Arc; + +#[cfg(not(loom))] +// WitMut is not needed under loom because loom::sync::atomic::AtomicPtr includes an intrinsic +// with_mut that has the same interface as this crate's sync::WithMut::with_mut +use sync::WithMut; use notify::{Internal, NotificationPrivate}; pub use notify::{IntoNotification, Notification}; @@ -216,13 +221,32 @@ impl Event { /// /// let event = Event::::with_tag(); /// ``` - #[cfg(feature = "std")] + #[cfg(all(feature = "std", not(loom)))] #[inline] pub const fn with_tag() -> Self { Self { inner: AtomicPtr::new(ptr::null_mut()), } } + /// Creates a new `Event` with a tag type. + /// + /// Tagging cannot be implemented efficiently on `no_std`, so this is only available when the + /// `std` feature is enabled. + /// + /// # Examples + /// + /// ``` + /// use event_listener::Event; + /// + /// let event = Event::::with_tag(); + /// ``` + #[cfg(all(feature = "std", loom))] + #[inline] + pub fn with_tag() -> Self { + Self { + inner: AtomicPtr::new(ptr::null_mut()), + } + } /// Tell whether any listeners are currently notified. /// @@ -543,12 +567,30 @@ impl Event<()> { /// let event = Event::new(); /// ``` #[inline] + #[cfg(not(loom))] pub const fn new() -> Self { Self { inner: AtomicPtr::new(ptr::null_mut()), } } + /// Creates a new [`Event`]. + /// + /// # Examples + /// + /// ``` + /// use event_listener::Event; + /// + /// let event = Event::new(); + /// ``` + #[inline] + #[cfg(loom)] + pub fn new() -> Self { + Self { + inner: AtomicPtr::new(ptr::null_mut()), + } + } + /// Notifies a number of active listeners without emitting a `SeqCst` fence. /// /// The number is allowed to be zero or exceed the current number of listeners. @@ -1119,6 +1161,12 @@ impl> + Unpin> InnerListener { match deadline { None => parker.park(), + #[cfg(loom)] + Some(_deadline) => { + panic!("parking does not support timeouts under loom"); + } + + #[cfg(not(loom))] Some(deadline) => { // Make sure we're not timed out already. let now = Instant::now(); @@ -1330,10 +1378,9 @@ const NEVER_INSERTED_PANIC: &str = "\ EventListener was not inserted into the linked list, make sure you're not polling \ EventListener/listener! after it has finished"; +#[cfg(not(loom))] /// Synchronization primitive implementation. mod sync { - pub(super) use core::cell; - #[cfg(not(feature = "portable-atomic"))] pub(super) use alloc::sync::Arc; #[cfg(not(feature = "portable-atomic"))] @@ -1344,7 +1391,7 @@ mod sync { #[cfg(feature = "portable-atomic")] pub(super) use portable_atomic_util::Arc; - #[cfg(feature = "std")] + #[cfg(all(feature = "std", not(loom)))] pub(super) use std::sync::{Mutex, MutexGuard}; pub(super) trait WithMut { @@ -1366,6 +1413,45 @@ mod sync { f(self.get_mut()) } } + + pub(crate) mod cell { + pub(crate) use core::cell::Cell; + + /// This newtype around *const T exists for interoperability with loom::cell::ConstPtr, + /// which works as a guard and performs additional logic to track access scope. + pub(crate) struct ConstPtr(*const T); + impl ConstPtr { + pub(crate) unsafe fn deref(&self) -> &T { + &*self.0 + } + } + + /// This UnsafeCell wrapper exists for interoperability with loom::cell::UnsafeCell, and + /// only contains the interface that is needed for this crate. + #[derive(Debug, Default)] + pub(crate) struct UnsafeCell(core::cell::UnsafeCell); + + impl UnsafeCell { + pub(crate) fn new(data: T) -> UnsafeCell { + UnsafeCell(core::cell::UnsafeCell::new(data)) + } + + pub(crate) fn get(&self) -> ConstPtr { + ConstPtr(self.0.get()) + } + + pub(crate) fn into_inner(self) -> T { + self.0.into_inner() + } + } + } +} + +#[cfg(loom)] +/// Synchronization primitive implementation. +mod sync { + pub(super) use loom::cell; + pub(super) use loom::sync::{atomic, Arc, Mutex, MutexGuard}; } fn __test_send_and_sync() { diff --git a/src/std.rs b/src/std.rs index 239dc74..3e7f76a 100644 --- a/src/std.rs +++ b/src/std.rs @@ -73,27 +73,27 @@ impl crate::Inner { pub(crate) fn insert(&self, mut listener: Pin<&mut Option>>) { let mut inner = self.lock(); - // SAFETY: We are locked, so we can access the inner `link`. - let entry = unsafe { - listener.as_mut().set(Some(Listener { - link: UnsafeCell::new(Link { - state: Cell::new(State::Created), - prev: Cell::new(inner.tail), - next: Cell::new(None), - }), - _pin: PhantomPinned, - })); - let listener = listener.as_pin_mut().unwrap(); - - // Get the inner pointer. - &*listener.link.get() - }; - - // Replace the tail with the new entry. - match mem::replace(&mut inner.tail, Some(entry.into())) { - None => inner.head = Some(entry.into()), - Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) }, - }; + listener.as_mut().set(Some(Listener { + link: UnsafeCell::new(Link { + state: Cell::new(State::Created), + prev: Cell::new(inner.tail), + next: Cell::new(None), + }), + _pin: PhantomPinned, + })); + let listener = listener.as_pin_mut().unwrap(); + + { + let entry_guard = listener.link.get(); + // SAFETY: We are locked, so we can access the inner `link`. + let entry = unsafe { entry_guard.deref() }; + + // Replace the tail with the new entry. + match mem::replace(&mut inner.tail, Some(entry.into())) { + None => inner.head = Some(entry.into()), + Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) }, + }; + } // If there are no unnotified entries, this is the first one. if inner.next.is_none() { @@ -129,15 +129,12 @@ impl crate::Inner { task: TaskRef<'_>, ) -> RegisterResult { let mut inner = self.lock(); - - // SAFETY: We are locked, so we can access the inner `link`. - let entry = unsafe { - let listener = match listener.as_mut().as_pin_mut() { - Some(listener) => listener, - None => return RegisterResult::NeverInserted, - }; - &*listener.link.get() + let entry_guard = match listener.as_mut().as_pin_mut() { + Some(listener) => listener.link.get(), + None => return RegisterResult::NeverInserted, }; + // SAFETY: We are locked, so we can access the inner `link`. + let entry = unsafe { entry_guard.deref() }; // Take out the state and check it. match entry.state.replace(State::NotifiedTaken) { @@ -175,12 +172,8 @@ impl Inner { mut listener: Pin<&mut Option>>, propagate: bool, ) -> Option> { - let entry = unsafe { - let listener = listener.as_mut().as_pin_mut()?; - - // Get the inner pointer. - &*listener.link.get() - }; + let entry_guard = listener.as_mut().as_pin_mut()?.link.get(); + let entry = unsafe { entry_guard.deref() }; let prev = entry.prev.get(); let next = entry.next.get(); @@ -216,7 +209,11 @@ impl Inner { .into_inner() }; - let mut state = entry.state.into_inner(); + // This State::Created is immediately dropped and exists as a workaround for the absence of + // loom::cell::Cell::into_inner. The intent is `let mut state = entry.state.into_inner();` + // + // refs: https://github.com/tokio-rs/loom/pull/341 + let mut state = entry.state.replace(State::Created); // Update the notified count. if state.is_notified() { diff --git a/tests/loom.rs b/tests/loom.rs new file mode 100644 index 0000000..6ef1d05 --- /dev/null +++ b/tests/loom.rs @@ -0,0 +1,212 @@ +#![cfg(loom)] +use std::future::Future; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::Context; +use std::usize; + +use event_listener::{Event, EventListener}; +use waker_fn::waker_fn; + +#[cfg(target_family = "wasm")] +use wasm_bindgen_test::wasm_bindgen_test as test; + +fn is_notified(listener: &mut EventListener) -> bool { + let waker = waker_fn(|| ()); + Pin::new(listener) + .poll(&mut Context::from_waker(&waker)) + .is_ready() +} + +#[test] +fn notify() { + loom::model(|| { + let event = Event::new(); + + let mut l1 = event.listen(); + let mut l2 = event.listen(); + let mut l3 = event.listen(); + + assert!(!is_notified(&mut l1)); + assert!(!is_notified(&mut l2)); + assert!(!is_notified(&mut l3)); + + assert_eq!(event.notify(2), 2); + assert_eq!(event.notify(1), 0); + + assert!(is_notified(&mut l1)); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l3)); + }); +} + +#[test] +fn notify_additional() { + loom::model(|| { + let event = Event::new(); + + let mut l1 = event.listen(); + let mut l2 = event.listen(); + let mut l3 = event.listen(); + + assert_eq!(event.notify_additional(1), 1); + assert_eq!(event.notify(1), 0); + assert_eq!(event.notify_additional(1), 1); + + assert!(is_notified(&mut l1)); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l3)); + }) +} + +#[test] +fn notify_one() { + loom::model(|| { + let event = Event::new(); + + let mut l1 = event.listen(); + let mut l2 = event.listen(); + + assert!(!is_notified(&mut l1)); + assert!(!is_notified(&mut l2)); + + assert_eq!(event.notify(1), 1); + assert!(is_notified(&mut l1)); + assert!(!is_notified(&mut l2)); + + assert_eq!(event.notify(1), 1); + assert!(is_notified(&mut l2)); + }); +} + +#[test] +fn notify_all() { + loom::model(|| { + let event = Event::new(); + + let mut l1 = event.listen(); + let mut l2 = event.listen(); + + assert!(!is_notified(&mut l1)); + assert!(!is_notified(&mut l2)); + + assert_eq!(event.notify(usize::MAX), 2); + assert!(is_notified(&mut l1)); + assert!(is_notified(&mut l2)); + }); +} + +#[test] +fn drop_notified() { + loom::model(|| { + let event = Event::new(); + + let l1 = event.listen(); + let mut l2 = event.listen(); + let mut l3 = event.listen(); + + assert_eq!(event.notify(1), 1); + drop(l1); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l3)); + }); +} + +#[test] +fn drop_notified2() { + loom::model(|| { + let event = Event::new(); + + let l1 = event.listen(); + let mut l2 = event.listen(); + let mut l3 = event.listen(); + + assert_eq!(event.notify(2), 2); + drop(l1); + assert!(is_notified(&mut l2)); + assert!(!is_notified(&mut l3)); + }); +} + +#[test] +fn drop_notified_additional() { + loom::model(|| { + let event = Event::new(); + + let l1 = event.listen(); + let mut l2 = event.listen(); + let mut l3 = event.listen(); + let mut l4 = event.listen(); + + assert_eq!(event.notify_additional(1), 1); + assert_eq!(event.notify(2), 1); + drop(l1); + assert!(is_notified(&mut l2)); + assert!(is_notified(&mut l3)); + assert!(!is_notified(&mut l4)); + }); +} + +#[test] +fn drop_non_notified() { + loom::model(|| { + let event = Event::new(); + + let mut l1 = event.listen(); + let mut l2 = event.listen(); + let l3 = event.listen(); + + assert_eq!(event.notify(1), 1); + drop(l3); + assert!(is_notified(&mut l1)); + assert!(!is_notified(&mut l2)); + }) +} + +#[test] +fn notify_all_fair() { + loom::model(|| { + let event = Event::new(); + let v = Arc::new(Mutex::new(vec![])); + + let mut l1 = event.listen(); + let mut l2 = event.listen(); + let mut l3 = event.listen(); + + let waker1 = { + let v = v.clone(); + waker_fn(move || v.lock().unwrap().push(1)) + }; + let waker2 = { + let v = v.clone(); + waker_fn(move || v.lock().unwrap().push(2)) + }; + let waker3 = { + let v = v.clone(); + waker_fn(move || v.lock().unwrap().push(3)) + }; + + assert!(Pin::new(&mut l1) + .poll(&mut Context::from_waker(&waker1)) + .is_pending()); + assert!(Pin::new(&mut l2) + .poll(&mut Context::from_waker(&waker2)) + .is_pending()); + assert!(Pin::new(&mut l3) + .poll(&mut Context::from_waker(&waker3)) + .is_pending()); + + assert_eq!(event.notify(usize::MAX), 3); + assert_eq!(&*v.lock().unwrap(), &[1, 2, 3]); + + assert!(Pin::new(&mut l1) + .poll(&mut Context::from_waker(&waker1)) + .is_ready()); + assert!(Pin::new(&mut l2) + .poll(&mut Context::from_waker(&waker2)) + .is_ready()); + assert!(Pin::new(&mut l3) + .poll(&mut Context::from_waker(&waker3)) + .is_ready()); + }) +}