From 1dd047894f895f5ce69442ea34a1937d6681f7dc Mon Sep 17 00:00:00 2001
From: Gary Guo
Date: Fri, 18 Oct 2019 13:22:24 +0100
Subject: [PATCH 01/10] Mutex performance benchmarks

Signed-off-by: Gary Guo
---
 benches/mutex.rs | 66 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 66 insertions(+)
 create mode 100644 benches/mutex.rs

diff --git a/benches/mutex.rs b/benches/mutex.rs
new file mode 100644
index 000000000..810e41a38
--- /dev/null
+++ b/benches/mutex.rs
@@ -0,0 +1,66 @@
+#![feature(test)]
+
+extern crate test;
+
+use async_std::future::Future;
+use async_std::sync::{Arc, Mutex};
+use async_std::task;
+use futures::task::noop_waker;
+use test::Bencher;
+
+async fn test(task: usize, iter: usize) {
+    let mutex = Arc::new(Mutex::new(()));
+    let mut vec = Vec::new();
+    for _ in 0..task {
+        let mutex_clone = mutex.clone();
+        let handle = async_std::task::spawn(async move {
+            for _ in 0..iter {
+                let _ = mutex_clone.lock().await;
+            }
+        });
+        vec.push(handle);
+    }
+    for i in vec {
+        i.await
+    }
+}
+
+#[bench]
+fn mutex_contention(b: &mut Bencher) {
+    b.iter(|| task::block_on(test(10, 1000)));
+}
+
+#[bench]
+fn mutex_no_contention(b: &mut Bencher) {
+    b.iter(|| task::block_on(test(1, 10000)));
+}
+
+#[bench]
+fn mutex_unused(b: &mut Bencher) {
+    b.iter(|| Mutex::new(()));
+}
+
+#[bench]
+fn mutex_mimic_contention(b: &mut Bencher) {
+    let noop_waker = noop_waker();
+    let mut context = task::Context::from_waker(&noop_waker);
+
+    b.iter(|| {
+        let mutex = Mutex::new(());
+        let mut vec = Vec::with_capacity(10);
+
+        // Mimic 10 tasks concurrently trying to acquire the lock.
+        for _ in 0..10 {
+            let mut lock_future = Box::pin(mutex.lock());
+            let poll_result = lock_future.as_mut().poll(&mut context);
+            vec.push((lock_future, poll_result));
+        }
+
+        // Go through all 10 tasks and release the lock.
+        for (mut future, mut poll) in vec {
+            while let task::Poll::Pending = poll {
+                poll = future.as_mut().poll(&mut context);
+            }
+        }
+    });
+}

From 22f508e65337466cf5c0532351298ef39d320271 Mon Sep 17 00:00:00 2001
From: Gary Guo
Date: Fri, 18 Oct 2019 21:48:30 +0100
Subject: [PATCH 02/10] De-bloat `Mutex`es by adding `RawMutex`

All `Mutex`es now internally use `RawMutex` (which is similar to a
`Mutex<()>`, only providing locking semantics but not data), so
instantiating `Mutex` with different types does not duplicate code.

This patch does not otherwise change the algorithm used by `Mutex`.

Signed-off-by: Gary Guo
---
 src/sync/mutex.rs | 230 ++++++++++++++++++++++++++--------------------
 1 file changed, 130 insertions(+), 100 deletions(-)

diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs
index cd7a3577f..5df113e12 100644
--- a/src/sync/mutex.rs
+++ b/src/sync/mutex.rs
@@ -15,6 +15,130 @@ const LOCK: usize = 1;
 /// Set if there are tasks blocked on the mutex.
 const BLOCKED: usize = 1 << 1;
 
+struct RawMutex {
+    state: AtomicUsize,
+    blocked: std::sync::Mutex<Slab<Option<Waker>>>,
+}
+
+unsafe impl Send for RawMutex {}
+unsafe impl Sync for RawMutex {}
+
+impl RawMutex {
+    /// Creates a new raw mutex.
+    pub fn new() -> RawMutex {
+        RawMutex {
+            state: AtomicUsize::new(0),
+            blocked: std::sync::Mutex::new(Slab::new()),
+        }
+    }
+
+    /// Acquires the lock.
+    ///
+    /// We don't use an `async fn` here for performance reasons.
+    pub fn lock(&self) -> RawLockFuture<'_> {
+        RawLockFuture {
+            mutex: self,
+            opt_key: None,
+            acquired: false,
+        }
+    }
+
+    /// Attempts to acquire the lock.
+    pub fn try_lock(&self) -> bool {
+        self.state.fetch_or(LOCK, Ordering::Acquire) & LOCK == 0
+    }
+
+    /// Unlock this mutex.
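+    ///
+    /// This clears the `LOCK` bit; if the `BLOCKED` bit is set, one blocked
+    /// task is woken so that it can retry acquiring the lock.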
+    pub fn unlock(&self) {
+        let state = self.state.fetch_and(!LOCK, Ordering::AcqRel);
+
+        // If there are any blocked tasks, wake one of them up.
+        if state & BLOCKED != 0 {
+            let mut blocked = self.blocked.lock().unwrap();
+
+            if let Some((_, opt_waker)) = blocked.iter_mut().next() {
+                // If there is no waker in this entry, that means it was already woken.
+                if let Some(w) = opt_waker.take() {
+                    w.wake();
+                }
+            }
+        }
+    }
+}
+
+struct RawLockFuture<'a> {
+    mutex: &'a RawMutex,
+    opt_key: Option<usize>,
+    acquired: bool,
+}
+
+impl<'a> Future for RawLockFuture<'a> {
+    type Output = ();
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        if self.mutex.try_lock() {
+            self.acquired = true;
+            Poll::Ready(())
+        } else {
+            let mut blocked = self.mutex.blocked.lock().unwrap();
+
+            // Register the current task.
+            match self.opt_key {
+                None => {
+                    // Insert a new entry into the list of blocked tasks.
+                    let w = cx.waker().clone();
+                    let key = blocked.insert(Some(w));
+                    self.opt_key = Some(key);
+
+                    if blocked.len() == 1 {
+                        self.mutex.state.fetch_or(BLOCKED, Ordering::Relaxed);
+                    }
+                }
+                Some(key) => {
+                    // There is already an entry in the list of blocked tasks. Just
+                    // reset the waker if it was removed.
+                    if blocked[key].is_none() {
+                        let w = cx.waker().clone();
+                        blocked[key] = Some(w);
+                    }
+                }
+            }
+
+            // Try locking again because it's possible the mutex got unlocked just
+            // before the current task was registered as a blocked task.
+            if self.mutex.try_lock() {
+                self.acquired = true;
+                Poll::Ready(())
+            } else {
+                Poll::Pending
+            }
+        }
+    }
+}
+
+impl Drop for RawLockFuture<'_> {
+    fn drop(&mut self) {
+        if let Some(key) = self.opt_key {
+            let mut blocked = self.mutex.blocked.lock().unwrap();
+            let opt_waker = blocked.remove(key);
+
+            if opt_waker.is_none() && !self.acquired {
+                // We were awoken but didn't acquire the lock. Wake up another task.
+                if let Some((_, opt_waker)) = blocked.iter_mut().next() {
+                    // If there is no waker in this entry, that means it was already woken.
+                    if let Some(w) = opt_waker.take() {
+                        w.wake();
+                    }
+                }
+            }
+
+            if blocked.is_empty() {
+                self.mutex.state.fetch_and(!BLOCKED, Ordering::Relaxed);
+            }
+        }
+    }
+}
+
 /// A mutual exclusion primitive for protecting shared data.
 ///
 /// This type is an async version of [`std::sync::Mutex`].
@@ -49,8 +173,7 @@
 /// # })
 /// ```
 pub struct Mutex<T> {
-    state: AtomicUsize,
-    blocked: std::sync::Mutex<Slab<Option<Waker>>>,
+    mutex: RawMutex,
     value: UnsafeCell<T>,
 }
 
@@ -69,8 +192,7 @@ impl<T> Mutex<T> {
     /// ```
     pub fn new(t: T) -> Mutex<T> {
         Mutex {
-            state: AtomicUsize::new(0),
-            blocked: std::sync::Mutex::new(Slab::new()),
+            mutex: RawMutex::new(),
            value: UnsafeCell::new(t),
         }
     }
@@ -102,88 +224,8 @@ impl<T> Mutex<T> {
     /// # })
     /// ```
     pub async fn lock(&self) -> MutexGuard<'_, T> {
-        pub struct LockFuture<'a, T> {
-            mutex: &'a Mutex<T>,
-            opt_key: Option<usize>,
-            acquired: bool,
-        }
-
-        impl<'a, T> Future for LockFuture<'a, T> {
-            type Output = MutexGuard<'a, T>;
-
-            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-                match self.mutex.try_lock() {
-                    Some(guard) => {
-                        self.acquired = true;
-                        Poll::Ready(guard)
-                    }
-                    None => {
-                        let mut blocked = self.mutex.blocked.lock().unwrap();
-
-                        // Register the current task.
-                        match self.opt_key {
-                            None => {
-                                // Insert a new entry into the list of blocked tasks.
-                                let w = cx.waker().clone();
-                                let key = blocked.insert(Some(w));
-                                self.opt_key = Some(key);
-
-                                if blocked.len() == 1 {
-                                    self.mutex.state.fetch_or(BLOCKED, Ordering::Relaxed);
-                                }
-                            }
-                            Some(key) => {
-                                // There is already an entry in the list of blocked tasks. Just
-                                // reset the waker if it was removed.
-                                if blocked[key].is_none() {
-                                    let w = cx.waker().clone();
-                                    blocked[key] = Some(w);
-                                }
-                            }
-                        }
-
-                        // Try locking again because it's possible the mutex got unlocked just
-                        // before the current task was registered as a blocked task.
-                        match self.mutex.try_lock() {
-                            Some(guard) => {
-                                self.acquired = true;
-                                Poll::Ready(guard)
-                            }
-                            None => Poll::Pending,
-                        }
-                    }
-                }
-            }
-        }
-
-        impl<T> Drop for LockFuture<'_, T> {
-            fn drop(&mut self) {
-                if let Some(key) = self.opt_key {
-                    let mut blocked = self.mutex.blocked.lock().unwrap();
-                    let opt_waker = blocked.remove(key);
-
-                    if opt_waker.is_none() && !self.acquired {
-                        // We were awoken but didn't acquire the lock. Wake up another task.
-                        if let Some((_, opt_waker)) = blocked.iter_mut().next() {
-                            if let Some(w) = opt_waker.take() {
-                                w.wake();
-                            }
-                        }
-                    }
-
-                    if blocked.is_empty() {
-                        self.mutex.state.fetch_and(!BLOCKED, Ordering::Relaxed);
-                    }
-                }
-            }
-        }
-
-        LockFuture {
-            mutex: self,
-            opt_key: None,
-            acquired: false,
-        }
-        .await
+        self.mutex.lock().await;
+        MutexGuard(self)
     }
 
     /// Attempts to acquire the lock.
@@ -220,7 +262,7 @@ impl<T> Mutex<T> {
     /// # })
     /// ```
     pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
-        if self.state.fetch_or(LOCK, Ordering::Acquire) & LOCK == 0 {
+        if self.mutex.try_lock() {
             Some(MutexGuard(self))
         } else {
             None
@@ -303,19 +345,7 @@ unsafe impl<T: Sync> Sync for MutexGuard<'_, T> {}
 
 impl<T> Drop for MutexGuard<'_, T> {
     fn drop(&mut self) {
-        let state = self.0.state.fetch_and(!LOCK, Ordering::AcqRel);
-
-        // If there are any blocked tasks, wake one of them up.
-        if state & BLOCKED != 0 {
-            let mut blocked = self.0.blocked.lock().unwrap();
-
-            if let Some((_, opt_waker)) = blocked.iter_mut().next() {
-                // If there is no waker in this entry, that means it was already woken.
-                if let Some(w) = opt_waker.take() {
-                    w.wake();
-                }
-            }
-        }
+        self.0.mutex.unlock();
     }
 }

From 39ef033b2b53272a11f7ab3b2439bfab9e36615e Mon Sep 17 00:00:00 2001
From: Gary Guo
Date: Fri, 18 Oct 2019 21:56:14 +0100
Subject: [PATCH 03/10] Regain some lost performance due to de-bloating.

`#[inline]` attributes are added to common and trivial functions, and
slow paths are separated out from the inlined hot paths.

Signed-off-by: Gary Guo
---
 src/sync/mutex.rs | 34 ++++++++++++++++++++++++++--------
 1 file changed, 26 insertions(+), 8 deletions(-)

diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs
index 5df113e12..822aefef0 100644
--- a/src/sync/mutex.rs
+++ b/src/sync/mutex.rs
@@ -25,6 +25,7 @@ unsafe impl Sync for RawMutex {}
 
 impl RawMutex {
     /// Creates a new raw mutex.
+    #[inline]
     pub fn new() -> RawMutex {
         RawMutex {
             state: AtomicUsize::new(0),
@@ -35,6 +36,7 @@ impl RawMutex {
     /// Acquires the lock.
     ///
     /// We don't use an `async fn` here for performance reasons.
+    #[inline]
     pub fn lock(&self) -> RawLockFuture<'_> {
         RawLockFuture {
             mutex: self,
@@ -44,24 +46,31 @@ impl RawMutex {
     }
 
     /// Attempts to acquire the lock.
+    #[inline]
     pub fn try_lock(&self) -> bool {
         self.state.fetch_or(LOCK, Ordering::Acquire) & LOCK == 0
     }
 
+    #[cold]
+    fn unlock_slow(&self) {
+        let mut blocked = self.blocked.lock().unwrap();
+
+        if let Some((_, opt_waker)) = blocked.iter_mut().next() {
+            // If there is no waker in this entry, that means it was already woken.
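+            // Skipping it is fine: the task that was already woken will either
+            // acquire the lock, or wake another task when its future drops (see
+            // `RawLockFuture`'s `Drop` impl).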
+            if let Some(w) = opt_waker.take() {
+                w.wake();
+            }
+        }
+    }
+
     /// Unlock this mutex.
     ///
     /// This clears the `LOCK` bit; if the `BLOCKED` bit is set, one blocked
     /// task is woken so that it can retry acquiring the lock.
+    #[inline]
     pub fn unlock(&self) {
         let state = self.state.fetch_and(!LOCK, Ordering::AcqRel);
 
         // If there are any blocked tasks, wake one of them up.
         if state & BLOCKED != 0 {
-            let mut blocked = self.blocked.lock().unwrap();
-
-            if let Some((_, opt_waker)) = blocked.iter_mut().next() {
-                // If there is no waker in this entry, that means it was already woken.
-                if let Some(w) = opt_waker.take() {
-                    w.wake();
-                }
-            }
+            self.unlock_slow();
         }
     }
 }
@@ -190,6 +199,7 @@ impl<T> Mutex<T> {
     ///
     /// let mutex = Mutex::new(0);
     /// ```
+    #[inline]
     pub fn new(t: T) -> Mutex<T> {
         Mutex {
             mutex: RawMutex::new(),
@@ -261,6 +271,7 @@ impl<T> Mutex<T> {
     /// #
     /// # })
     /// ```
+    #[inline]
     pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
         if self.mutex.try_lock() {
             Some(MutexGuard(self))
@@ -279,6 +290,7 @@ impl<T> Mutex<T> {
     /// let mutex = Mutex::new(10);
     /// assert_eq!(mutex.into_inner(), 10);
     /// ```
+    #[inline]
     pub fn into_inner(self) -> T {
         self.value.into_inner()
     }
@@ -301,6 +313,7 @@ impl<T> Mutex<T> {
     /// #
     /// # })
     /// ```
+    #[inline]
     pub fn get_mut(&mut self) -> &mut T {
         unsafe { &mut *self.value.get() }
     }
@@ -326,12 +339,14 @@ impl<T: fmt::Debug> fmt::Debug for Mutex<T> {
     }
 }
 
 impl<T> From<T> for Mutex<T> {
+    #[inline]
     fn from(val: T) -> Mutex<T> {
         Mutex::new(val)
     }
 }
 
 impl<T: Default> Default for Mutex<T> {
+    #[inline]
     fn default() -> Mutex<T> {
         Mutex::new(Default::default())
     }
@@ -344,6 +359,7 @@ unsafe impl<T: Send> Send for MutexGuard<'_, T> {}
 unsafe impl<T: Sync> Sync for MutexGuard<'_, T> {}
 
 impl<T> Drop for MutexGuard<'_, T> {
+    #[inline]
     fn drop(&mut self) {
         self.0.mutex.unlock();
     }
@@ -364,12 +380,14 @@ impl<T: fmt::Display> fmt::Display for MutexGuard<'_, T> {
 
 impl<T> Deref for MutexGuard<'_, T> {
     type Target = T;
 
+    #[inline]
     fn deref(&self) -> &T {
         unsafe { &*self.0.value.get() }
     }
 }
 
 impl<T> DerefMut for MutexGuard<'_, T> {
+    #[inline]
     fn deref_mut(&mut self) -> &mut T {
         unsafe { &mut *self.0.value.get() }
     }

From 5427c41cd94d6572521e944898ee0a3f564493e7 Mon Sep 17 00:00:00 2001
From: Gary Guo
Date: Sun, 20 Oct 2019 00:26:35 +0100
Subject: [PATCH 04/10] Replace Slab-backed waker list with linked list

Signed-off-by: Gary Guo
---
 src/sync/mod.rs        |   1 +
 src/sync/mutex.rs      |  43 ++++++--------
 src/sync/waker_list.rs | 130 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 148 insertions(+), 26 deletions(-)
 create mode 100644 src/sync/waker_list.rs

diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index be74d8f7b..983063145 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -37,6 +37,7 @@ pub use rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
 
 mod mutex;
 mod rwlock;
+mod waker_list;
 
 cfg_unstable! {
     pub use barrier::{Barrier, BarrierWaitResult};
diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs
index 822aefef0..5449d24a9 100644
--- a/src/sync/mutex.rs
+++ b/src/sync/mutex.rs
@@ -4,10 +4,11 @@ use std::ops::{Deref, DerefMut};
 use std::pin::Pin;
 use std::sync::atomic::{AtomicUsize, Ordering};
 
-use slab::Slab;
-
 use crate::future::Future;
-use crate::task::{Context, Poll, Waker};
+use crate::task::{Context, Poll};
+
+use super::waker_list::WakerList;
+use std::num::NonZeroUsize;
 
 /// Set if the mutex is locked.
 const LOCK: usize = 1;
@@ -17,7 +18,7 @@ const BLOCKED: usize = 1 << 1;
 
 struct RawMutex {
     state: AtomicUsize,
-    blocked: std::sync::Mutex<Slab<Option<Waker>>>,
+    blocked: std::sync::Mutex<WakerList>,
 }
 
 unsafe impl Send for RawMutex {}
@@ -29,7 +30,7 @@ impl RawMutex {
     pub fn new() -> RawMutex {
         RawMutex {
             state: AtomicUsize::new(0),
-            blocked: std::sync::Mutex::new(Slab::new()),
+            blocked: std::sync::Mutex::new(WakerList::new()),
         }
     }
@@ -53,16 +54,8 @@ impl RawMutex {
     #[cold]
     fn unlock_slow(&self) {
         let mut blocked = self.blocked.lock().unwrap();
-
-        if let Some((_, opt_waker)) = blocked.iter_mut().next() {
-            // If there is no waker in this entry, that means it was already woken.
-            // Skipping it is fine: the task that was already woken will either
-            // acquire the lock, or wake another task when its future drops (see
-            // `RawLockFuture`'s `Drop` impl).
-            if let Some(w) = opt_waker.take() {
-                w.wake();
-            }
-        }
+        blocked.wake_one_weak();
     }
@@ -76,7 +69,7 @@ impl RawMutex {
 
 struct RawLockFuture<'a> {
     mutex: &'a RawMutex,
-    opt_key: Option<usize>,
+    opt_key: Option<NonZeroUsize>,
     acquired: bool,
 }
@@ -94,21 +87,22 @@ impl<'a> Future for RawLockFuture<'a> {
             // Register the current task.
             match self.opt_key {
                 None => {
+                    if blocked.is_empty() {
+                        self.mutex.state.fetch_or(BLOCKED, Ordering::Relaxed);
+                    }
+
                     // Insert a new entry into the list of blocked tasks.
                     let w = cx.waker().clone();
                     let key = blocked.insert(Some(w));
                     self.opt_key = Some(key);
-
-                    if blocked.len() == 1 {
-                        self.mutex.state.fetch_or(BLOCKED, Ordering::Relaxed);
-                    }
                 }
                 Some(key) => {
                     // There is already an entry in the list of blocked tasks. Just
                     // reset the waker if it was removed.
-                    if blocked[key].is_none() {
+                    let opt_waker = unsafe { blocked.get(key) };
+                    if opt_waker.is_none() {
                         let w = cx.waker().clone();
-                        blocked[key] = Some(w);
+                        *opt_waker = Some(w);
                     }
                 }
             }
@@ -129,16 +123,11 @@ impl Drop for RawLockFuture<'_> {
     fn drop(&mut self) {
         if let Some(key) = self.opt_key {
             let mut blocked = self.mutex.blocked.lock().unwrap();
-            let opt_waker = blocked.remove(key);
+            let opt_waker = unsafe { blocked.remove(key) };
 
             if opt_waker.is_none() && !self.acquired {
                 // We were awoken but didn't acquire the lock. Wake up another task.
-                if let Some((_, opt_waker)) = blocked.iter_mut().next() {
-                    // If there is no waker in this entry, that means it was already woken.
-                    if let Some(w) = opt_waker.take() {
-                        w.wake();
-                    }
-                }
+                blocked.wake_one_weak();
             }
 
             if blocked.is_empty() {
diff --git a/src/sync/waker_list.rs b/src/sync/waker_list.rs
new file mode 100644
index 000000000..844d567ff
--- /dev/null
+++ b/src/sync/waker_list.rs
@@ -0,0 +1,130 @@
+use crate::task::Waker;
+
+use std::num::NonZeroUsize;
+
+struct WakerNode {
+    /// Previous `WakerNode` in the queue. If this node is the first node, it shall point to the last node.
+    prev_in_queue: *mut WakerNode,
+    /// Next `WakerNode` in the queue. If this node is the last node, it shall be null.
+    next_in_queue: *mut WakerNode,
+    waker: Option<Waker>,
+}
+
+pub struct WakerList {
+    head: *mut WakerNode,
+}
+
+unsafe impl Send for WakerList {}
+unsafe impl Sync for WakerList {}
+
+impl WakerList {
+    /// Create a new empty `WakerList`.
+    pub fn new() -> Self {
+        Self {
+            head: std::ptr::null_mut(),
+        }
+    }
+
+    /// Insert a waker to the back of the list, and return its key.
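+    ///
+    /// The key is the address of the heap-allocated `WakerNode`, so it is
+    /// guaranteed to be non-zero and remains valid until `remove` is called.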
+    pub fn insert(&mut self, waker: Option<Waker>) -> NonZeroUsize {
+        let node = Box::into_raw(Box::new(WakerNode {
+            waker,
+            next_in_queue: std::ptr::null_mut(),
+            prev_in_queue: std::ptr::null_mut(),
+        }));
+
+        if self.head.is_null() {
+            unsafe {
+                (*node).prev_in_queue = node;
+            }
+            self.head = node;
+        } else {
+            unsafe {
+                let prev = std::mem::replace(&mut (*self.head).prev_in_queue, node);
+                (*prev).next_in_queue = node;
+                (*node).prev_in_queue = prev;
+            }
+        }
+
+        unsafe { NonZeroUsize::new_unchecked(node as usize) }
+    }
+
+    /// Remove a waker by its key.
+    ///
+    /// # Safety
+    /// This function is unsafe because there is no guarantee that `key` was
+    /// previously returned by `insert` on this list, nor that a key is not
+    /// removed more than once.
+    pub unsafe fn remove(&mut self, key: NonZeroUsize) -> Option<Waker> {
+        let node = key.get() as *mut WakerNode;
+        let prev = (*node).prev_in_queue;
+        let next = (*node).next_in_queue;
+
+        // Special treatment on removing the first node
+        if self.head == node {
+            self.head = next;
+        } else {
+            std::mem::replace(&mut (*prev).next_in_queue, next);
+        }
+
+        // Special treatment on removing the last node
+        if next.is_null() {
+            if !self.head.is_null() {
+                std::mem::replace(&mut (*self.head).prev_in_queue, prev);
+            }
+        } else {
+            std::mem::replace(&mut (*next).prev_in_queue, prev);
+        }
+
+        Box::from_raw(node).waker
+    }
+
+    /// Get a waker by its key.
+    ///
+    /// # Safety
+    /// This function is unsafe because there is no guarantee that `key` was
+    /// previously returned by `insert` on this list, nor that the key has not
+    /// already been removed.
+    pub unsafe fn get(&mut self, key: NonZeroUsize) -> &mut Option<Waker> {
+        &mut (*(key.get() as *mut WakerNode)).waker
+    }
+
+    /// Check if this list is empty.
+    pub fn is_empty(&self) -> bool {
+        self.head.is_null()
+    }
+
+    /// Get an iterator over all wakers.
+    pub fn iter_mut(&mut self) -> Iter<'_> {
+        Iter {
+            ptr: self.head,
+            _marker: std::marker::PhantomData,
+        }
+    }
+
+    /// Wake the first waker in the list, and convert it to `None`. This function
+    /// is named `weak` because it does nothing if the first waker has already
+    /// been woken (i.e. its entry is `None`).
+    pub fn wake_one_weak(&mut self) {
+        if let Some(opt_waker) = self.iter_mut().next() {
+            if let Some(w) = opt_waker.take() {
+                w.wake();
+            }
+        }
+    }
+}
+
+pub struct Iter<'a> {
+    ptr: *mut WakerNode,
+    _marker: std::marker::PhantomData<&'a ()>,
+}
+
+impl<'a> Iterator for Iter<'a> {
+    type Item = &'a mut Option<Waker>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.ptr.is_null() {
+            return None;
+        }
+        let next = unsafe { (*self.ptr).next_in_queue };
+        let ptr = std::mem::replace(&mut self.ptr, next);
+        Some(unsafe { &mut (*ptr).waker })
+    }
+}

From 74186ffe69fafc05d9c2b40ce7198e02429f1e42 Mon Sep 17 00:00:00 2001
From: Gary Guo
Date: Sun, 20 Oct 2019 00:26:35 +0100
Subject: [PATCH 05/10] Implement WakerListLock

WakerListLock is an optimised version of a spinlock which is more
efficient in both time and space.

Signed-off-by: Gary Guo
---
 src/sync/mutex.rs      | 12 ++++----
 src/sync/waker_list.rs | 68 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 74 insertions(+), 6 deletions(-)

diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs
index 5449d24a9..55f32d104 100644
--- a/src/sync/mutex.rs
+++ b/src/sync/mutex.rs
@@ -7,7 +7,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 use crate::future::Future;
 use crate::task::{Context, Poll};
 
-use super::waker_list::WakerList;
+use super::waker_list::{WakerList, WakerListLock};
 use std::num::NonZeroUsize;
 
 /// Set if the mutex is locked.
@@ -18,7 +18,7 @@ const BLOCKED: usize = 1 << 1; struct RawMutex { state: AtomicUsize, - blocked: std::sync::Mutex, + blocked: WakerListLock, } unsafe impl Send for RawMutex {} @@ -30,7 +30,7 @@ impl RawMutex { pub fn new() -> RawMutex { RawMutex { state: AtomicUsize::new(0), - blocked: std::sync::Mutex::new(WakerList::new()), + blocked: WakerListLock::new(WakerList::new()), } } @@ -54,7 +54,7 @@ impl RawMutex { #[cold] fn unlock_slow(&self) { - let mut blocked = self.blocked.lock().unwrap(); + let mut blocked = self.blocked.lock(); blocked.wake_one_weak(); } @@ -84,7 +84,7 @@ impl<'a> Future for RawLockFuture<'a> { self.acquired = true; Poll::Ready(()) } else { - let mut blocked = self.mutex.blocked.lock().unwrap(); + let mut blocked = self.mutex.blocked.lock(); // Register the current task. match self.opt_key { @@ -124,7 +124,7 @@ impl<'a> Future for RawLockFuture<'a> { impl Drop for RawLockFuture<'_> { fn drop(&mut self) { if let Some(key) = self.opt_key { - let mut blocked = self.mutex.blocked.lock().unwrap(); + let mut blocked = self.mutex.blocked.lock(); let opt_waker = unsafe { blocked.remove(key) }; if opt_waker.is_none() && !self.acquired { diff --git a/src/sync/waker_list.rs b/src/sync/waker_list.rs index 844d567ff..8c31003c1 100644 --- a/src/sync/waker_list.rs +++ b/src/sync/waker_list.rs @@ -1,6 +1,9 @@ use crate::task::Waker; +use crossbeam_utils::Backoff; use std::num::NonZeroUsize; +use std::ops::{Deref, DerefMut}; +use std::sync::atomic::{AtomicUsize, Ordering}; struct WakerNode { /// Previous `WakerNode` in the queue. If this node is the first node, it shall point to the last node. @@ -19,6 +22,7 @@ unsafe impl Sync for WakerList {} impl WakerList { /// Create a new empty `WakerList` + #[inline] pub fn new() -> Self { Self { head: std::ptr::null_mut(), @@ -128,3 +132,67 @@ impl<'a> Iterator for Iter<'a> { Some(unsafe { &mut (*ptr).waker }) } } + +/// This is identical to a Spinlock, but is efficient in space and occupies the same +/// amount of memory as a pointer. Performance-wise it can be better than a spinlock because once +/// the lock is acquired, modification to the WakerList is on the stack, and thus is not on the +/// same cache line as the atomic variable itself, relieving some contention. +pub struct WakerListLock { + /// If this value is 1, it represents that it is locked. + /// All other values represent a valid `*mut WakerNode`. + value: AtomicUsize, +} + +impl WakerListLock { + /// Returns a new pointer lock initialized with `value`. + #[inline] + pub fn new(value: WakerList) -> Self { + Self { + value: AtomicUsize::new(value.head as usize), + } + } + + /// Locks the `WakerListLock`. 
+    pub fn lock(&self) -> WakerListLockGuard<'_> {
+        let backoff = Backoff::new();
+        loop {
+            let value = self.value.swap(1, Ordering::Acquire);
+            if value != 1 {
+                return WakerListLockGuard {
+                    parent: self,
+                    list: WakerList {
+                        head: value as *mut WakerNode,
+                    },
+                };
+            }
+            backoff.snooze();
+        }
+    }
+}
+
+pub struct WakerListLockGuard<'a> {
+    parent: &'a WakerListLock,
+    list: WakerList,
+}
+
+impl<'a> Drop for WakerListLockGuard<'a> {
+    fn drop(&mut self) {
+        self.parent
+            .value
+            .store(self.list.head as usize, Ordering::Release);
+    }
+}
+
+impl<'a> Deref for WakerListLockGuard<'a> {
+    type Target = WakerList;
+
+    fn deref(&self) -> &WakerList {
+        &self.list
+    }
+}
+
+impl<'a> DerefMut for WakerListLockGuard<'a> {
+    fn deref_mut(&mut self) -> &mut WakerList {
+        &mut self.list
+    }
+}

From 44052b24ca828cf607a881967d978adbad6ce13f Mon Sep 17 00:00:00 2001
From: Gary Guo
Date: Sun, 20 Oct 2019 00:48:33 +0100
Subject: [PATCH 06/10] Remove the `acquired` bool from RawLockFuture.

Signed-off-by: Gary Guo
---
 src/sync/mutex.rs | 50 ++++++++++++++++++++++++++++++++---------------
 1 file changed, 34 insertions(+), 16 deletions(-)

diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs
index 55f32d104..f90927b8b 100644
--- a/src/sync/mutex.rs
+++ b/src/sync/mutex.rs
@@ -42,7 +42,6 @@ impl RawMutex {
         RawLockFuture {
             mutex: self,
             opt_key: None,
-            acquired: false,
         }
     }
@@ -70,9 +69,35 @@ impl RawMutex {
 
 struct RawLockFuture<'a> {
     mutex: &'a RawMutex,
+    /// None indicates that the Future isn't yet polled, or has already returned `Ready`.
+    /// RawLockFuture does not distinguish between these two states.
     opt_key: Option<NonZeroUsize>,
-    acquired: bool,
+}
+
+impl<'a> RawLockFuture<'a> {
+    /// Remove waker registration. This should be called upon successful acquisition of the lock.
+    fn deregister_waker(&mut self, acquired: bool) {
+        if let Some(key) = self.opt_key.take() {
+            let mut blocked = self.mutex.blocked.lock();
+            let opt_waker = unsafe { blocked.remove(key) };
+
+            if opt_waker.is_none() && !acquired {
+                // We were awoken but didn't acquire the lock. Wake up another task.
+                blocked.wake_one_weak();
+            }
+
+            if blocked.is_empty() {
+                self.mutex.state.fetch_and(!BLOCKED, Ordering::Relaxed);
+            }
+        }
+    }
+
+    /// Cold path of drop. Only to be hit when locking is cancelled.
+    #[cold]
+    fn drop_slow(&mut self) {
+        self.deregister_waker(false);
+    }
 }
@@ -104,7 +129,7 @@ impl<'a> Future for RawLockFuture<'a> {
 
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         if self.mutex.try_lock() {
-            self.acquired = true;
+            self.deregister_waker(true);
             Poll::Ready(())
         } else {
             let mut blocked = self.mutex.blocked.lock();
@@ -135,8 +160,9 @@ impl<'a> Future for RawLockFuture<'a> {
             // Try locking again because it's possible the mutex got unlocked just
             // before the current task was registered as a blocked task.
             if self.mutex.try_lock() {
-                self.acquired = true;
+                std::mem::drop(blocked);
+                self.deregister_waker(true);
                 Poll::Ready(())
             } else {
                 Poll::Pending
@@ -147,19 +173,11 @@ impl<'a> Future for RawLockFuture<'a> {
 impl Drop for RawLockFuture<'_> {
+    #[inline]
     fn drop(&mut self) {
-        if let Some(key) = self.opt_key {
-            let mut blocked = self.mutex.blocked.lock();
-            let opt_waker = unsafe { blocked.remove(key) };
-
-            if opt_waker.is_none() && !self.acquired {
-                // We were awoken but didn't acquire the lock. Wake up another task.
-                blocked.wake_one_weak();
-            }
-
-            if blocked.is_empty() {
-                self.mutex.state.fetch_and(!BLOCKED, Ordering::Relaxed);
-            }
+        if self.opt_key.is_some() {
+            // This cold path is only reached when locking is cancelled, i.e. the
+            // future is dropped without the lock having been acquired.
+            self.drop_slow();
         }
     }
 }

From 7a53719fd31829e63e2814843214f1076207ce11 Mon Sep 17 00:00:00 2001
From: Gary Guo
Date: Sun, 20 Oct 2019 00:49:53 +0100
Subject: [PATCH 07/10] Unlocking the mutex only has to be Release, not AcqRel.

Signed-off-by: Gary Guo
---
 src/sync/mutex.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs
index f90927b8b..c11c5037a 100644
--- a/src/sync/mutex.rs
+++ b/src/sync/mutex.rs
@@ -60,7 +60,7 @@ impl RawMutex {
     /// Unlock this mutex.
     #[inline]
     pub fn unlock(&self) {
-        let state = self.state.fetch_and(!LOCK, Ordering::AcqRel);
+        let state = self.state.fetch_and(!LOCK, Ordering::Release);
 
         // If there are any blocked tasks, wake one of them up.
         if state & BLOCKED != 0 {

From 7855b900c9a9f6a889c32ff91ff6621bc095bd62 Mon Sep 17 00:00:00 2001
From: Gary Guo
Date: Sun, 20 Oct 2019 00:59:29 +0100
Subject: [PATCH 08/10] Retry locking the mutex before touching waker list.

Moving the try_lock code to before touching the waker list is sound,
because the waker list can only ever be accessed with `blocked` held,
so as long as we retry locking while holding `blocked`, we are okay.

This code also sets both LOCK and BLOCKED in the same atomic operation,
which improves performance by touching the atomic variable one less
time when inserting the entry.

Signed-off-by: Gary Guo
---
 src/sync/mutex.rs | 23 ++++++++++-------------
 1 file changed, 10 insertions(+), 13 deletions(-)

diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs
index c11c5037a..6b1b4a1cc 100644
--- a/src/sync/mutex.rs
+++ b/src/sync/mutex.rs
@@ -111,13 +111,18 @@ impl<'a> Future for RawLockFuture<'a> {
         } else {
             let mut blocked = self.mutex.blocked.lock();
 
+            // Try locking again because it's possible the mutex got unlocked before
+            // we acquired the lock on `blocked`.
+            let state = self.mutex.state.fetch_or(LOCK | BLOCKED, Ordering::Relaxed);
+            if state & LOCK == 0 {
+                std::mem::drop(blocked);
+                self.deregister_waker(true);
+                return Poll::Ready(())
+            }
+
             // Register the current task.
             match self.opt_key {
                 None => {
-                    if blocked.is_empty() {
-                        self.mutex.state.fetch_or(BLOCKED, Ordering::Relaxed);
-                    }
-
                     // Insert a new entry into the list of blocked tasks.
                     let w = cx.waker().clone();
                     let key = blocked.insert(Some(w));
                     self.opt_key = Some(key);
@@ -134,15 +139,7 @@ impl<'a> Future for RawLockFuture<'a> {
                 }
             }
 
-            // Try locking again because it's possible the mutex got unlocked just
-            // before the current task was registered as a blocked task.
-            if self.mutex.try_lock() {
-                std::mem::drop(blocked);
-                self.deregister_waker(true);
-                Poll::Ready(())
-            } else {
-                Poll::Pending
-            }
+            Poll::Pending
         }
     }
 }

From 851c225432bfaef6ae9b262126393e5aed6f98f6 Mon Sep 17 00:00:00 2001
From: Gary Guo
Date: Sun, 20 Oct 2019 01:05:54 +0100
Subject: [PATCH 09/10] Transpose the two checks in RawLockFuture::poll

We originally called try_lock before testing whether opt_key is None.
This commit swaps that order. Doing so removes the need to call
deregister_waker when opt_key is None, and therefore makes the hot path
(the uncontended case) faster.
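
In outline, the new poll dispatches on the registration state first:

    match self.opt_key {
        // First poll: the uncontended path is a bare try_lock, and there
        // is no waker registration to clean up on success.
        None => { ... }
        // Subsequent polls: a waker was registered, so it must be
        // deregistered once the lock is finally acquired.
        Some(key) => { ... }
    }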
Signed-off-by: Gary Guo
---
 src/sync/mutex.rs | 55 ++++++++++++++++++++++++++++++-----------------
 1 file changed, 35 insertions(+), 20 deletions(-)

diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs
index 6b1b4a1cc..7222a48c1 100644
--- a/src/sync/mutex.rs
+++ b/src/sync/mutex.rs
@@ -105,45 +105,60 @@ impl<'a> Future for RawLockFuture<'a> {
     type Output = ();
 
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-        if self.mutex.try_lock() {
-            self.deregister_waker(true);
-            Poll::Ready(())
-        } else {
-            let mut blocked = self.mutex.blocked.lock();
-
-            // Try locking again because it's possible the mutex got unlocked before
-            // we acquired the lock on `blocked`.
-            let state = self.mutex.state.fetch_or(LOCK | BLOCKED, Ordering::Relaxed);
-            if state & LOCK == 0 {
-                std::mem::drop(blocked);
-                self.deregister_waker(true);
-                return Poll::Ready(())
-            }
+        match self.opt_key {
+            None => {
+                if self.mutex.try_lock() {
+                    Poll::Ready(())
+                } else {
+                    let mut blocked = self.mutex.blocked.lock();
+
+                    // Try locking again because it's possible the mutex got unlocked before
+                    // we acquired the lock on `blocked`.
+                    let state = self.mutex.state.fetch_or(LOCK | BLOCKED, Ordering::Relaxed);
+                    if state & LOCK == 0 {
+                        return Poll::Ready(());
+                    }
 
-            // Register the current task.
-            match self.opt_key {
-                None => {
+                    // Register the current task.
                     // Insert a new entry into the list of blocked tasks.
                     let w = cx.waker().clone();
                     let key = blocked.insert(Some(w));
                     self.opt_key = Some(key);
+
+                    Poll::Pending
                 }
-                Some(key) => {
+            }
+            Some(key) => {
+                if self.mutex.try_lock() {
+                    self.deregister_waker(true);
+                    Poll::Ready(())
+                } else {
+                    let mut blocked = self.mutex.blocked.lock();
+
+                    // Try locking again because it's possible the mutex got unlocked before
+                    // we acquired the lock on `blocked`. On this path we know we have BLOCKED
+                    // set, so don't bother to set it again.
+                    if self.mutex.try_lock() {
+                        std::mem::drop(blocked);
+                        self.deregister_waker(true);
+                        return Poll::Ready(());
+                    }
+
                     // There is already an entry in the list of blocked tasks. Just
                     // reset the waker if it was removed.
                     let opt_waker = unsafe { blocked.get(key) };
                     if opt_waker.is_none() {
                         let w = cx.waker().clone();
                         *opt_waker = Some(w);
                     }
+
+                    Poll::Pending
                 }
             }
-
-            Poll::Pending
         }
     }
 }

From 43f598da6248c17ba80d923d40a175c21a6446f6 Mon Sep 17 00:00:00 2001
From: Gary Guo
Date: Sun, 20 Oct 2019 01:25:13 +0100
Subject: [PATCH 10/10] Move RawLockFuture::poll cold path to #[cold] functions

This makes RawLockFuture::poll itself small enough to be suitable for
inlining.

Signed-off-by: Gary Guo
---
 src/sync/mutex.rs | 98 ++++++++++++++++++++++++++---------------------
 1 file changed, 55 insertions(+), 43 deletions(-)

diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs
index 7222a48c1..cc370d696 100644
--- a/src/sync/mutex.rs
+++ b/src/sync/mutex.rs
@@ -78,6 +78,7 @@ struct RawLockFuture<'a> {
 
 impl<'a> RawLockFuture<'a> {
     /// Remove waker registration. This should be called upon successful acquisition of the lock.
+    #[cold]
     fn deregister_waker(&mut self, acquired: bool) {
         if let Some(key) = self.opt_key.take() {
             let mut blocked = self.mutex.blocked.lock();
@@ -94,6 +95,57 @@ impl<'a> RawLockFuture<'a> {
         }
     }
 
+    /// The cold path where the first poll of the lock future finds the mutex
+    /// contended and has to block.
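+    ///
+    /// Takes the `blocked` lock, then tries to acquire the mutex and set
+    /// `BLOCKED` in a single atomic operation; only if the mutex is still held
+    /// by someone else is the current task's waker registered.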
+    #[cold]
+    fn poll_would_block(&mut self, cx: &mut Context<'_>) -> Poll<()> {
+        let mut blocked = self.mutex.blocked.lock();
+
+        // Try locking again because it's possible the mutex got unlocked before
+        // we acquired the lock on `blocked`.
+        let state = self.mutex.state.fetch_or(LOCK | BLOCKED, Ordering::Relaxed);
+        if state & LOCK == 0 {
+            return Poll::Ready(());
+        }
+
+        // Register the current task.
+        // Insert a new entry into the list of blocked tasks.
+        let w = cx.waker().clone();
+        let key = blocked.insert(Some(w));
+        self.opt_key = Some(key);
+
+        Poll::Pending
+    }
+
+    /// The cold path where we are polling a mutex on which we are already blocked.
+    #[cold]
+    fn poll_blocked(&mut self, cx: &mut Context<'_>) -> Poll<()> {
+        if self.mutex.try_lock() {
+            self.deregister_waker(true);
+            Poll::Ready(())
+        } else {
+            let mut blocked = self.mutex.blocked.lock();
+
+            // Try locking again because it's possible the mutex got unlocked before
+            // we acquired the lock on `blocked`. On this path we know we have BLOCKED
+            // set, so don't bother to set it again.
+            if self.mutex.try_lock() {
+                std::mem::drop(blocked);
+                self.deregister_waker(true);
+                return Poll::Ready(());
+            }
+
+            // There is already an entry in the list of blocked tasks. Just
+            // reset the waker if it was removed.
+            let opt_waker = unsafe { blocked.get(self.opt_key.unwrap()) };
+            if opt_waker.is_none() {
+                let w = cx.waker().clone();
+                *opt_waker = Some(w);
+            }
+
+            Poll::Pending
+        }
+    }
+
     /// Cold path of drop. Only to be hit when locking is cancelled.
     #[cold]
     fn drop_slow(&mut self) {
@@ -104,57 +156,17 @@ impl<'a> RawLockFuture<'a> {
 impl<'a> Future for RawLockFuture<'a> {
     type Output = ();
 
+    #[inline]
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         match self.opt_key {
             None => {
                 if self.mutex.try_lock() {
                     Poll::Ready(())
                 } else {
-                    let mut blocked = self.mutex.blocked.lock();
-
-                    // Try locking again because it's possible the mutex got unlocked before
-                    // we acquired the lock on `blocked`.
-                    let state = self.mutex.state.fetch_or(LOCK | BLOCKED, Ordering::Relaxed);
-                    if state & LOCK == 0 {
-                        return Poll::Ready(());
-                    }
-
-                    // Register the current task.
-                    // Insert a new entry into the list of blocked tasks.
-                    let w = cx.waker().clone();
-                    let key = blocked.insert(Some(w));
-                    self.opt_key = Some(key);
-
-                    Poll::Pending
+                    self.poll_would_block(cx)
                 }
             }
-            Some(key) => {
-                if self.mutex.try_lock() {
-                    self.deregister_waker(true);
-                    Poll::Ready(())
-                } else {
-                    let mut blocked = self.mutex.blocked.lock();
-
-                    // Try locking again because it's possible the mutex got unlocked before
-                    // we acquired the lock on `blocked`. On this path we know we have BLOCKED
-                    // set, so don't bother to set it again.
-                    if self.mutex.try_lock() {
-                        std::mem::drop(blocked);
-                        self.deregister_waker(true);
-                        return Poll::Ready(());
-                    }
-
-                    // There is already an entry in the list of blocked tasks. Just
-                    // reset the waker if it was removed.
-                    let opt_waker = unsafe { blocked.get(key) };
-                    if opt_waker.is_none() {
-                        let w = cx.waker().clone();
-                        *opt_waker = Some(w);
-                    }
-
-                    Poll::Pending
-                }
-            }
+            Some(_) => self.poll_blocked(cx),
         }
     }
 }