Skip to content

Add missing Arc blocking methods #71

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 26, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/mutex.rs
Original file line number Diff line number Diff line change
@@ -337,7 +337,7 @@ pin_project_lite::pin_project! {
unsafe impl<T: Send + ?Sized> Send for Lock<'_, T> {}
unsafe impl<T: Sync + ?Sized> Sync for Lock<'_, T> {}

impl<T: ?Sized> fmt::Debug for LockInner<'_, T> {
impl<T: ?Sized> fmt::Debug for Lock<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Lock { .. }")
}
132 changes: 132 additions & 0 deletions src/rwlock.rs
Original file line number Diff line number Diff line change
@@ -147,6 +147,41 @@ impl<T> RwLock<T> {
pub fn read_arc<'a>(self: &'a Arc<Self>) -> ReadArc<'a, T> {
ReadArc::new(self.raw.read(), self)
}

/// Acquires an owned, reference-counted read lock.
///
/// Returns a guard that releases the lock when dropped.
///
/// Note that attempts to acquire a read lock will block if there are also concurrent attempts
/// to acquire a write lock.
///
/// # Blocking
///
/// Rather than using asynchronous waiting, like the [`read_arc`][`RwLock::read_arc`] method,
/// this method will block the current thread until the read lock is acquired.
///
/// This method should not be used in an asynchronous context. It is intended to be
/// used in a way that a lock can be used in both asynchronous and synchronous contexts.
/// Calling this method in an asynchronous context may result in a deadlock.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use async_lock::RwLock;
///
/// let lock = Arc::new(RwLock::new(1));
///
/// let reader = lock.read_arc_blocking();
/// assert_eq!(*reader, 1);
///
/// assert!(lock.try_read().is_some());
/// ```
#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[inline]
pub fn read_arc_blocking(self: &Arc<Self>) -> RwLockReadGuardArc<T> {
self.read_arc().wait()
}
}

impl<T: ?Sized> RwLock<T> {
@@ -347,6 +382,47 @@ impl<T: ?Sized> RwLock<T> {
self.upgradable_read().wait()
}

/// Attempts to acquire an owned, reference-counted read lock
/// with the possiblity to upgrade to a write lock.
///
/// Returns a guard that releases the lock when dropped.
///
/// Upgradable read lock reserves the right to be upgraded to a write lock, which means there
/// can be at most one upgradable read lock at a time.
///
/// Note that attempts to acquire an upgradable read lock will block if there are concurrent
/// attempts to acquire another upgradable read lock or a write lock.
///
/// # Blocking
///
/// Rather than using asynchronous waiting, like the [`upgradable_read_arc`][`RwLock::upgradable_read_arc`]
/// method, this method will block the current thread until the read lock is acquired.
///
/// This method should not be used in an asynchronous context. It is intended to be
/// used in a way that a lock can be used in both asynchronous and synchronous contexts.
/// Calling this method in an asynchronous context may result in a deadlock.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use async_lock::{RwLock, RwLockUpgradableReadGuardArc};
///
/// let lock = Arc::new(RwLock::new(1));
///
/// let reader = lock.upgradable_read_arc_blocking();
/// assert_eq!(*reader, 1);
/// assert_eq!(*lock.try_read().unwrap(), 1);
///
/// let mut writer = RwLockUpgradableReadGuardArc::upgrade_blocking(reader);
/// *writer = 2;
/// ```
#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[inline]
pub fn upgradable_read_arc_blocking(self: &Arc<Self>) -> RwLockUpgradableReadGuardArc<T> {
self.upgradable_read_arc().wait()
}

/// Attempts to acquire an owned, reference-counted read lock with the possiblity to
/// upgrade to a write lock.
///
@@ -545,6 +621,36 @@ impl<T: ?Sized> RwLock<T> {
WriteArc::new(self.raw.write(), self)
}

/// Acquires an owned, reference-counted write lock.
///
/// Returns a guard that releases the lock when dropped.
///
/// # Blocking
///
/// Rather than using asynchronous waiting, like the [`write_arc`][RwLock::write_arc] method, this method will
/// block the current thread until the write lock is acquired.
///
/// This method should not be used in an asynchronous context. It is intended to be
/// used in a way that a lock can be used in both asynchronous and synchronous contexts.
/// Calling this method in an asynchronous context may result in a deadlock.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use async_lock::RwLock;
///
/// let lock = Arc::new(RwLock::new(1));
///
/// let writer = lock.write_arc_blocking();
/// assert!(lock.try_read().is_none());
/// ```
#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[inline]
pub fn write_arc_blocking(self: &Arc<Self>) -> RwLockWriteGuardArc<T> {
self.write_arc().wait()
}

/// Returns a mutable reference to the inner value.
///
/// Since this call borrows the lock mutably, no actual locking takes place. The mutable borrow
@@ -1067,6 +1173,32 @@ impl<T: ?Sized> RwLockUpgradableReadGuardArc<T> {
)
}
}

/// Upgrades into a write lock.
///
/// # Blocking
///
/// This function will block the current thread until it is able to acquire the write lock.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use async_lock::{RwLock, RwLockUpgradableReadGuardArc};
///
/// let lock = Arc::new(RwLock::new(1));
///
/// let reader = lock.upgradable_read_arc_blocking();
/// assert_eq!(*reader, 1);
///
/// let mut writer = RwLockUpgradableReadGuardArc::upgrade_blocking(reader);
/// *writer = 2;
/// ```
#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[inline]
pub fn upgrade_blocking(guard: Self) -> RwLockWriteGuardArc<T> {
RwLockUpgradableReadGuardArc::upgrade(guard).wait()
}
}

