Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: add Mutex::lock_owned and Mutex::try_lock_owned #2571

Merged
merged 2 commits into from
Mar 8, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions futures-util/src/lock/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,24 @@
//! library is activated, and it is activated by default.

#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "std")]
mod mutex;
#[cfg(any(feature = "sink", feature = "io"))]
#[cfg(not(feature = "bilock"))]
pub(crate) use self::bilock::BiLock;
#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "bilock")]
#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError};
#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "std")]
pub use self::mutex::{MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture};
pub use self::mutex::{
MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture, OwnedMutexGuard, OwnedMutexLockFuture,
};

#[cfg(not(futures_no_atomic_cas))]
#[cfg(any(feature = "bilock", feature = "sink", feature = "io"))]
#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
#[cfg_attr(not(feature = "bilock"), allow(unreachable_pub))]
mod bilock;
#[cfg(not(futures_no_atomic_cas))]
#[cfg(any(feature = "sink", feature = "io"))]
#[cfg(not(feature = "bilock"))]
pub(crate) use self::bilock::BiLock;
#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "bilock")]
#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError};
#[cfg(feature = "std")]
mod mutex;
159 changes: 154 additions & 5 deletions futures-util/src/lock/mutex.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use futures_core::future::{FusedFuture, Future};
use futures_core::task::{Context, Poll, Waker};
use slab::Slab;
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex as StdMutex;
use std::sync::{Arc, Mutex as StdMutex};
use std::{fmt, mem};

use slab::Slab;

use futures_core::future::{FusedFuture, Future};
use futures_core::task::{Context, Poll, Waker};

/// A futures-aware mutex.
///
/// # Fairness
Expand Down Expand Up @@ -107,6 +109,18 @@ impl<T: ?Sized> Mutex<T> {
}
}

/// Attempt to acquire the lock immediately.
///
/// If the lock is currently held, this will return `None`.
pub fn try_lock_owned(self: &Arc<Self>) -> Option<OwnedMutexGuard<T>> {
let old_state = self.state.fetch_or(IS_LOCKED, Ordering::Acquire);
if (old_state & IS_LOCKED) == 0 {
Some(OwnedMutexGuard { mutex: self.clone() })
} else {
None
}
}

/// Acquire the lock asynchronously.
///
/// This method returns a future that will resolve once the lock has been
Expand All @@ -115,6 +129,14 @@ impl<T: ?Sized> Mutex<T> {
MutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE }
}

/// Acquire the lock asynchronously.
///
/// This method returns a future that will resolve once the lock has been
/// successfully acquired.
pub fn lock_owned(self: &Arc<Self>) -> OwnedMutexLockFuture<T> {
Sherlock-Holo marked this conversation as resolved.
Show resolved Hide resolved
OwnedMutexLockFuture { mutex: Some(self.clone()), wait_key: WAIT_KEY_NONE }
}

/// Returns a mutable reference to the underlying data.
///
/// Since this call borrows the `Mutex` mutably, no actual locking needs to
Expand Down Expand Up @@ -173,7 +195,118 @@ impl<T: ?Sized> Mutex<T> {
}

// Sentinel for when no slot in the `Slab` has been dedicated to this object.
const WAIT_KEY_NONE: usize = usize::max_value();
const WAIT_KEY_NONE: usize = usize::MAX;

/// A future which resolves when the target mutex has been successfully acquired, owned version.
pub struct OwnedMutexLockFuture<T: ?Sized> {
// `None` indicates that the mutex was successfully acquired.
mutex: Option<Arc<Mutex<T>>>,
wait_key: usize,
}

impl<T: ?Sized> fmt::Debug for OwnedMutexLockFuture<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("OwnedMutexLockFuture")
.field("was_acquired", &self.mutex.is_none())
.field("mutex", &self.mutex)
.field(
"wait_key",
&(if self.wait_key == WAIT_KEY_NONE { None } else { Some(self.wait_key) }),
)
.finish()
}
}

impl<T: ?Sized> FusedFuture for OwnedMutexLockFuture<T> {
fn is_terminated(&self) -> bool {
self.mutex.is_none()
}
}

