diff --git a/Cargo.toml b/Cargo.toml index b1d5095..720e360 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,8 @@ categories = ["asynchronous", "concurrency"] exclude = ["/.*"] [dependencies] -event-listener = "2.5.1" +event-listener = "2" +event-listener-strategy = { git = "https://github.com/smol-rs/event-listener.git" } [dev-dependencies] async-channel = "1.5.0" @@ -24,3 +25,6 @@ futures-lite = "1.12.0" [target.'cfg(any(target_arch = "wasm32", target_arch = "wasm64"))'.dev-dependencies] wasm-bindgen-test = "0.3" + +[patch.crates-io] +event-listener = { git = "https://github.com/smol-rs/event-listener.git" } diff --git a/src/barrier.rs b/src/barrier.rs index e993db2..c7f4643 100644 --- a/src/barrier.rs +++ b/src/barrier.rs @@ -110,7 +110,11 @@ enum WaitState { Initial, /// We are waiting for the listener to complete. - Waiting { evl: EventListener, local_gen: u64 }, + Waiting { + /// TODO: At the next breaking release, remove the `Pin>` and make this type `!Unpin`. + evl: Pin>, + local_gen: u64, + }, /// Waiting to re-acquire the lock to check the state again. Reacquiring(u64), diff --git a/src/mutex.rs b/src/mutex.rs index 7087dd8..650d024 100644 --- a/src/mutex.rs +++ b/src/mutex.rs @@ -1,7 +1,6 @@ use std::borrow::Borrow; use std::cell::UnsafeCell; use std::fmt; -use std::future::Future; use std::marker::PhantomData; use std::mem; use std::ops::{Deref, DerefMut}; @@ -9,7 +8,7 @@ use std::pin::Pin; use std::process; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::task::{Context, Poll}; +use std::task::Poll; // Note: we cannot use `target_family = "wasm"` here because it requires Rust 1.54. #[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))] @@ -18,6 +17,7 @@ use std::time::{Duration, Instant}; use std::usize; use event_listener::{Event, EventListener}; +use event_listener_strategy::{easy_wrapper, EventListenerFuture}; /// An async mutex. /// @@ -109,10 +109,10 @@ impl Mutex { /// ``` #[inline] pub fn lock(&self) -> Lock<'_, T> { - Lock { + Lock::_new(LockInner { mutex: self, acquire_slow: None, - } + }) } /// Attempts to acquire the mutex. @@ -184,7 +184,7 @@ impl Mutex { /// ``` #[inline] pub fn lock_arc(self: &Arc) -> LockArc { - LockArc(LockArcInnards::Unpolled(self.clone())) + LockArc::_new(LockArcInnards::Unpolled(self.clone())) } /// Attempts to acquire the mutex and clone a reference to it. @@ -246,8 +246,13 @@ impl Default for Mutex { } } -/// The future returned by [`Mutex::lock`]. -pub struct Lock<'a, T: ?Sized> { +easy_wrapper! { + /// The future returned by [`Mutex::lock`]. + pub struct Lock<'a, T: ?Sized>(LockInner<'a, T> => MutexGuard<'a, T>); + pub(crate) wait(); +} + +struct LockInner<'a, T: ?Sized> { /// Reference to the mutex. mutex: &'a Mutex, @@ -255,45 +260,48 @@ pub struct Lock<'a, T: ?Sized> { acquire_slow: Option, T>>, } -impl<'a, T: ?Sized> Unpin for Lock<'a, T> {} +impl<'a, T: ?Sized> Unpin for LockInner<'a, T> {} -impl fmt::Debug for Lock<'_, T> { +impl fmt::Debug for LockInner<'_, T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("Lock { .. }") } } -impl<'a, T: ?Sized> Future for Lock<'a, T> { +impl<'a, T: ?Sized> EventListenerFuture for LockInner<'a, T> { type Output = MutexGuard<'a, T>; #[inline] - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - loop { - match this.acquire_slow.as_mut() { + fn poll_with_strategy<'x, S: event_listener_strategy::Strategy<'x>>( + self: Pin<&'x mut Self>, + strategy: &mut S, + context: &mut S::Context, + ) -> Poll { + let LockInner { + mutex, + acquire_slow, + } = self.get_mut(); + + // This may seem weird, but the borrow checker complains otherwise. + if acquire_slow.is_none() { + match mutex.try_lock() { + Some(guard) => return Poll::Ready(guard), None => { - // Try the fast path before trying to register slowly. - match this.mutex.try_lock() { - Some(guard) => return Poll::Ready(guard), - None => { - this.acquire_slow = Some(AcquireSlow::new(this.mutex)); - } - } - } - - Some(acquire_slow) => { - // Continue registering slowly. - let value = ready!(Pin::new(acquire_slow).poll(cx)); - return Poll::Ready(MutexGuard(value)); + *acquire_slow = Some(AcquireSlow::new(mutex)); } } } + + ready!(Pin::new(acquire_slow.as_mut().unwrap()).poll_with_strategy(strategy, context)); + Poll::Ready(MutexGuard(mutex)) } } -/// The future returned by [`Mutex::lock_arc`]. -pub struct LockArc(LockArcInnards); +easy_wrapper! { + /// The future returned by [`Mutex::lock_arc`]. + pub struct LockArc(LockArcInnards => MutexGuardArc); + pub(crate) wait(); +} enum LockArcInnards { /// We have not tried to poll the fast path yet. @@ -301,52 +309,48 @@ enum LockArcInnards { /// We are acquiring the mutex through the slow path. AcquireSlow(AcquireSlow>, T>), - - /// Empty hole to make taking easier. - Empty, } -impl Unpin for LockArc {} +impl Unpin for LockArcInnards {} -impl fmt::Debug for LockArc { +impl fmt::Debug for LockArcInnards { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("LockArc { .. }") } } -impl Future for LockArc { +impl EventListenerFuture for LockArcInnards { type Output = MutexGuardArc; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll_with_strategy<'a, S: event_listener_strategy::Strategy<'a>>( + self: Pin<&'a mut Self>, + strategy: &mut S, + context: &mut S::Context, + ) -> Poll { let this = self.get_mut(); loop { - match mem::replace(&mut this.0, LockArcInnards::Empty) { + match this { LockArcInnards::Unpolled(mutex) => { // Try the fast path before trying to register slowly. match mutex.try_lock_arc() { Some(guard) => return Poll::Ready(guard), None => { - *this = LockArc(LockArcInnards::AcquireSlow(AcquireSlow::new( - mutex.clone(), - ))); + *this = LockArcInnards::AcquireSlow(AcquireSlow::new(mutex.clone())); } } } - LockArcInnards::AcquireSlow(mut acquire_slow) => { + LockArcInnards::AcquireSlow(ref mut acquire_slow) => { // Continue registering slowly. - let value = match Pin::new(&mut acquire_slow).poll(cx) { + let value = match Pin::new(acquire_slow).poll_with_strategy(strategy, context) { Poll::Pending => { - *this = LockArc(LockArcInnards::AcquireSlow(acquire_slow)); return Poll::Pending; } Poll::Ready(value) => value, }; return Poll::Ready(MutexGuardArc(value)); } - - LockArcInnards::Empty => panic!("future polled after completion"), } } } @@ -358,7 +362,9 @@ struct AcquireSlow>, T: ?Sized> { mutex: Option, /// The event listener waiting on the mutex. - listener: Option, + /// + /// TODO: At the next breaking release, remove the `Pin>` and make this type `!Unpin`. + listener: Option>>, /// The point at which the mutex lock was started. #[cfg(not(any(target_arch = "wasm32", target_os = "wasm64")))] @@ -402,11 +408,15 @@ impl>> AcquireSlow { } } -impl>> Future for AcquireSlow { +impl>> EventListenerFuture for AcquireSlow { type Output = B; #[cold] - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll_with_strategy<'a, S: event_listener_strategy::Strategy<'a>>( + mut self: Pin<&'a mut Self>, + strategy: &mut S, + context: &mut S::Context, + ) -> Poll { let this = &mut *self; #[cfg(not(any(target_arch = "wasm32", target_arch = "wasm64")))] let start = *this.start.get_or_insert_with(Instant::now); @@ -443,7 +453,7 @@ impl>> Future for AcquireSlow { } Some(ref mut listener) => { // Wait for a notification. - ready!(Pin::new(listener).poll(cx)); + ready!(strategy.poll(listener.as_mut(), context)); this.listener = None; // Try locking if nobody is being starved. @@ -515,7 +525,7 @@ impl>> Future for AcquireSlow { } Some(ref mut listener) => { // Wait for a notification. - ready!(Pin::new(listener).poll(cx)); + ready!(strategy.poll(listener.as_mut(), context)); this.listener = None; // Try acquiring the lock without waiting for others. diff --git a/src/once_cell.rs b/src/once_cell.rs index 978f57c..9a2a0ab 100644 --- a/src/once_cell.rs +++ b/src/once_cell.rs @@ -8,6 +8,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; use event_listener::{Event, EventListener}; +use event_listener_strategy::{Blocking, NonBlocking, Strategy}; /// The current state of the `OnceCell`. #[derive(Copy, Clone, PartialEq, Eq)] @@ -268,7 +269,9 @@ impl OnceCell { } // Slow path: wait for the value to be initialized. - let listener = self.passive_waiters.listen(); + let listener = EventListener::new(&self.passive_waiters); + pin!(listener); + listener.as_mut().listen(); // Try again. if let Some(value) = self.get() { @@ -320,7 +323,9 @@ impl OnceCell { } // Slow path: wait for the value to be initialized. - let listener = self.passive_waiters.listen(); + let listener = EventListener::new(&self.passive_waiters); + pin!(listener); + listener.as_mut().listen(); // Try again. if let Some(value) = self.get() { @@ -372,7 +377,8 @@ impl OnceCell { } // Slow path: initialize the value. - self.initialize_or_wait(closure, &mut NonBlocking).await?; + self.initialize_or_wait(closure, &mut NonBlocking::default()) + .await?; debug_assert!(self.is_initialized()); // SAFETY: We know that the value is initialized, so it is safe to @@ -425,9 +431,10 @@ impl OnceCell { // Slow path: initialize the value. // The futures provided should never block, so we can use `now_or_never`. - now_or_never( - self.initialize_or_wait(move || std::future::ready(closure()), &mut Blocking), - )?; + now_or_never(self.initialize_or_wait( + move || std::future::ready(closure()), + &mut Blocking::default(), + ))?; debug_assert!(self.is_initialized()); // SAFETY: We know that the value is initialized, so it is safe to @@ -572,10 +579,12 @@ impl OnceCell { async fn initialize_or_wait>, F: FnOnce() -> Fut>( &self, closure: F, - strategy: &mut impl Strategy, + strategy: &mut impl for<'a> Strategy<'a>, ) -> Result<(), E> { // The event listener we're currently waiting on. - let mut event_listener = None; + let event_listener = EventListener::new(&self.active_initializers); + let mut listening = false; + pin!(event_listener); let mut closure = Some(closure); @@ -594,12 +603,12 @@ impl OnceCell { // but we do not have the ability to initialize it. // // We need to wait the initialization to complete. - match event_listener.take() { - None => { - event_listener = Some(self.active_initializers.listen()); - } - - Some(evl) => strategy.poll(evl).await, + if listening { + strategy.wait(event_listener.as_mut()).await; + listening = false; + } else { + event_listener.as_mut().listen(); + listening = true; } } State::Uninitialized => { @@ -771,35 +780,3 @@ fn now_or_never(f: impl Future) -> T { Poll::Pending => unreachable!("future not ready"), } } - -/// The strategy for polling an `event_listener::EventListener`. -trait Strategy { - /// The future that can be polled to wait on the listener. - type Fut: Future; - - /// Poll the event listener. - fn poll(&mut self, evl: EventListener) -> Self::Fut; -} - -/// The strategy for blocking the current thread on an `EventListener`. -struct Blocking; - -impl Strategy for Blocking { - type Fut = std::future::Ready<()>; - - fn poll(&mut self, evl: EventListener) -> Self::Fut { - evl.wait(); - std::future::ready(()) - } -} - -/// The strategy for polling an `EventListener` in an async context. -struct NonBlocking; - -impl Strategy for NonBlocking { - type Fut = EventListener; - - fn poll(&mut self, evl: EventListener) -> Self::Fut { - evl - } -} diff --git a/src/rwlock.rs b/src/rwlock.rs index a1b9154..4b68fdd 100644 --- a/src/rwlock.rs +++ b/src/rwlock.rs @@ -388,7 +388,9 @@ pub struct Read<'a, T: ?Sized> { state: usize, /// The listener for the "no writers" event. - listener: Option, + /// + /// TODO: At the next breaking release, remove the `Pin>` and make this type `!Unpin`. + listener: Option>>, } impl fmt::Debug for Read<'_, T> { @@ -525,7 +527,9 @@ enum WriteState<'a, T: ?Sized> { guard: Option>, /// The listener for the "no readers" event. - listener: Option, + /// + /// TODO: At the next breaking release, remove the `Pin>` and make this type `!Unpin`. + listener: Option>>, }, } @@ -786,7 +790,9 @@ pub struct Upgrade<'a, T: ?Sized> { guard: Option>, /// The event listener we are waiting on. - listener: Option, + /// + /// TODO: At the next breaking release, remove the `Pin>` and make this type `!Unpin`. + listener: Option>>, } impl fmt::Debug for Upgrade<'_, T> { diff --git a/src/semaphore.rs b/src/semaphore.rs index 0de4f53..1a90c27 100644 --- a/src/semaphore.rs +++ b/src/semaphore.rs @@ -160,7 +160,9 @@ pub struct Acquire<'a> { semaphore: &'a Semaphore, /// The listener waiting on the semaphore. - listener: Option, + /// + /// TODO: At the next breaking release, remove the `Pin>` and make this type `!Unpin`. + listener: Option>>, } impl fmt::Debug for Acquire<'_> { @@ -203,7 +205,9 @@ pub struct AcquireArc { semaphore: Arc, /// The listener waiting on the semaphore. - listener: Option, + /// + /// TODO: At the next breaking release, remove the `Pin>` and make this type `!Unpin`. + listener: Option>>, } impl fmt::Debug for AcquireArc {