/// A guard that releases the write lock when dropped.
57 changes: 47 additions & 10 deletions src/semaphore.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use core::fmt;
use core::future::Future;
use core::pin::Pin;
use core::sync::atomic::{AtomicUsize, Ordering};
use core::task::{Context, Poll};
use core::task::Poll;

use alloc::sync::Arc;

@@ -174,10 +173,38 @@ impl Semaphore {
/// # });
/// ```
pub fn acquire_arc(self: &Arc<Self>) -> AcquireArc {
AcquireArc {
AcquireArc::_new(AcquireArcInner {
semaphore: self.clone(),
listener: EventListener::new(),
}
})
}

/// Waits for an owned permit for a concurrent operation.
///
/// Returns a guard that releases the permit when dropped.
///
/// # Blocking
///
/// Rather than using asynchronous waiting, like the [`acquire_arc`][Semaphore::acquire_arc] method,
/// this method will block the current thread until the permit is acquired.
///
/// This method should not be used in an asynchronous context. It is intended to be
/// used in a way that a semaphore can be used in both asynchronous and synchronous contexts.
/// Calling this method in an asynchronous context may result in a deadlock.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use async_lock::Semaphore;
///
/// let s = Arc::new(Semaphore::new(2));
/// let guard = s.acquire_arc_blocking();
/// ```
#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[inline]
pub fn acquire_arc_blocking(self: &Arc<Self>) -> SemaphoreGuardArc {
self.acquire_arc().wait()
}

/// Adds `n` additional permits to the semaphore.
@@ -223,7 +250,7 @@ pin_project_lite::pin_project! {
}
}

impl fmt::Debug for AcquireInner<'_> {
impl fmt::Debug for Acquire<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Acquire { .. }")
}
@@ -255,9 +282,15 @@ impl<'a> EventListenerFuture for AcquireInner<'a> {
}
}

pin_project_lite::pin_project! {
easy_wrapper! {
/// The future returned by [`Semaphore::acquire_arc`].
pub struct AcquireArc {
pub struct AcquireArc(AcquireArcInner => SemaphoreGuardArc);
#[cfg(all(feature = "std", not(target_family = "wasm")))]
pub(crate) wait();
}

pin_project_lite::pin_project! {
struct AcquireArcInner {
// The semaphore being acquired.
semaphore: Arc<Semaphore>,

@@ -273,10 +306,14 @@ impl fmt::Debug for AcquireArc {
}
}

impl Future for AcquireArc {
impl EventListenerFuture for AcquireArcInner {
type Output = SemaphoreGuardArc;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll_with_strategy<'x, S: Strategy<'x>>(
self: Pin<&mut Self>,
strategy: &mut S,
cx: &mut S::Context,
) -> Poll<Self::Output> {
let mut this = self.project();

loop {
@@ -287,7 +324,7 @@ impl Future for AcquireArc {
if !this.listener.is_listening() {
this.listener.as_mut().listen(&this.semaphore.event);
} else {
ready!(this.listener.as_mut().poll(cx));
ready!(strategy.poll(this.listener.as_mut(), cx));
}
}
}
8 changes: 8 additions & 0 deletions tests/mutex.rs
Original file line number Diff line number Diff line change
@@ -32,6 +32,14 @@ fn smoke_blocking() {
drop(m.lock_blocking());
}

#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[test]
fn smoke_arc_blocking() {
let m = Arc::new(Mutex::new(()));
drop(m.lock_arc_blocking());
drop(m.lock_arc_blocking());
}

#[test]
fn try_lock() {
let m = Mutex::new(());
18 changes: 18 additions & 0 deletions tests/rwlock.rs
Original file line number Diff line number Diff line change
@@ -54,9 +54,27 @@ fn smoke_blocking() {
drop(lock.read_blocking());
drop(lock.write_blocking());
drop((lock.read_blocking(), lock.read_blocking()));
let read = lock.read_blocking();
let upgradabe = lock.upgradable_read_blocking();
drop(read);
drop(RwLockUpgradableReadGuard::upgrade_blocking(upgradabe));
drop(lock.write_blocking());
}

#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[test]
fn smoke_arc_blocking() {
let lock = Arc::new(RwLock::new(()));
drop(lock.read_arc_blocking());
drop(lock.write_arc_blocking());
drop((lock.read_arc_blocking(), lock.read_arc_blocking()));
let read = lock.read_arc_blocking();
let upgradabe = lock.upgradable_read_arc_blocking();
drop(read);
drop(RwLockUpgradableReadGuardArc::upgrade_blocking(upgradabe));
drop(lock.write_arc_blocking());
}

#[test]
fn try_write() {
future::block_on(async {
11 changes: 11 additions & 0 deletions tests/semaphore.rs
Original file line number Diff line number Diff line change
@@ -125,6 +125,17 @@ fn smoke_blocking() {
assert!(s.try_acquire().is_some());
}

#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[test]
fn smoke_arc_blocking() {
let s = Arc::new(Semaphore::new(2));
let g1 = s.acquire_arc_blocking();
let _g2 = s.acquire_arc_blocking();
assert!(s.try_acquire().is_none());
drop(g1);
assert!(s.try_acquire().is_some());
}

#[test]
fn add_permits() {
static COUNTER: AtomicUsize = AtomicUsize::new(0);