impl<T: ?Sized> Future for OwnedMutexLockFuture<T> {
type Output = OwnedMutexGuard<T>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

let mutex = this.mutex.as_ref().expect("polled OwnedMutexLockFuture after completion");

if let Some(lock) = mutex.try_lock_owned() {
mutex.remove_waker(this.wait_key, false);
this.mutex = None;
return Poll::Ready(lock);
}

{
let mut waiters = mutex.waiters.lock().unwrap();
if this.wait_key == WAIT_KEY_NONE {
this.wait_key = waiters.insert(Waiter::Waiting(cx.waker().clone()));
if waiters.len() == 1 {
mutex.state.fetch_or(HAS_WAITERS, Ordering::Relaxed); // released by mutex unlock
}
} else {
waiters[this.wait_key].register(cx.waker());
}
}

// Ensure that we haven't raced `MutexGuard::drop`'s unlock path by
// attempting to acquire the lock again.
if let Some(lock) = mutex.try_lock_owned() {
mutex.remove_waker(this.wait_key, false);
this.mutex = None;
return Poll::Ready(lock);
}

Poll::Pending
}
}

impl<T: ?Sized> Drop for OwnedMutexLockFuture<T> {
fn drop(&mut self) {
if let Some(mutex) = self.mutex.as_ref() {
// This future was dropped before it acquired the mutex.
//
// Remove ourselves from the map, waking up another waiter if we
// had been awoken to acquire the lock.
mutex.remove_waker(self.wait_key, true);
}
}
}

/// An RAII guard returned by the `lock_owned` and `try_lock_owned` methods.
/// When this structure is dropped (falls out of scope), the lock will be
/// unlocked.
pub struct OwnedMutexGuard<T: ?Sized> {
mutex: Arc<Mutex<T>>,
}

impl<T: ?Sized + fmt::Debug> fmt::Debug for OwnedMutexGuard<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("OwnedMutexGuard")
.field("value", &&**self)
.field("mutex", &self.mutex)
.finish()
}
}

impl<T: ?Sized> Drop for OwnedMutexGuard<T> {
fn drop(&mut self) {
self.mutex.unlock()
}
}

impl<T: ?Sized> Deref for OwnedMutexGuard<T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.mutex.value.get() }
}
}

impl<T: ?Sized> DerefMut for OwnedMutexGuard<T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.mutex.value.get() }
}
}

/// A future which resolves when the target mutex has been successfully acquired.
pub struct MutexLockFuture<'a, T: ?Sized> {
Expand Down Expand Up @@ -381,19 +514,35 @@ impl<T: ?Sized, U: ?Sized> DerefMut for MappedMutexGuard<'_, T, U> {
// Mutexes can be moved freely between threads and acquired on any thread so long
// as the inner value can be safely sent between threads.
unsafe impl<T: ?Sized + Send> Send for Mutex<T> {}

Sherlock-Holo marked this conversation as resolved.
Show resolved Hide resolved
unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}

// It's safe to switch which thread the acquire is being attempted on so long as
// `T` can be accessed on that thread.
unsafe impl<T: ?Sized + Send> Send for MutexLockFuture<'_, T> {}

// doesn't have any interesting `&self` methods (only Debug)
unsafe impl<T: ?Sized> Sync for MutexLockFuture<'_, T> {}

// It's safe to switch which thread the acquire is being attempted on so long as
// `T` can be accessed on that thread.
unsafe impl<T: ?Sized + Send> Send for OwnedMutexLockFuture<T> {}

// doesn't have any interesting `&self` methods (only Debug)
unsafe impl<T: ?Sized> Sync for OwnedMutexLockFuture<T> {}

// Safe to send since we don't track any thread-specific details-- the inner
// lock is essentially spinlock-equivalent (attempt to flip an atomic bool)
unsafe impl<T: ?Sized + Send> Send for MutexGuard<'_, T> {}

unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {}

unsafe impl<T: ?Sized + Send> Send for OwnedMutexGuard<T> {}

unsafe impl<T: ?Sized + Sync> Sync for OwnedMutexGuard<T> {}

unsafe impl<T: ?Sized + Send, U: ?Sized + Send> Send for MappedMutexGuard<'_, T, U> {}

unsafe impl<T: ?Sized + Sync, U: ?Sized + Sync> Sync for MappedMutexGuard<'_, T, U> {}

#[test]
Expand Down