From f47063b371f5f1890c69857433818666796c2381 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Sat, 21 Oct 2017 14:21:15 +0200 Subject: [PATCH 1/5] Replace Scope with Guard --- Cargo.toml | 3 +- src/atomic.rs | 275 +++++++++++-------------- src/collector.rs | 242 +++++++++++----------- src/default.rs | 21 +- src/epoch.rs | 147 +++++++++----- src/garbage.rs | 4 +- src/guard.rs | 271 +++++++++++++++++++++++++ src/internal.rs | 505 +++++++++++++++++++++++++++------------------- src/lib.rs | 16 +- src/scope.rs | 101 ---------- src/sync/list.rs | 236 ---------------------- src/sync/mod.rs | 1 - src/sync/queue.rs | 93 ++++----- 13 files changed, 990 insertions(+), 925 deletions(-) create mode 100644 src/guard.rs delete mode 100644 src/scope.rs delete mode 100644 src/sync/list.rs diff --git a/Cargo.toml b/Cargo.toml index 27e608e..d1d29d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,10 +14,9 @@ nightly = [] strict_gc = [] [dependencies] -scopeguard = "0.3" -lazy_static = "0.2" arrayvec = "0.4" crossbeam-utils = "0.1" +lazy_static = "0.2" [dev-dependencies] rand = "0.3" diff --git a/src/atomic.rs b/src/atomic.rs index dd583c2..13dab5d 100644 --- a/src/atomic.rs +++ b/src/atomic.rs @@ -7,7 +7,7 @@ use std::ops::{Deref, DerefMut}; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; -use scope::Scope; +use guard::Guard; /// Given ordering for the success case in a compare-exchange operation, returns the strongest /// appropriate ordering for the failure case. @@ -101,9 +101,9 @@ fn decompose_data(data: usize) -> (*mut T, usize) { /// least significant bits of the address. More precisely, a tag should be less than `(1 << /// mem::align_of::().trailing_zeros())`. /// -/// Any method that loads the pointer must be passed a reference to a [`Scope`]. +/// Any method that loads the pointer must be passed a reference to a [`Guard`]. 
/// -/// [`Scope`]: struct.Scope.html +/// [`Guard`]: struct.Guard.html pub struct Atomic { data: AtomicUsize, _marker: PhantomData<*mut T>, @@ -224,11 +224,10 @@ impl Atomic { /// use std::sync::atomic::Ordering::SeqCst; /// /// let a = Atomic::new(1234); - /// epoch::pin(|scope| { - /// let p = a.load(SeqCst, scope); - /// }); + /// let guard = &epoch::pin(); + /// let p = a.load(SeqCst, guard); /// ``` - pub fn load<'scope>(&self, ord: Ordering, _: &'scope Scope) -> Ptr<'scope, T> { + pub fn load<'g>(&self, ord: Ordering, _: &'g Guard) -> Ptr<'g, T> { Ptr::from_data(self.data.load(ord)) } @@ -288,11 +287,10 @@ impl Atomic { /// use std::sync::atomic::Ordering::SeqCst; /// /// let a = Atomic::new(1234); - /// epoch::pin(|scope| { - /// let p = a.swap(Ptr::null(), SeqCst, scope); - /// }); + /// let guard = &epoch::pin(); + /// let p = a.swap(Ptr::null(), SeqCst, guard); /// ``` - pub fn swap<'scope>(&self, new: Ptr, ord: Ordering, _: &'scope Scope) -> Ptr<'scope, T> { + pub fn swap<'g>(&self, new: Ptr, ord: Ordering, _: &'g Guard) -> Ptr<'g, T> { Ptr::from_data(self.data.swap(new.data, ord)) } @@ -314,27 +312,23 @@ impl Atomic { /// /// let a = Atomic::new(1234); /// - /// epoch::pin(|scope| { - /// let mut curr = a.load(SeqCst, scope); - /// let res = a.compare_and_set(curr, Ptr::null(), SeqCst, scope); - /// }); + /// let guard = &epoch::pin(); + /// let mut curr = a.load(SeqCst, guard); + /// let res = a.compare_and_set(curr, Ptr::null(), SeqCst, guard); /// ``` - pub fn compare_and_set<'scope, O>( + pub fn compare_and_set<'g, O>( &self, current: Ptr, new: Ptr, ord: O, - _: &'scope Scope, - ) -> Result<(), Ptr<'scope, T>> + _: &'g Guard, + ) -> Result<(), Ptr<'g, T>> where O: CompareAndSetOrdering, { - match self.data.compare_exchange( - current.data, - new.data, - ord.success(), - ord.failure(), - ) { + match self.data + .compare_exchange(current.data, new.data, ord.success(), ord.failure()) + { Ok(_) => Ok(()), Err(previous) => Err(Ptr::from_data(previous)), } @@ -361,32 +355,28 @@ impl Atomic { /// /// let a = Atomic::new(1234); /// - /// epoch::pin(|scope| { - /// let mut curr = a.load(SeqCst, scope); - /// loop { - /// match a.compare_and_set_weak(curr, Ptr::null(), SeqCst, scope) { - /// Ok(()) => break, - /// Err(c) => curr = c, - /// } + /// let guard = &epoch::pin(); + /// let mut curr = a.load(SeqCst, guard); + /// loop { + /// match a.compare_and_set_weak(curr, Ptr::null(), SeqCst, guard) { + /// Ok(()) => break, + /// Err(c) => curr = c, /// } - /// }); + /// } /// ``` - pub fn compare_and_set_weak<'scope, O>( + pub fn compare_and_set_weak<'g, O>( &self, current: Ptr, new: Ptr, ord: O, - _: &'scope Scope, - ) -> Result<(), Ptr<'scope, T>> + _: &'g Guard, + ) -> Result<(), Ptr<'g, T>> where O: CompareAndSetOrdering, { - match self.data.compare_exchange_weak( - current.data, - new.data, - ord.success(), - ord.failure(), - ) { + match self.data + .compare_exchange_weak(current.data, new.data, ord.success(), ord.failure()) + { Ok(_) => Ok(()), Err(previous) => Err(Ptr::from_data(previous)), } @@ -411,27 +401,23 @@ impl Atomic { /// /// let a = Atomic::new(1234); /// - /// epoch::pin(|scope| { - /// let mut curr = a.load(SeqCst, scope); - /// let res = a.compare_and_set_owned(curr, Owned::new(5678), SeqCst, scope); - /// }); + /// let guard = &epoch::pin(); + /// let mut curr = a.load(SeqCst, guard); + /// let res = a.compare_and_set_owned(curr, Owned::new(5678), SeqCst, guard); /// ``` - pub fn compare_and_set_owned<'scope, O>( + pub fn compare_and_set_owned<'g, O>( &self, 
current: Ptr, new: Owned, ord: O, - _: &'scope Scope, - ) -> Result, (Ptr<'scope, T>, Owned)> + _: &'g Guard, + ) -> Result, (Ptr<'g, T>, Owned)> where O: CompareAndSetOrdering, { - match self.data.compare_exchange( - current.data, - new.data, - ord.success(), - ord.failure(), - ) { + match self.data + .compare_exchange(current.data, new.data, ord.success(), ord.failure()) + { Ok(_) => { let data = new.data; mem::forget(new); @@ -463,39 +449,35 @@ impl Atomic { /// /// let a = Atomic::new(1234); /// - /// epoch::pin(|scope| { - /// let mut new = Owned::new(5678); - /// let mut ptr = a.load(SeqCst, scope); - /// loop { - /// match a.compare_and_set_weak_owned(ptr, new, SeqCst, scope) { - /// Ok(p) => { - /// ptr = p; - /// break; - /// } - /// Err((p, n)) => { - /// ptr = p; - /// new = n; - /// } + /// let guard = &epoch::pin(); + /// let mut new = Owned::new(5678); + /// let mut ptr = a.load(SeqCst, guard); + /// loop { + /// match a.compare_and_set_weak_owned(ptr, new, SeqCst, guard) { + /// Ok(p) => { + /// ptr = p; + /// break; + /// } + /// Err((p, n)) => { + /// ptr = p; + /// new = n; /// } /// } - /// }); + /// } /// ``` - pub fn compare_and_set_weak_owned<'scope, O>( + pub fn compare_and_set_weak_owned<'g, O>( &self, current: Ptr, new: Owned, ord: O, - _: &'scope Scope, - ) -> Result, (Ptr<'scope, T>, Owned)> + _: &'g Guard, + ) -> Result, (Ptr<'g, T>, Owned)> where O: CompareAndSetOrdering, { - match self.data.compare_exchange_weak( - current.data, - new.data, - ord.success(), - ord.failure(), - ) { + match self.data + .compare_exchange_weak(current.data, new.data, ord.success(), ord.failure()) + { Ok(_) => { let data = new.data; mem::forget(new); @@ -522,12 +504,11 @@ impl Atomic { /// use std::sync::atomic::Ordering::SeqCst; /// /// let a = Atomic::::from_ptr(Ptr::null().with_tag(3)); - /// epoch::pin(|scope| { - /// assert_eq!(a.fetch_and(2, SeqCst, scope).tag(), 3); - /// assert_eq!(a.load(SeqCst, scope).tag(), 2); - /// }); + /// let guard = &epoch::pin(); + /// assert_eq!(a.fetch_and(2, SeqCst, guard).tag(), 3); + /// assert_eq!(a.load(SeqCst, guard).tag(), 2); /// ``` - pub fn fetch_and<'scope>(&self, val: usize, ord: Ordering, _: &'scope Scope) -> Ptr<'scope, T> { + pub fn fetch_and<'g>(&self, val: usize, ord: Ordering, _: &'g Guard) -> Ptr<'g, T> { Ptr::from_data(self.data.fetch_and(val | !low_bits::(), ord)) } @@ -548,12 +529,11 @@ impl Atomic { /// use std::sync::atomic::Ordering::SeqCst; /// /// let a = Atomic::::from_ptr(Ptr::null().with_tag(1)); - /// epoch::pin(|scope| { - /// assert_eq!(a.fetch_or(2, SeqCst, scope).tag(), 1); - /// assert_eq!(a.load(SeqCst, scope).tag(), 3); - /// }); + /// let guard = &epoch::pin(); + /// assert_eq!(a.fetch_or(2, SeqCst, guard).tag(), 1); + /// assert_eq!(a.load(SeqCst, guard).tag(), 3); /// ``` - pub fn fetch_or<'scope>(&self, val: usize, ord: Ordering, _: &'scope Scope) -> Ptr<'scope, T> { + pub fn fetch_or<'g>(&self, val: usize, ord: Ordering, _: &'g Guard) -> Ptr<'g, T> { Ptr::from_data(self.data.fetch_or(val & low_bits::(), ord)) } @@ -574,12 +554,11 @@ impl Atomic { /// use std::sync::atomic::Ordering::SeqCst; /// /// let a = Atomic::::from_ptr(Ptr::null().with_tag(1)); - /// epoch::pin(|scope| { - /// assert_eq!(a.fetch_xor(3, SeqCst, scope).tag(), 1); - /// assert_eq!(a.load(SeqCst, scope).tag(), 2); - /// }); + /// let guard = &epoch::pin(); + /// assert_eq!(a.fetch_xor(3, SeqCst, guard).tag(), 1); + /// assert_eq!(a.load(SeqCst, guard).tag(), 2); /// ``` - pub fn fetch_xor<'scope>(&self, val: usize, ord: Ordering, _: 
&'scope Scope) -> Ptr<'scope, T> { + pub fn fetch_xor<'g>(&self, val: usize, ord: Ordering, _: &'g Guard) -> Ptr<'g, T> { Ptr::from_data(self.data.fetch_xor(val & low_bits::(), ord)) } } @@ -639,7 +618,7 @@ impl From> for Atomic { } } -impl<'scope, T> From> for Atomic { +impl<'g, T> From> for Atomic { fn from(ptr: Ptr) -> Self { Atomic::from_ptr(ptr) } @@ -725,13 +704,12 @@ impl Owned { /// use crossbeam_epoch::{self as epoch, Owned}; /// /// let o = Owned::new(1234); - /// epoch::pin(|scope| { - /// let p = o.into_ptr(scope); - /// }); + /// let guard = &epoch::pin(); + /// let p = o.into_ptr(guard); /// ``` /// /// [`Ptr`]: struct.Ptr.html - pub fn into_ptr<'scope>(self, _: &'scope Scope) -> Ptr<'scope, T> { + pub fn into_ptr<'g>(self, _: &'g Guard) -> Ptr<'g, T> { let data = self.data; mem::forget(self); Ptr::from_data(data) @@ -868,18 +846,18 @@ impl AsMut for Owned { /// A pointer to an object protected by the epoch GC. /// -/// The pointer is valid for use only within `'scope`. +/// The pointer is valid for use only during the lifetime `'g`. /// /// The pointer must be properly aligned. Since it is aligned, a tag can be stored into the unused /// least significant bits of the address. -pub struct Ptr<'scope, T: 'scope> { +pub struct Ptr<'g, T: 'g> { data: usize, - _marker: PhantomData<(&'scope (), *const T)>, + _marker: PhantomData<(&'g (), *const T)>, } -unsafe impl<'scope, T: Send> Send for Ptr<'scope, T> {} +unsafe impl<'g, T: Send> Send for Ptr<'g, T> {} -impl<'scope, T> Clone for Ptr<'scope, T> { +impl<'g, T> Clone for Ptr<'g, T> { fn clone(&self) -> Self { Ptr { data: self.data, @@ -888,9 +866,9 @@ impl<'scope, T> Clone for Ptr<'scope, T> { } } -impl<'scope, T> Copy for Ptr<'scope, T> {} +impl<'g, T> Copy for Ptr<'g, T> {} -impl<'scope, T> Ptr<'scope, T> { +impl<'g, T> Ptr<'g, T> { /// Returns a new pointer pointing to the tagged pointer `data`. fn from_data(data: usize) -> Self { Ptr { @@ -947,11 +925,10 @@ impl<'scope, T> Ptr<'scope, T> { /// use std::sync::atomic::Ordering::SeqCst; /// /// let a = Atomic::null(); - /// epoch::pin(|scope| { - /// assert!(a.load(SeqCst, scope).is_null()); - /// a.store_owned(Owned::new(1234), SeqCst); - /// assert!(!a.load(SeqCst, scope).is_null()); - /// }); + /// let guard = &epoch::pin(); + /// assert!(a.load(SeqCst, guard).is_null()); + /// a.store_owned(Owned::new(1234), SeqCst); + /// assert!(!a.load(SeqCst, guard).is_null()); /// ``` pub fn is_null(&self) -> bool { self.as_raw().is_null() @@ -969,10 +946,9 @@ impl<'scope, T> Ptr<'scope, T> { /// let raw = &*o as *const _; /// let a = Atomic::from_owned(o); /// - /// epoch::pin(|scope| { - /// let p = a.load(SeqCst, scope); - /// assert_eq!(p.as_raw(), raw); - /// }); + /// let guard = &epoch::pin(); + /// let p = a.load(SeqCst, guard); + /// assert_eq!(p.as_raw(), raw); /// ``` pub fn as_raw(&self) -> *const T { let (raw, _) = decompose_data::(self.data); @@ -981,7 +957,7 @@ impl<'scope, T> Ptr<'scope, T> { /// Dereferences the pointer. /// - /// Returns a reference to the pointee that is valid in `'scope`. + /// Returns a reference to the pointee that is valid during the lifetime `'g`. /// /// # Safety /// @@ -991,7 +967,7 @@ impl<'scope, T> Ptr<'scope, T> { /// For example, consider the following scenario: /// /// 1. A thread creates a new object: `a.store_owned(Owned::new(10), Relaxed)` - /// 2. Another thread reads it: `*a.load(Relaxed, scope).as_ref().unwrap()` + /// 2. 
Another thread reads it: `*a.load(Relaxed, guard).as_ref().unwrap()` /// /// The problem is that relaxed orderings don't synchronize initialization of the object with /// the read from the second thread. This is a data race. A possible solution would be to use @@ -1004,14 +980,13 @@ impl<'scope, T> Ptr<'scope, T> { /// use std::sync::atomic::Ordering::SeqCst; /// /// let a = Atomic::new(1234); - /// epoch::pin(|scope| { - /// let p = a.load(SeqCst, scope); - /// unsafe { - /// assert_eq!(p.deref(), &1234); - /// } - /// }); + /// let guard = &epoch::pin(); + /// let p = a.load(SeqCst, guard); + /// unsafe { + /// assert_eq!(p.deref(), &1234); + /// } /// ``` - pub unsafe fn deref(&self) -> &'scope T { + pub unsafe fn deref(&self) -> &'g T { &*self.as_raw() } @@ -1027,7 +1002,7 @@ impl<'scope, T> Ptr<'scope, T> { /// For example, consider the following scenario: /// /// 1. A thread creates a new object: `a.store_owned(Owned::new(10), Relaxed)` - /// 2. Another thread reads it: `*a.load(Relaxed, scope).as_ref().unwrap()` + /// 2. Another thread reads it: `*a.load(Relaxed, guard).as_ref().unwrap()` /// /// The problem is that relaxed orderings don't synchronize initialization of the object with /// the read from the second thread. This is a data race. A possible solution would be to use @@ -1040,14 +1015,13 @@ impl<'scope, T> Ptr<'scope, T> { /// use std::sync::atomic::Ordering::SeqCst; /// /// let a = Atomic::new(1234); - /// epoch::pin(|scope| { - /// let p = a.load(SeqCst, scope); - /// unsafe { - /// assert_eq!(p.as_ref(), Some(&1234)); - /// } - /// }); + /// let guard = &epoch::pin(); + /// let p = a.load(SeqCst, guard); + /// unsafe { + /// assert_eq!(p.as_ref(), Some(&1234)); + /// } /// ``` - pub unsafe fn as_ref(&self) -> Option<&'scope T> { + pub unsafe fn as_ref(&self) -> Option<&'g T> { self.as_raw().as_ref() } @@ -1066,10 +1040,9 @@ impl<'scope, T> Ptr<'scope, T> { /// /// let a = Atomic::new(1234); /// unsafe { - /// epoch::unprotected(|scope| { - /// let p = a.load(SeqCst, scope); - /// drop(p.into_owned()); - /// }); + /// let guard = &epoch::unprotected(); + /// let p = a.load(SeqCst, guard); + /// drop(p.into_owned()); /// } /// ``` pub unsafe fn into_owned(self) -> Owned { @@ -1085,10 +1058,9 @@ impl<'scope, T> Ptr<'scope, T> { /// use std::sync::atomic::Ordering::SeqCst; /// /// let a = Atomic::from_owned(Owned::new(0u64).with_tag(5)); - /// epoch::pin(|scope| { - /// let p = a.load(SeqCst, scope); - /// assert_eq!(p.tag(), 5); - /// }); + /// let guard = &epoch::pin(); + /// let p = a.load(SeqCst, guard); + /// assert_eq!(p.tag(), 5); /// ``` pub fn tag(&self) -> usize { let (_, tag) = decompose_data::(self.data); @@ -1105,41 +1077,40 @@ impl<'scope, T> Ptr<'scope, T> { /// use std::sync::atomic::Ordering::SeqCst; /// /// let a = Atomic::new(0u64); - /// epoch::pin(|scope| { - /// let p1 = a.load(SeqCst, scope); - /// let p2 = p1.with_tag(5); + /// let guard = &epoch::pin(); + /// let p1 = a.load(SeqCst, guard); + /// let p2 = p1.with_tag(5); /// - /// assert_eq!(p1.tag(), 0); - /// assert_eq!(p2.tag(), 5); - /// assert_eq!(p1.as_raw(), p2.as_raw()); - /// }); + /// assert_eq!(p1.tag(), 0); + /// assert_eq!(p2.tag(), 5); + /// assert_eq!(p1.as_raw(), p2.as_raw()); /// ``` pub fn with_tag(&self, tag: usize) -> Self { Self::from_data(data_with_tag::(self.data, tag)) } } -impl<'scope, T> PartialEq> for Ptr<'scope, T> { +impl<'g, T> PartialEq> for Ptr<'g, T> { fn eq(&self, other: &Self) -> bool { self.data == other.data } } -impl<'scope, T> Eq for Ptr<'scope, T> {} +impl<'g, 
T> Eq for Ptr<'g, T> {} -impl<'scope, T> PartialOrd> for Ptr<'scope, T> { +impl<'g, T> PartialOrd> for Ptr<'g, T> { fn partial_cmp(&self, other: &Self) -> Option { self.data.partial_cmp(&other.data) } } -impl<'scope, T> Ord for Ptr<'scope, T> { +impl<'g, T> Ord for Ptr<'g, T> { fn cmp(&self, other: &Self) -> cmp::Ordering { self.data.cmp(&other.data) } } -impl<'scope, T> fmt::Debug for Ptr<'scope, T> { +impl<'g, T> fmt::Debug for Ptr<'g, T> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let (raw, tag) = decompose_data::(self.data); @@ -1150,13 +1121,13 @@ impl<'scope, T> fmt::Debug for Ptr<'scope, T> { } } -impl<'scope, T> fmt::Pointer for Ptr<'scope, T> { +impl<'g, T> fmt::Pointer for Ptr<'g, T> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fmt::Pointer::fmt(&self.as_raw(), f) } } -impl<'scope, T> Default for Ptr<'scope, T> { +impl<'g, T> Default for Ptr<'g, T> { fn default() -> Self { Ptr::null() } diff --git a/src/collector.rs b/src/collector.rs index 873932b..53a9995 100644 --- a/src/collector.rs +++ b/src/collector.rs @@ -10,79 +10,91 @@ /// let handle = collector.handle(); /// drop(collector); // `handle` still works after dropping `collector` /// -/// handle.pin(|scope| { -/// scope.flush(); -/// }); +/// handle.pin().flush(); /// ``` use std::sync::Arc; + use internal::{Global, Local}; -use scope::Scope; +use guard::Guard; /// An epoch-based garbage collector. -pub struct Collector(Arc); - -/// A handle to a garbage collector. -pub struct Handle { +pub struct Collector { global: Arc, - local: Local, } +unsafe impl Send for Collector {} +unsafe impl Sync for Collector {} + impl Collector { /// Creates a new collector. pub fn new() -> Self { - Self { 0: Arc::new(Global::new()) } + Collector { + global: Arc::new(Global::new()), + } } /// Creates a new handle for the collector. - #[inline] pub fn handle(&self) -> Handle { - Handle::new(self.0.clone()) + Local::register(self.global.clone()) } } -impl Handle { - fn new(global: Arc) -> Self { - let local = Local::new(&global); - Self { global, local } +impl Clone for Collector { + /// Creates another reference to the same garbage collector. + fn clone(&self) -> Self { + Collector { + global: self.global.clone(), + } } +} + +/// A handle to a garbage collector. +pub struct Handle { + pub(crate) local: *const Local, +} - /// Pin the current handle. +impl Handle { + /// Pins the handle. #[inline] - pub fn pin(&self, f: F) -> R - where - F: FnOnce(&Scope) -> R, - { - unsafe { self.local.pin(&self.global, f) } + pub fn pin(&self) -> Guard { + unsafe { (*self.local).pin() } } - /// Check if the current handle is pinned. + /// Returns `true` if the handle is pinned. 
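    // A sketch of intended usage (assuming the `Collector`/`Handle` API above and the
    // `crossbeam_utils::scoped` threads used in the tests below): a `Handle` can be cloned and
    // the clone moved to another thread, where it pins and unpins independently of the original:
    //
    //     let collector = Collector::new();
    //     let handle = collector.handle();
    //     scoped::scope(|s| {
    //         let h = handle.clone();
    //         s.spawn(move || h.pin().flush());
    //     });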
#[inline] pub fn is_pinned(&self) -> bool { - self.local.is_pinned() - } -} - -impl Clone for Handle { - fn clone(&self) -> Self { - Self::new(self.global.clone()) + unsafe { (*self.local).is_pinned() } } } unsafe impl Send for Handle {} impl Drop for Handle { + #[inline] fn drop(&mut self) { - unsafe { self.local.unregister(&self.global) } + unsafe { + Local::release_handle(&*self.local); + } } } +impl Clone for Handle { + #[inline] + fn clone(&self) -> Self { + unsafe { + Local::acquire_handle(&*self.local); + } + Handle { local: self.local } + } +} #[cfg(test)] mod tests { use std::mem; use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT}; use std::sync::atomic::Ordering; + use crossbeam_utils::scoped; use {Collector, Owned}; @@ -96,13 +108,15 @@ mod tests { drop(collector); assert!(!handle.is_pinned()); - handle.pin(|_| { + { + let _guard = &handle.pin(); assert!(handle.is_pinned()); - handle.pin(|_| { + { + let _guard = &handle.pin(); assert!(handle.is_pinned()); - }); + } assert!(handle.is_pinned()); - }); + } assert!(!handle.is_pinned()); } @@ -113,16 +127,17 @@ mod tests { drop(collector); for _ in 0..100 { - handle.pin(|scope| unsafe { - let a = Owned::new(7).into_ptr(scope); - scope.defer(move || a.into_owned()); + let guard = &handle.pin(); + unsafe { + let a = Owned::new(7).into_ptr(guard); + guard.defer(move || a.into_owned()); - assert!(!(*scope.bag).is_empty()); + assert!(!(*(*guard.local).bag.get()).is_empty()); - while !(*scope.bag).is_empty() { - scope.flush(); + while !(*(*guard.local).bag.get()).is_empty() { + guard.flush(); } - }); + } } } @@ -132,13 +147,14 @@ mod tests { let handle = collector.handle(); drop(collector); - handle.pin(|scope| unsafe { + let guard = &handle.pin(); + unsafe { for _ in 0..10 { - let a = Owned::new(7).into_ptr(scope); - scope.defer(move || a.into_owned()); + let a = Owned::new(7).into_ptr(guard); + guard.defer(move || a.into_owned()); } - assert!(!(*scope.bag).is_empty()); - }); + assert!(!(*(*guard.local).bag.get()).is_empty()); + } } #[test] @@ -151,13 +167,13 @@ mod tests { scope.spawn(|| { let handle = collector.handle(); for _ in 0..500_000 { - handle.pin(|scope| { - let before = collector.0.get_epoch(); - collector.0.collect(scope); - let after = collector.0.get_epoch(); + let guard = &handle.pin(); + + let before = collector.global.epoch.load(Ordering::Relaxed); + collector.global.collect(guard); + let after = collector.global.epoch.load(Ordering::Relaxed); - assert!(after.wrapping_sub(before) <= 2); - }); + assert!(after.distance(before) <= 2); } }) }) @@ -178,16 +194,17 @@ mod tests { let collector = Collector::new(); let handle = collector.handle(); - handle.pin(|scope| unsafe { + unsafe { + let guard = &handle.pin(); for _ in 0..COUNT { - let a = Owned::new(7i32).into_ptr(scope); - scope.defer(move || { + let a = Owned::new(7i32).into_ptr(guard); + guard.defer(move || { drop(a.into_owned()); DESTROYS.fetch_add(1, Ordering::Relaxed); }); } - scope.flush(); - }); + guard.flush(); + } let mut last = 0; @@ -196,7 +213,8 @@ mod tests { assert!(curr - last <= 1024); last = curr; - handle.pin(|scope| collector.0.collect(scope)); + let guard = &handle.pin(); + collector.global.collect(guard); } assert!(DESTROYS.load(Ordering::Relaxed) == 100_000); } @@ -209,25 +227,27 @@ mod tests { let collector = Collector::new(); let handle = collector.handle(); - handle.pin(|scope| unsafe { + unsafe { + let guard = &handle.pin(); for _ in 0..COUNT { - let a = Owned::new(7i32).into_ptr(scope); - scope.defer(move || { + let a = 
Owned::new(7i32).into_ptr(guard); + guard.defer(move || { drop(a.into_owned()); DESTROYS.fetch_add(1, Ordering::Relaxed); }); } - }); + } for _ in 0..100_000 { - handle.pin(|scope| { collector.0.collect(scope); }); + collector.global.collect(&handle.pin()); } assert!(DESTROYS.load(Ordering::Relaxed) < COUNT); - handle.pin(|scope| scope.flush()); + handle.pin().flush(); while DESTROYS.load(Ordering::Relaxed) < COUNT { - handle.pin(|scope| collector.0.collect(scope)); + let guard = &handle.pin(); + collector.global.collect(guard); } assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT); } @@ -248,16 +268,19 @@ mod tests { let collector = Collector::new(); let handle = collector.handle(); - handle.pin(|scope| unsafe { + unsafe { + let guard = &handle.pin(); + for _ in 0..COUNT { - let a = Owned::new(Elem(7i32)).into_ptr(scope); - scope.defer(move || a.into_owned()); + let a = Owned::new(Elem(7i32)).into_ptr(guard); + guard.defer(move || a.into_owned()); } - scope.flush(); - }); + guard.flush(); + } while DROPS.load(Ordering::Relaxed) < COUNT { - handle.pin(|scope| collector.0.collect(scope)); + let guard = &handle.pin(); + collector.global.collect(guard); } assert_eq!(DROPS.load(Ordering::Relaxed), COUNT); } @@ -270,19 +293,22 @@ mod tests { let collector = Collector::new(); let handle = collector.handle(); - handle.pin(|scope| unsafe { + unsafe { + let guard = &handle.pin(); + for _ in 0..COUNT { - let a = Owned::new(7i32).into_ptr(scope); - scope.defer(move || { + let a = Owned::new(7i32).into_ptr(guard); + guard.defer(move || { drop(a.into_owned()); DESTROYS.fetch_add(1, Ordering::Relaxed); }); } - scope.flush(); - }); + guard.flush(); + } while DESTROYS.load(Ordering::Relaxed) < COUNT { - handle.pin(|scope| collector.0.collect(scope)); + let guard = &handle.pin(); + collector.global.collect(guard); } assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT); } @@ -303,19 +329,22 @@ mod tests { let collector = Collector::new(); let handle = collector.handle(); - handle.pin(|scope| unsafe { + unsafe { + let guard = &handle.pin(); + let mut v = Vec::with_capacity(COUNT); for i in 0..COUNT { v.push(Elem(i as i32)); } - let a = Owned::new(v).into_ptr(scope); - scope.defer(move || a.into_owned()); - scope.flush(); - }); + let a = Owned::new(v).into_ptr(guard); + guard.defer(move || a.into_owned()); + guard.flush(); + } while DROPS.load(Ordering::Relaxed) < COUNT { - handle.pin(|scope| collector.0.collect(scope)); + let guard = &handle.pin(); + collector.global.collect(guard); } assert_eq!(DROPS.load(Ordering::Relaxed), COUNT); } @@ -328,7 +357,9 @@ mod tests { let collector = Collector::new(); let handle = collector.handle(); - handle.pin(|scope| unsafe { + unsafe { + let guard = &handle.pin(); + let mut v = Vec::with_capacity(COUNT); for i in 0..COUNT { v.push(i as i32); @@ -336,17 +367,18 @@ mod tests { let ptr = v.as_mut_ptr() as usize; let len = v.len(); - scope.defer(move || { + guard.defer(move || { drop(Vec::from_raw_parts(ptr as *const u8 as *mut u8, len, len)); DESTROYS.fetch_add(len, Ordering::Relaxed); }); - scope.flush(); + guard.flush(); mem::forget(v); - }); + } while DESTROYS.load(Ordering::Relaxed) < COUNT { - handle.pin(|scope| collector.0.collect(scope)); + let guard = &handle.pin(); + collector.global.collect(guard); } assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT); } @@ -373,10 +405,11 @@ mod tests { scope.spawn(|| { let handle = collector.handle(); for _ in 0..COUNT { - handle.pin(|scope| unsafe { - let a = Owned::new(Elem(7i32)).into_ptr(scope); - scope.defer(move || 
a.into_owned()); - }); + let guard = &handle.pin(); + unsafe { + let a = Owned::new(Elem(7i32)).into_ptr(guard); + guard.defer(move || a.into_owned()); + } } }) }) @@ -389,28 +422,9 @@ mod tests { let handle = collector.handle(); while DROPS.load(Ordering::Relaxed) < COUNT * THREADS { - handle.pin(|scope| collector.0.collect(scope)); + let guard = &handle.pin(); + collector.global.collect(guard); } assert_eq!(DROPS.load(Ordering::Relaxed), COUNT * THREADS); } - - #[test] - fn send_handle() { - let ref collector = Collector::new(); - let handle_global = collector.handle(); - - let threads = (0..NUM_THREADS) - .map(|_| { - scoped::scope(|scope| { - let handle = handle_global.clone(); - scope.spawn(move || handle.pin(|scope| scope.flush())) - }) - }) - .collect::>(); - drop(collector); - - for t in threads { - t.join(); - } - } } diff --git a/src/default.rs b/src/default.rs index be2f1b0..e45e253 100644 --- a/src/default.rs +++ b/src/default.rs @@ -5,7 +5,7 @@ //! destructed on thread exit, which in turn unregisters the thread. use collector::{Collector, Handle}; -use scope::Scope; +use guard::Guard; lazy_static! { /// The global data for the default garbage collector. @@ -17,19 +17,24 @@ thread_local! { static HANDLE: Handle = COLLECTOR.handle(); } -/// Pin the current thread. -pub fn pin(f: F) -> R -where - F: FnOnce(&Scope) -> R, -{ +/// Pins the current thread. +#[inline] +pub fn pin() -> Guard { // FIXME(jeehoonkang): thread-local storage may be destructed at the time `pin()` is called. For // that case, we should use `HANDLE.try_with()` instead. - HANDLE.with(|handle| handle.pin(f)) + HANDLE.with(|handle| handle.pin()) } -/// Check if the current thread is pinned. +/// Returns `true` if the current thread is pinned. +#[inline] pub fn is_pinned() -> bool { // FIXME(jeehoonkang): thread-local storage may be destructed at the time `pin()` is called. For // that case, we should use `HANDLE.try_with()` instead. HANDLE.with(|handle| handle.is_pinned()) } + +/// Returns the default handle associated with the current thread. +#[inline] +pub fn default_handle() -> Handle { + HANDLE.with(|handle| handle.clone()) +} diff --git a/src/epoch.rs b/src/epoch.rs index 0ea2792..9100e1c 100644 --- a/src/epoch.rs +++ b/src/epoch.rs @@ -7,77 +7,116 @@ //! If an object became garbage in some epoch, then we can be sure that after two advancements no //! participant will hold a reference to it. That is the crux of safe memory reclamation. -use std::ops::Deref; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering::{Relaxed, Acquire, Release, SeqCst}; +use std::cmp; +use std::sync::atomic::{AtomicUsize, Ordering}; -use internal::LocalEpoch; -use scope::Scope; -use sync::list::{List, IterError}; -use crossbeam_utils::cache_padded::CachePadded; - -/// The global epoch is a (cache-padded) integer. -#[derive(Default, Debug)] +/// An epoch that can be marked as pinned or unpinned. +/// +/// Internally, the epoch is represented as an integer that wraps around at some unspecified point +/// and a flag that represents whether it is pinned or unpinned. +#[derive(Copy, Clone, Default, Debug, Eq, PartialEq)] pub struct Epoch { - epoch: CachePadded, + /// The least significant bit is set if pinned. The rest of the bits hold the epoch. + data: usize, } impl Epoch { - pub fn new() -> Self { + /// Returns the starting epoch in unpinned state. + #[inline] + pub fn starting() -> Self { Self::default() } - /// Attempts to advance the global epoch. 
- /// - /// The global epoch can advance only if all currently pinned participants have been pinned in - /// the current epoch. - /// - /// Returns the current global epoch. + /// Returns the number of steps between two epochs. /// - /// `try_advance()` is annotated `#[cold]` because it is rarely called. - #[cold] - pub fn try_advance(&self, registries: &List, scope: &Scope) -> usize { - let epoch = self.epoch.load(Relaxed); - ::std::sync::atomic::fence(SeqCst); + /// An epoch is internally represented as an integer that wraps around at some unspecified + /// point. The returned distance between two epochs is the minimum number of steps required to + /// go from `self` to `other` or go from `other` to `self`. + #[inline] + pub fn distance(&self, other: Self) -> usize { + let (e1, _) = self.decompose(); + let (e2, _) = other.decompose(); + cmp::min(e1.wrapping_sub(e2), e2.wrapping_sub(e1)) + } - // Traverse the linked list of participant registries. - for participant in registries.iter(scope) { - match participant { - Err(IterError::LostRace) => { - // We leave the job to the participant that won the race, which continues to - // iterate the registries and tries to advance to epoch. - return epoch; - } - Ok(local_epoch) => { - let local_epoch = local_epoch.get(); - let (participant_is_pinned, participant_epoch) = local_epoch.get_state(); + /// Returns `true` if the epoch is marked as pinned. + #[inline] + pub fn is_pinned(&self) -> bool { + self.decompose().1 + } - // If the participant was pinned in a different epoch, we cannot advance the - // global epoch just yet. - if participant_is_pinned && participant_epoch != epoch { - return epoch; - } - } - } + /// Returns the same epoch, but marked as pinned. + #[inline] + pub fn pinned(&self) -> Epoch { + Epoch { + data: self.data | 1, } - ::std::sync::atomic::fence(Acquire); + } - // All pinned participants were pinned in the current global epoch. Try advancing the - // epoch. We increment by 2 and simply wrap around on overflow. - let epoch_new = epoch.wrapping_add(2); - self.epoch.store(epoch_new, Release); - epoch_new + /// Returns the same epoch, but marked as unpinned. + #[inline] + pub fn unpinned(&self) -> Epoch { + Epoch { + data: self.data & !1, + } } -} -impl Deref for Epoch { - type Target = AtomicUsize; + /// Returns the successor epoch. + /// + /// The returned epoch will be marked as pinned only if the previous one was as well. + #[inline] + pub fn successor(&self) -> Epoch { + Epoch { + data: self.data.wrapping_add(2), + } + } - fn deref(&self) -> &Self::Target { - &self.epoch + /// Decomposes the internal data into the epoch and the pin state. + #[inline] + fn decompose(&self) -> (usize, bool) { + (self.data >> 1, (self.data & 1) == 1) } } +/// An atomic value that holds an `Epoch`. +#[derive(Default, Debug)] +pub struct AtomicEpoch { + /// Since `Epoch` is just a wrapper around `usize`, an `AtomicEpoch` is similarly represented + /// using an `AtomicUsize`. + data: AtomicUsize, +} + +impl AtomicEpoch { + /// Creates a new atomic epoch. + #[inline] + pub fn new(epoch: Epoch) -> Self { + let data = AtomicUsize::new(epoch.data); + AtomicEpoch { data } + } + + /// Loads a value from the atomic epoch. + #[inline] + pub fn load(&self, ord: Ordering) -> Epoch { + Epoch { + data: self.data.load(ord), + } + } -#[cfg(test)] -mod tests {} + /// Stores a value into the atomic epoch. 
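    // Example of the representation used by `Epoch` above (the low bit is the pin flag, the
    // remaining bits are the epoch counter): `Epoch { data: 6 }` is epoch 3 unpinned and
    // `Epoch { data: 7 }` is epoch 3 pinned; `successor` adds 2 to the raw data (one epoch
    // step), and `distance` compares only the epoch bits, so those two values are 0 steps
    // apart while raw values 2 and 6 (epochs 1 and 3) are 2 steps apart.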
+ #[inline] + pub fn store(&self, epoch: Epoch, ord: Ordering) { + self.data.store(epoch.data, ord); + } + + /// Stores a value into the atomic epoch if the current value is the same as `current`. + /// + /// The return value is always the previous value. If it is equal to `current`, then the value + /// is updated. + /// + /// The `Ordering` argument describes the memory ordering of this operation. + #[inline] + pub fn compare_and_swap(&self, current: Epoch, new: Epoch, ord: Ordering) -> Epoch { + let data = self.data.compare_and_swap(current.data, new.data, ord); + Epoch { data } + } +} diff --git a/src/garbage.rs b/src/garbage.rs index dc7ae43..abc7641 100644 --- a/src/garbage.rs +++ b/src/garbage.rs @@ -51,7 +51,9 @@ impl fmt::Debug for Garbage { impl Garbage { /// Make a closure that will later be called. pub fn new(f: F) -> Self { - Garbage { func: Deferred::new(move || f()) } + Garbage { + func: Deferred::new(move || f()), + } } } diff --git a/src/guard.rs b/src/guard.rs new file mode 100644 index 0000000..a6df48b --- /dev/null +++ b/src/guard.rs @@ -0,0 +1,271 @@ +use std::ptr; + +use garbage::Garbage; +use internal::Local; + +/// A guard that keeps the current thread pinned. +/// +/// # Pinning +/// +/// The current thread is pinned by calling [`pin`], which returns a new guard: +/// +/// ``` +/// use crossbeam_epoch as epoch; +/// +/// // It is often convenient to prefix a call to `pin` with a `&` in order to create a reference. +/// // This is not really necessary, but makes passing references to the guard a bit easier. +/// let guard = &epoch::pin(); +/// ``` +/// +/// When a guard gets dropped, the current thread is automatically unpinned. +/// +/// # Pointers on the stack +/// +/// Having a guard allows us to create pointers on the stack to heap-allocated objects. +/// For example: +/// +/// ``` +/// use crossbeam_epoch::{self as epoch, Atomic, Owned}; +/// use std::sync::atomic::Ordering::SeqCst; +/// +/// // Create a heap-allocated number. +/// let a = Atomic::new(777); +/// +/// // Pin the current thread. +/// let guard = &epoch::pin(); +/// +/// // Load the heap-allocated object and create pointer `p` on the stack. +/// let p = a.load(SeqCst, guard); +/// +/// // Dereference the pointer and print the value: +/// if let Some(num) = unsafe { p.as_ref() } { +/// println!("The number is {}.", num); +/// } +/// ``` +/// +/// # Multiple guards +/// +/// Pinning is reentrant and it is perfectly legal to create multiple guards. In that case, the +/// thread will be pinned only when the first guard is created and unpinned when the last one is +/// dropped: +/// +/// ``` +/// use crossbeam_epoch as epoch; +/// +/// let guard1 = epoch::pin(); +/// let guard2 = epoch::pin(); +/// assert!(epoch::is_pinned()); +/// drop(guard1); +/// assert!(epoch::is_pinned()); +/// drop(guard2); +/// assert!(!epoch::is_pinned()); +/// ``` +/// +/// [`pin`]: fn.pin.html +pub struct Guard { + pub(crate) local: *const Local, +} + +impl Guard { + /// Stores a function so that it can be executed at some point after all currently pinned + /// threads get unpinned. + /// + /// This method first stores `f` into the thread-local (or handle-local) cache. If this cache + /// becomes full, some functions are moved into the global cache. At the same time, some + /// functions from both local and global caches may get executed in order to incrementally + /// clean up the caches as they fill up. + /// + /// There is no guarantee when exactly `f` will be executed. 
The only guarantee is that it won't be executed
+    /// until all currently pinned threads get unpinned. In theory, `f` might never be executed,
+    /// but the epoch-based garbage collection will make an effort to execute it reasonably soon.
+    ///
+    /// If this method is called from an [`unprotected`] guard, the function will simply be
+    /// executed immediately.
+    ///
+    /// # Safety
+    ///
+    /// The given function must not hold any references onto the stack. It is highly recommended
+    /// that the passed function is **always** marked with `move` in order to prevent accidental
+    /// borrows.
+    ///
+    /// ```
+    /// use crossbeam_epoch as epoch;
+    ///
+    /// let guard = &epoch::pin();
+    /// let message = "Hello!";
+    /// unsafe {
+    ///     // ALWAYS use `move` when sending a closure into `defer`.
+    ///     guard.defer(move || {
+    ///         println!("{}", message);
+    ///     });
+    /// }
+    /// ```
+    ///
+    /// Apart from that, keep in mind that another thread may execute `f`, so anything accessed
+    /// by the closure must be `Send`.
+    ///
+    /// # Examples
+    ///
+    /// When a heap-allocated object in a data structure becomes unreachable, it has to be
+    /// deallocated. However, the current thread and other threads may still be holding references
+    /// on the stack to that same object. Therefore it cannot be deallocated before those
+    /// references get dropped. This method can defer deallocation until all those threads get
+    /// unpinned and consequently drop all their references on the stack.
+    ///
+    /// ```rust
+    /// use crossbeam_epoch::{self as epoch, Atomic, Owned};
+    /// use std::sync::atomic::Ordering::SeqCst;
+    ///
+    /// let a = Atomic::new("foo");
+    ///
+    /// // Now suppose that `a` is shared among multiple threads and concurrently
+    /// // accessed and modified...
+    ///
+    /// // Pin the current thread.
+    /// let guard = &epoch::pin();
+    ///
+    /// // Steal the object currently stored in `a` and swap it with another one.
+    /// let p = a.swap(Owned::new("bar").into_ptr(guard), SeqCst, guard);
+    ///
+    /// if !p.is_null() {
+    ///     // The object `p` is pointing to is now unreachable.
+    ///     // Defer its deallocation until all currently pinned threads get unpinned.
+    ///     unsafe {
+    ///         // ALWAYS use `move` when sending a closure into `defer`.
+    ///         guard.defer(move || {
+    ///             println!("{} is now being deallocated.", p.deref());
+    ///             // Now we have unique access to the object pointed to by `p` and can turn it
+    ///             // into an `Owned`. Dropping the `Owned` will deallocate the object.
+    ///             drop(p.into_owned());
+    ///         });
+    ///     }
+    /// }
+    /// ```
+    ///
+    /// [`unprotected`]: fn.unprotected.html
+    pub unsafe fn defer<F, R>(&self, f: F)
+    where
+        F: FnOnce() -> R + Send
+    {
+        let garbage = Garbage::new(|| drop(f()));
+
+        if let Some(local) = self.local.as_ref() {
+            local.defer(garbage, self);
+        }
+    }
+
+    /// Clears up the thread-local cache of deferred functions by executing them or moving them
+    /// into the global cache.
+    ///
+    /// Call this method after deferring execution of a function if you want to get it executed as
+    /// soon as possible. Flushing will make sure it is residing in the global cache, so that
+    /// any thread has a chance of taking the function and executing it.
+    ///
+    /// If this method is called from an [`unprotected`] guard, it is a no-op (nothing happens).
+ /// + /// # Examples + /// + /// ``` + /// use crossbeam_epoch as epoch; + /// + /// let guard = &epoch::pin(); + /// unsafe { + /// guard.defer(move || { + /// println!("This better be printed as soon as possible!"); + /// }); + /// } + /// guard.flush(); + /// ``` + /// + /// [`unprotected`]: fn.unprotected.html + pub fn flush(&self) { + if let Some(local) = unsafe { self.local.as_ref() } { + local.flush(self); + } + } +} + +impl Drop for Guard { + #[inline] + fn drop(&mut self) { + if let Some(local) = unsafe { self.local.as_ref() } { + Local::unpin(local); + } + } +} + +/// Creates a dummy guard that doesn't really pin the current thread. +/// +/// This is a function for special uses only. It creates a guard that can be used for loading +/// [`Atomic`]s, but will not pin or unpin the current thread. Calling [`defer`] with a dummy guard +/// will simply execute the function immediately. +/// +/// # Safety +/// +/// Loading and dereferencing data from an [`Atomic`] using a dummy guard is safe only if the +/// [`Atomic`] is not being concurrently modified by other threads. +/// +/// # Examples +/// +/// ``` +/// use crossbeam_epoch as epoch; +/// +/// unsafe { +/// let guard = &epoch::unprotected(); +/// guard.defer(move || { +/// println!("This gets executed immediately."); +/// }); +/// } +/// ``` +/// +/// The most common use of this function is when constructing or destructing a data structure. +/// +/// For example, we can use a dummy guard in the destructor of a Treiber stack because at that +/// point no other thread could concurrently modify the [`Atomic`]s we are accessing: +/// +/// ``` +/// use crossbeam_epoch::{self as epoch, Atomic}; +/// use std::mem::ManuallyDrop; +/// use std::sync::atomic::Ordering::Relaxed; +/// +/// struct Stack { +/// head: Atomic>, +/// } +/// +/// struct Node { +/// data: ManuallyDrop, +/// next: Atomic>, +/// } +/// +/// impl Drop for Stack { +/// fn drop(&mut self) { +/// unsafe { +/// // Create a dummy guard. +/// let guard = &epoch::unprotected(); +/// +/// let mut node = self.head.load(Relaxed, guard); +/// +/// while let Some(n) = node.as_ref() { +/// let next = n.next.load(Relaxed, guard); +/// +/// // Take ownership of the node, then drop its data and deallocate it. +/// let mut o = node.into_owned(); +/// ManuallyDrop::drop(&mut o.data); +/// drop(o); +/// +/// node = next; +/// } +/// } +/// } +/// } +/// ``` +/// +/// Really pinning the current thread would only unnecessarily delay garbage collection and incur +/// some performance cost, so in cases like these `unprotected` is of great help. +/// +/// [`Atomic`]: struct.Atomic.html +/// [`defer`]: struct.Guard.html#method.defer +#[inline] +pub unsafe fn unprotected() -> Guard { + Guard { local: ptr::null() } +} diff --git a/src/internal.rs b/src/internal.rs index 363bbf9..28a8ed5 100644 --- a/src/internal.rs +++ b/src/internal.rs @@ -13,44 +13,36 @@ //! what was the global epoch at the time it was pinned. Participants also hold a pin counter that //! aids in periodic global epoch advancement. //! -//! When a participant is pinned, a `Scope` is returned as a witness that the participant is pinned. -//! Scopes are necessary for performing atomic operations, and for freeing/dropping locations. -//! -//! # Example -//! -//! `Global` is the global data for a garbage collector, and `Local` is a participant of a garbage -//! collector. Use `Global` and `Local` when you want to embed a garbage collector in another -//! systems library, e.g. memory allocator or thread manager. -//! -//! 
```ignore -//! let global = Global::new(); -//! let local = Local::new(&global); -//! unsafe { -//! local.pin(&global, |scope| { -//! scope.flush(); -//! }); -//! } -//! ``` +//! When a participant is pinned, a `Guard` is returned as a witness that the participant is pinned. +//! Guards are necessary for performing atomic operations, and for freeing/dropping locations. use std::cell::{Cell, UnsafeCell}; -use std::cmp; -use std::sync::atomic::{AtomicUsize, Ordering}; -use scope::{Scope, unprotected}; -use garbage::Bag; -use epoch::Epoch; -use sync::list::{List, Node}; -use sync::queue::Queue; +use std::mem::{self, ManuallyDrop}; +use std::num::Wrapping; +use std::ptr; +use std::sync::Arc; +use std::sync::atomic; +use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst}; +use crossbeam_utils::cache_padded::CachePadded; + +use atomic::{Atomic, Owned}; +use collector::Handle; +use epoch::{AtomicEpoch, Epoch}; +use guard::{unprotected, Guard}; +use garbage::{Bag, Garbage}; +use sync::queue::Queue; /// The global data for a garbage collector. -#[derive(Debug)] pub struct Global { - /// The head pointer of the list of participant registries. - registries: List, - /// A reference to the global queue of garbages. - garbages: Queue<(usize, Bag)>, - /// A reference to the global epoch. - epoch: Epoch, + /// The head pointer of the list of `Local`s. + head: Atomic, + + /// The global queue of bags of deferred functions. + queue: Queue<(Epoch, Bag)>, + + /// The global epoch. + pub(crate) epoch: CachePadded, } impl Global { @@ -61,27 +53,23 @@ impl Global { #[inline] pub fn new() -> Self { Self { - registries: List::new(), - garbages: Queue::new(), - epoch: Epoch::new(), + head: Atomic::null(), + queue: Queue::new(), + epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())), } } - /// Returns the global epoch. - #[inline] - pub fn get_epoch(&self) -> usize { - self.epoch.load(Ordering::Relaxed) - } + /// Pushes the bag into the global queue and replaces the bag with a new empty bag. + pub fn push_bag(&self, bag: &mut Bag, guard: &Guard) { + let epoch = self.epoch.load(Relaxed); + let bag = mem::replace(bag, Bag::new()); - /// Pushes the bag onto the global queue and replaces the bag with a new empty bag. - pub fn push_bag(&self, bag: &mut Bag, scope: &Scope) { - let epoch = self.epoch.load(Ordering::Relaxed); - let bag = ::std::mem::replace(bag, Bag::new()); - ::std::sync::atomic::fence(Ordering::SeqCst); - self.garbages.push((epoch, bag), scope); + atomic::fence(SeqCst); + + self.queue.push((epoch, bag), guard); } - /// Collect several bags from the global garbage queue and destroy their objects. + /// Collects several bags from the global queue and executes deferred functions in them. /// /// Note: This may itself produce garbage and in turn allocate new bags. /// @@ -89,209 +77,322 @@ impl Global { /// path. In other words, we want the compiler to optimize branching for the case when /// `collect()` is not called. #[cold] - pub fn collect(&self, scope: &Scope) { - let epoch = self.epoch.try_advance(&self.registries, scope); + pub fn collect(&self, guard: &Guard) { + let global_epoch = self.try_advance(guard); - let condition = |bag: &(usize, Bag)| { + let condition = |item: &(Epoch, Bag)| { // A pinned participant can witness at most one epoch advancement. Therefore, any bag // that is within one epoch of the current one cannot be destroyed yet. 
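            // Equivalently: a bag is collectible only once the global epoch is at least two
            // steps ahead of the epoch in which the bag was pushed (`distance > 1`).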
- let diff = epoch.wrapping_sub(bag.0); - cmp::min(diff, 0usize.wrapping_sub(diff)) > 2 + global_epoch.distance(item.0) > 1 }; for _ in 0..Self::COLLECT_STEPS { - match self.garbages.try_pop_if(&condition, scope) { + match self.queue.try_pop_if(&condition, guard) { None => break, Some(bag) => drop(bag), } } } -} -// FIXME(stjepang): Registries are stored in a linked list because linked lists are fairly easy to -// implement in a lock-free manner. However, traversal is rather slow due to cache misses and data -// dependencies. We should experiment with other data structures as well. -/// Participant for garbage collection -#[derive(Debug)] -pub struct Local { - /// The local garbage objects that will be later freed. - bag: UnsafeCell, - /// This participant's entry in the local epoch list. It points to a node in `Global`, so it is - /// alive as far as the `Global` is alive. - local_epoch: *const Node, - /// Whether the participant is currently pinned. - is_pinned: Cell, - /// Total number of pinnings performed. - pin_count: Cell, -} + /// Attempts to advance the global epoch. + /// + /// The global epoch can advance only if all currently pinned participants have been pinned in + /// the current epoch. + /// + /// Returns the current global epoch. + /// + /// `try_advance()` is annotated `#[cold]` because it is rarely called. + #[cold] + pub fn try_advance(&self, guard: &Guard) -> Epoch { + let global_epoch = self.epoch.load(Relaxed); + atomic::fence(SeqCst); + + // TODO(stjepang): `Local`s are stored in a linked list because linked lists are fairly + // easy to implement in a lock-free manner. However, traversal can be slow due to cache + // misses and data dependencies. We should experiment with other data structures as well. + let mut pred = &self.head; + let mut curr = pred.load(Acquire, guard); + + while let Some(c) = unsafe { curr.as_ref() } { + let succ = c.next.load(Acquire, guard); + + if succ.tag() == 1 { + // This thread has exited. Try unlinking it from the list. + let succ = succ.with_tag(0); + + if pred.compare_and_set(curr, succ, AcqRel, guard).is_err() { + // We lost the race to unlink the thread. Usually that means we should traverse + // the list again from the beginning, but since another thread trying to + // advance the epoch has won the race, we leave the job to that one. + return global_epoch; + } + + // The unlinked entry can later be freed. + unsafe { + guard.defer(move || curr.into_owned()); + } + + // Move forward, but don't change the predecessor. + curr = succ; + } else { + let local_epoch = c.epoch.load(Relaxed); -/// An entry in the linked list of the registered participants. -#[derive(Default, Debug)] -pub struct LocalEpoch { - /// The least significant bit is set if the participant is currently pinned. The rest of the - /// bits encode the current epoch. - state: AtomicUsize, + // If the participant was pinned in a different epoch, we cannot advance the global + // epoch just yet. + if local_epoch.is_pinned() && local_epoch.unpinned() != global_epoch { + return global_epoch; + } + + // Move one step forward. + pred = &c.next; + curr = succ; + } + } + + atomic::fence(Acquire); + + // All pinned participants were pinned in the current global epoch. + // Now let's advance the global epoch... + // + // Note that if another thread already advanced it before us, this store will simply + // overwrite the global epoch with the same value. 
This is true because `try_advance` was + // called from a thread that was pinned in `global_epoch`, and the global epoch cannot be + // advanced two steps ahead of it. + let new_epoch = global_epoch.successor(); + self.epoch.store(new_epoch, Release); + new_epoch + } } -impl Local { - /// Number of pinnings after which a participant will collect some global garbage. - const PINS_BETWEEN_COLLECT: usize = 128; +impl Drop for Global { + fn drop(&mut self) { + unsafe { + let guard = &unprotected(); + let mut curr = self.head.load(Relaxed, guard); - /// Creates a participant to the garbage collection global data. - #[inline] - pub fn new(global: &Global) -> Self { - let local_epoch = unsafe { - // Since we dereference no pointers in this block, it is safe to use `unprotected`. - unprotected(|scope| { - global.registries.insert(LocalEpoch::new(), scope).as_raw() - }) - }; + while let Some(c) = curr.as_ref() { + let succ = c.next.load(Relaxed, guard); + debug_assert_eq!(succ.tag(), 1); - Self { - bag: UnsafeCell::new(Bag::new()), - local_epoch, - is_pinned: Cell::new(false), - pin_count: Cell::new(0), + let o = curr.into_owned(); + debug_assert!((*o.bag.get()).is_empty()); + drop(o); + + curr = succ; + } } } +} - /// Pins the current participant, executes a function, and unpins the participant. - /// - /// The provided function takes a `Scope`, which can be used to interact with [`Atomic`]s. The - /// scope serves as a proof that whatever data you load from an [`Atomic`] will not be - /// concurrently deleted by another participant while the scope is alive. - /// - /// Note that keeping a participant pinned for a long time prevents memory reclamation of any - /// newly deleted objects protected by [`Atomic`]s. The provided function should be very quick - - /// generally speaking, it shouldn't take more than 100 ms. - /// - /// Pinning is reentrant. There is no harm in pinning a participant while it's already pinned - /// (repinning is essentially a noop). - /// - /// Pinning itself comes with a price: it begins with a `SeqCst` fence and performs a few other - /// atomic operations. However, this mechanism is designed to be as performant as possible, so - /// it can be used pretty liberally. On a modern machine pinning takes 10 to 15 nanoseconds. +/// Participant for garbage collection. +pub struct Local { + /// A reference to the global data. /// - /// # Safety + /// When all guards and handles get dropped, this reference is destroyed. + global: UnsafeCell>>, + + /// Pointer to the next entry in the linked list of registered participants. /// - /// You should pass `global` that is used to create this `Local`. Otherwise, the behavior is - /// undefined. + /// If an entry's `next` pointer is tagged with 1, it is considered to be deleted. + next: Atomic, + + /// The local bag of deferred functions. + pub(crate) bag: UnsafeCell, + + /// The number of guards keeping this participant pinned. + guard_count: Cell, + + /// The number of active handles. + handle_count: Cell, + + /// Total number of pinnings performed. /// - /// [`Atomic`]: struct.Atomic.html - pub unsafe fn pin(&self, global: &Global, f: F) -> R - where - F: FnOnce(&Scope) -> R, - { - let local_epoch = (*self.local_epoch).get(); - let scope = Scope { - global, - bag: self.bag.get(), - }; + /// This is just an auxilliary counter that sometimes kicks off collection. + pin_count: Cell>, - let was_pinned = self.is_pinned.get(); - if !was_pinned { - // Increment the pin counter. 
- let count = self.pin_count.get(); - self.pin_count.set(count.wrapping_add(1)); + /// The local epoch. + epoch: AtomicEpoch, +} - // Pin the participant. - self.is_pinned.set(true); - let epoch = global.get_epoch(); - local_epoch.set_pinned(epoch); +unsafe impl Sync for Local {} - // If the counter progressed enough, try advancing the epoch and collecting garbage. - if count % Self::PINS_BETWEEN_COLLECT == 0 { - global.collect(&scope); - } - } +impl Local { + /// Number of pinnings after which a participant will execute some deferred functions from the + /// global queue. + const PINNINGS_BETWEEN_COLLECT: usize = 128; - // This will unpin the participant even if `f` panics. - defer! { - if !was_pinned { - // Unpin the participant. - local_epoch.set_unpinned(); - self.is_pinned.set(false); + /// Registers a new `Local` in the provided `Global`. + pub fn register(global: Arc) -> Handle { + unsafe { + // Since we dereference no pointers in this block, it is safe to use `unprotected`. + let guard = &unprotected(); + + let mut new = Owned::new(Local { + global: UnsafeCell::new(ManuallyDrop::new(global.clone())), + next: Atomic::null(), + bag: UnsafeCell::new(Bag::new()), + guard_count: Cell::new(0), + handle_count: Cell::new(1), + pin_count: Cell::new(Wrapping(0)), + epoch: AtomicEpoch::new(Epoch::starting()), + }); + let mut head = global.head.load(Acquire, guard); + + loop { + new.next.store(head, Relaxed); + + // Try installing this thread's entry as the new head. + match global + .head + .compare_and_set_weak_owned(head, new, AcqRel, guard) + { + Ok(n) => return Handle { local: n.as_raw() }, + Err((h, n)) => { + head = h; + new = n; + } + } } } + } - f(&scope) + /// Returns a reference to the `Global` in which this `Local` resides. + #[inline] + pub fn global(&self) -> &Global { + unsafe { &*self.global.get() } } /// Returns `true` if the current participant is pinned. #[inline] pub fn is_pinned(&self) -> bool { - self.is_pinned.get() + self.guard_count.get() > 0 } - /// Unregisters itself from the garbage collector. - /// - /// # Safety - /// - /// You should pass `global` that is used to create this `Local`. Also, a `Local` should be - /// unregistered once, and after it is unregistered it should not be `pin()`ned. Otherwise, the - /// behavior is undefined. - pub unsafe fn unregister(&self, global: &Global) { - // Now that the participant is exiting, we must move the local bag into the global garbage - // queue. Also, let's try advancing the epoch and help free some garbage. - - self.pin(global, |scope| { - // Spare some cycles on garbage collection. - global.collect(scope); - - // Unregister the participant by marking this entry as deleted. - (*self.local_epoch).delete(scope); - - // Push the local bag into the global garbage queue. - global.push_bag(&mut *self.bag.get(), scope); - }); + pub fn defer(&self, mut garbage: Garbage, guard: &Guard) { + let bag = unsafe { &mut *self.bag.get() }; + + while let Err(g) = bag.try_push(garbage) { + self.global().push_bag(bag, guard); + garbage = g; + } } -} -impl LocalEpoch { - /// Creates a new local epoch. - #[inline] - pub fn new() -> Self { - Self::default() + pub fn flush(&self, guard: &Guard) { + let bag = unsafe { &mut *self.bag.get() }; + + if !bag.is_empty() { + self.global().push_bag(bag, guard); + } + + self.global().collect(guard); } - /// Returns if the participant is pinned, and if so, the epoch at which it is pinned. + /// Pins the `Local`. 
#[inline] - pub fn get_state(&self) -> (bool, usize) { - let state = self.state.load(Ordering::Relaxed); - ((state & 1) == 1, state & !1) + pub fn pin(&self) -> Guard { + let guard = Guard { local: self }; + + let guard_count = self.guard_count.get(); + self.guard_count.set(guard_count.checked_add(1).unwrap()); + + if guard_count == 0 { + let global_epoch = self.global().epoch.load(Relaxed); + let new_epoch = global_epoch.pinned(); + + // Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence. + // The fence makes sure that any future loads from `Atomic`s will not happen before + // this store. + if cfg!(any(target_arch = "x86", target_arch = "x86_64")) { + // HACK(stjepang): On x86 architectures there are two different ways of executing + // a `SeqCst` fence. + // + // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction. + // 2. `_.compare_and_swap(_, _, SeqCst)`, which compiles into a `lock cmpxchg` + // instruction. + // + // Both instructions have the effect of a full barrier, but benchmarks have shown + // that the second one makes pinning faster in this particular case. + let current = Epoch::starting(); + let previous = self.epoch.compare_and_swap(current, new_epoch, SeqCst); + debug_assert_eq!( + current, + previous, + "participant was expected to be unpinned" + ); + } else { + self.epoch.store(new_epoch, Relaxed); + atomic::fence(SeqCst); + } + + // Increment the pin counter. + let count = self.pin_count.get(); + self.pin_count.set(count + Wrapping(1)); + + // After every `PINNINGS_BETWEEN_COLLECT` try advancing the epoch and collecting + // some garbage. + if count.0 % Self::PINNINGS_BETWEEN_COLLECT == 0 { + self.global().collect(&guard); + } + } + + guard } - /// Marks the participant as pinned. - /// - /// Must not be called if the participant is already pinned! + /// Unpins the `Local`. #[inline] - pub fn set_pinned(&self, epoch: usize) { - let state = epoch | 1; - - // Now we must store `state` into `self.state`. It's important that any succeeding loads - // don't get reordered with this store. In order words, this participant's epoch must be - // fully announced to other participants. Only then it becomes safe to load from the shared - // memory. - if cfg!(any(target_arch = "x86", target_arch = "x86_64")) { - // On x86 architectures we have a choice: - // 1. `atomic::fence(SeqCst)`, which compiles to a `mfence` instruction. - // 2. `compare_and_swap(_, _, SeqCst)`, which compiles to a `lock cmpxchg` instruction. - // - // Both instructions have the effect of a full barrier, but the second one seems to be - // faster in this particular case. - let result = self.state.compare_and_swap(0, state, Ordering::SeqCst); - debug_assert_eq!(0, result, "LocalEpoch::set_pinned()'s CAS should succeed."); - } else { - self.state.store(state, Ordering::Relaxed); - ::std::sync::atomic::fence(Ordering::SeqCst); + pub fn unpin(&self) { + let guard_count = self.guard_count.get(); + self.guard_count.set(guard_count - 1); + + if guard_count == 1 { + self.epoch.store(Epoch::starting(), Release); + + if self.handle_count.get() == 0 { + Self::finalize(self); + } } } - /// Marks the participant as unpinned. + /// Increments the handle count. #[inline] - pub fn set_unpinned(&self) { - // Clear the last bit. - // We don't need to preserve the epoch, so just store the number zero. 
- self.state.store(0, Ordering::Release); + pub fn acquire_handle(&self) { + let handle_count = self.handle_count.get(); + debug_assert!(handle_count >= 1); + self.handle_count.set(handle_count + 1); + } + + /// Decrements the handle count. + #[inline] + pub fn release_handle(&self) { + let guard_count = self.guard_count.get(); + let handle_count = self.handle_count.get(); + debug_assert!(handle_count >= 1); + self.handle_count.set(handle_count - 1); + + if guard_count == 0 && handle_count == 1 { + Self::finalize(self); + } + } + + /// Removes the `Local` from the global linked list. + #[cold] + pub fn finalize(&self) { + assert_eq!(self.guard_count.get(), 0); + assert_eq!(self.handle_count.get(), 0); + + unsafe { + // Take the reference to the `Global` out of this `Local`. + let global: Arc = ptr::read(&**self.global.get()); + + // Move the local bag into the global queue. + self.global().push_bag(&mut *self.bag.get(), &unprotected()); + // Mark this node in the linked list as deleted. + self.next.fetch_or(1, SeqCst, &unprotected()); + + // Finally, drop the reference to the global. + // Note that this might be the last reference to the `Global`. If so, the global data + // will be destroyed and all deferred functions in its queue will be executed. + drop(global); + } } } diff --git a/src/lib.rs b/src/lib.rs index d065a1d..51f58f9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,24 +55,22 @@ #![cfg_attr(feature = "nightly", feature(const_fn))] -#[macro_use(defer)] -extern crate scopeguard; -#[macro_use] -extern crate lazy_static; extern crate arrayvec; extern crate crossbeam_utils; +#[macro_use] +extern crate lazy_static; mod atomic; +mod collector; +mod default; mod deferred; mod epoch; mod garbage; +mod guard; mod internal; -mod collector; -mod scope; -mod default; mod sync; pub use self::atomic::{Atomic, CompareAndSetOrdering, Owned, Ptr}; -pub use self::scope::{Scope, unprotected}; -pub use self::default::{pin, is_pinned}; +pub use self::guard::{unprotected, Guard}; +pub use self::default::{default_handle, is_pinned, pin}; pub use self::collector::{Collector, Handle}; diff --git a/src/scope.rs b/src/scope.rs deleted file mode 100644 index 6e76b45..0000000 --- a/src/scope.rs +++ /dev/null @@ -1,101 +0,0 @@ -use std::mem; -use std::ptr; - -use garbage::{Garbage, Bag}; -use internal::Global; - -/// A witness that the current participant is pinned. -/// -/// A `&Scope` is a witness that the current participant is pinned. Lots of methods that interact -/// with [`Atomic`]s can safely be called only while the participant is pinned so they often require -/// a `&Scope`. -/// -/// This data type is inherently bound to the thread that created it, therefore it does not -/// implement `Send` nor `Sync`. -/// -/// [`Atomic`]: struct.Atomic.html -#[derive(Debug)] -pub struct Scope { - /// A reference to the global data. - pub(crate) global: *const Global, - /// A reference to the thread-local bag. - pub(crate) bag: *mut Bag, // !Send + !Sync -} - -impl Scope { - unsafe fn defer_garbage(&self, mut garbage: Garbage) { - self.global.as_ref().map(|global| { - let bag = &mut *self.bag; - while let Err(g) = bag.try_push(garbage) { - global.push_bag(bag, self); - garbage = g; - } - }); - } - - /// Deferred execution of an arbitrary function `f`. - /// - /// This function inserts the function into a thread-local [`Bag`]. When the bag becomes full, - /// the bag is flushed into the globally shared queue of bags. 
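
Assuming `Handle` and `Guard` decrement `handle_count` and `guard_count` in their `Drop` impls, which is what the checks above are written for, the finalization order is observable from user code roughly as follows:

```
use crossbeam_epoch::Collector;

let collector = Collector::new();
let handle = collector.handle();   // handle_count starts at 1

let guard = handle.pin();          // guard_count = 1
drop(handle);                      // handle_count = 0, but the guard keeps the `Local` alive
drop(guard);                       // guard_count = 0 -> `finalize` unlinks the `Local`
```
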
- /// - /// If this function is destroying a particularly large object, it is wise to follow up with a - /// call to [`flush`] so that it doesn't get stuck waiting in the thread-local bag for a long - /// time. - /// - /// [`Bag`]: struct.Bag.html - /// [`flush`]: fn.flush.html - pub unsafe fn defer R + Send>(&self, f: F) { - self.defer_garbage(Garbage::new(|| drop(f()))) - } - - /// Flushes all garbage in the thread-local storage into the global garbage queue, attempts to - /// advance the epoch, and collects some garbage. - /// - /// Even though flushing can be explicitly called, it is also automatically triggered when the - /// thread-local storage fills up or when we pin the current participant a specific number of - /// times. - /// - /// It is wise to flush the bag just after `defer`ring the deallocation of a very large object, - /// so that it isn't sitting in the thread-local bag for a long time. - /// - /// [`defer`]: fn.defer.html - pub fn flush(&self) { - unsafe { - self.global.as_ref().map(|global| { - let bag = &mut *self.bag; - - if !bag.is_empty() { - global.push_bag(bag, &self); - } - - global.collect(&self); - }); - } - } -} - -/// Returns a [`Scope`] without pinning any participant. -/// -/// Sometimes, we'd like to have longer-lived scopes in which we know our thread can access atomics -/// without protection. This is true e.g. when deallocating a big data structure, or when -/// constructing it from a long iterator. In such cases we don't need to be overprotective because -/// there is no fear of other threads concurrently destroying objects. -/// -/// Function `unprotected` is *unsafe* because we must promise that (1) the locations that we access -/// should not be deallocated by concurrent participants, and (2) the locations that we deallocate -/// should not be accessed by concurrent participants. -/// -/// Just like with the safe epoch::pin function, unprotected use of atomics is enclosed within a -/// scope so that pointers created within it don't leak out or get mixed with pointers from other -/// scopes. -#[inline] -pub unsafe fn unprotected(f: F) -> R -where - F: FnOnce(&Scope) -> R, -{ - let scope = Scope { - global: ptr::null(), - bag: mem::uninitialized(), - }; - f(&scope) -} diff --git a/src/sync/list.rs b/src/sync/list.rs deleted file mode 100644 index 568cf96..0000000 --- a/src/sync/list.rs +++ /dev/null @@ -1,236 +0,0 @@ -//! Michael's lock-free linked list. -//! -//! Michael. High Performance Dynamic Lock-Free Hash Tables and List-Based Sets. SPAA 2002. -//! http://dl.acm.org/citation.cfm?id=564870.564881 - -use std::sync::atomic::Ordering::{Acquire, Relaxed, Release}; - -use {Atomic, Owned, Ptr, Scope, unprotected}; -use crossbeam_utils::cache_padded::CachePadded; - - -/// An entry in the linked list. -struct NodeInner { - /// The data in the entry. - data: T, - - /// The next entry in the linked list. - /// If the tag is 1, this entry is marked as deleted. - next: Atomic>, -} - -unsafe impl Send for NodeInner {} - -#[derive(Debug)] -pub struct Node(CachePadded>); - -#[derive(Debug)] -pub struct List { - head: Atomic>, -} - -pub struct Iter<'scope, T: 'scope> { - /// The scope in which the iterator is operating. - scope: &'scope Scope, - - /// Pointer from the predecessor to the current entry. - pred: &'scope Atomic>, - - /// The current entry. - curr: Ptr<'scope, Node>, -} - -#[derive(PartialEq, Debug)] -pub enum IterError { - /// Iterator lost a race in deleting a node by a concurrent iterator. 
- LostRace, -} - -impl Node { - /// Returns the data in this entry. - fn new(data: T) -> Self { - Node(CachePadded::new(NodeInner { - data, - next: Atomic::null(), - })) - } - - pub fn get(&self) -> &T { - &self.0.data - } -} - -impl Node { - /// Marks this entry as deleted. - pub fn delete(&self, scope: &Scope) { - self.0.next.fetch_or(1, Release, scope); - } -} - -impl List { - /// Returns a new, empty linked list. - pub fn new() -> Self { - List { head: Atomic::null() } - } - - /// Inserts `data` into the list. - #[inline] - fn insert_internal<'scope>( - to: &'scope Atomic>, - data: T, - scope: &'scope Scope, - ) -> Ptr<'scope, Node> { - let mut cur = Owned::new(Node::new(data)); - let mut next = to.load(Relaxed, scope); - - loop { - cur.0.next.store(next, Relaxed); - match to.compare_and_set_weak_owned(next, cur, Release, scope) { - Ok(cur) => return cur, - Err((n, c)) => { - next = n; - cur = c; - } - } - } - } - - /// Inserts `data` into the head of the list. - #[inline] - pub fn insert<'scope>(&'scope self, data: T, scope: &'scope Scope) -> Ptr<'scope, Node> { - Self::insert_internal(&self.head, data, scope) - } - - /// Inserts `data` after `after` into the list. - #[inline] - #[allow(dead_code)] - pub fn insert_after<'scope>( - &'scope self, - after: &'scope Atomic>, - data: T, - scope: &'scope Scope, - ) -> Ptr<'scope, Node> { - Self::insert_internal(after, data, scope) - } - - /// Returns an iterator over all data. - /// - /// # Caveat - /// - /// Every datum that is inserted at the moment this function is called and persists at least - /// until the end of iteration will be returned. Since this iterator traverses a lock-free - /// linked list that may be concurrently modified, some additional caveats apply: - /// - /// 1. If a new datum is inserted during iteration, it may or may not be returned. - /// 2. If a datum is deleted during iteration, it may or may not be returned. - /// 3. It may not return all data if a concurrent thread continues to iterate the same list. - pub fn iter<'scope>(&'scope self, scope: &'scope Scope) -> Iter<'scope, T> { - let pred = &self.head; - let curr = pred.load(Acquire, scope); - Iter { scope, pred, curr } - } -} - -impl Drop for List { - fn drop(&mut self) { - unsafe { - unprotected(|scope| { - let mut curr = self.head.load(Relaxed, scope); - while let Some(c) = curr.as_ref() { - let succ = c.0.next.load(Relaxed, scope); - drop(curr.into_owned()); - curr = succ; - } - }); - } - } -} - -impl<'scope, T> Iterator for Iter<'scope, T> { - type Item = Result<&'scope Node, IterError>; - - fn next(&mut self) -> Option { - while let Some(c) = unsafe { self.curr.as_ref() } { - let succ = c.0.next.load(Acquire, self.scope); - - if succ.tag() == 1 { - // This entry was removed. Try unlinking it from the list. - let succ = succ.with_tag(0); - - match self.pred.compare_and_set_weak( - self.curr, - succ, - Acquire, - self.scope, - ) { - Ok(_) => { - unsafe { - // Deferred drop of `T` is scheduled here. - // This is okay because `.delete()` can be called only if `T: 'static`. - let p = self.curr; - self.scope.defer(move || p.into_owned()); - } - self.curr = succ; - } - Err(succ) => { - // We lost the race to delete the entry by a concurrent iterator. Set - // `self.curr` to the updated pointer, and report the lost. - self.curr = succ; - return Some(Err(IterError::LostRace)); - } - } - - continue; - } - - // Move one step forward. - self.pred = &c.0.next; - self.curr = succ; - - return Some(Ok(&c)); - } - - // We reached the end of the list. 
- None - } -} - -impl Default for List { - fn default() -> Self { - Self::new() - } -} - -#[cfg(test)] -mod tests { - use Collector; - use super::*; - - #[test] - fn insert_iter_delete_iter() { - let l: List = List::new(); - - let collector = Collector::new(); - let handle = collector.handle(); - - handle.pin(|scope| { - let p2 = l.insert(2, scope); - let n2 = unsafe { p2.as_ref().unwrap() }; - let _p3 = l.insert_after(&n2.0.next, 3, scope); - let _p1 = l.insert(1, scope); - - let mut iter = l.iter(scope); - assert!(iter.next().is_some()); - assert!(iter.next().is_some()); - assert!(iter.next().is_some()); - assert!(iter.next().is_none()); - - n2.delete(scope); - - let mut iter = l.iter(scope); - assert!(iter.next().is_some()); - assert!(iter.next().is_some()); - assert!(iter.next().is_none()); - }); - } -} diff --git a/src/sync/mod.rs b/src/sync/mod.rs index f8eb259..e96327b 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1,4 +1,3 @@ //! Synchronization primitives. -pub mod list; pub mod queue; diff --git a/src/sync/queue.rs b/src/sync/queue.rs index e720078..25ae8d3 100644 --- a/src/sync/queue.rs +++ b/src/sync/queue.rs @@ -7,11 +7,12 @@ use std::mem::{self, ManuallyDrop}; use std::ptr; -use std::sync::atomic::Ordering::{Relaxed, Acquire, Release}; +use std::sync::atomic::Ordering::{Acquire, Relaxed, Release}; -use {Atomic, Owned, Ptr, Scope, unprotected}; use crossbeam_utils::cache_padded::CachePadded; +use {unprotected, Atomic, Guard, Owned, Ptr}; + // The representation here is a singly-linked list, with a sentinel node at the front. In general // the `tail` pointer may lag behind the actual tail. Non-sentinel nodes are either all `Data` or // all `Blocked` (requests for data from blocked threads). @@ -53,53 +54,52 @@ impl Queue { next: Atomic::null(), }); unsafe { - unprotected(|scope| { - let sentinel = sentinel.into_ptr(scope); - q.head.store(sentinel, Relaxed); - q.tail.store(sentinel, Relaxed); - q - }) + let guard = &unprotected(); + let sentinel = sentinel.into_ptr(guard); + q.head.store(sentinel, Relaxed); + q.tail.store(sentinel, Relaxed); + q } } /// Attempts to atomically place `n` into the `next` pointer of `onto`, and returns `true` on /// success. The queue's `tail` pointer may be updated. #[inline(always)] - fn push_internal(&self, onto: Ptr>, new: Ptr>, scope: &Scope) -> bool { + fn push_internal(&self, onto: Ptr>, new: Ptr>, guard: &Guard) -> bool { // is `onto` the actual tail? let o = unsafe { onto.deref() }; - let next = o.next.load(Acquire, scope); + let next = o.next.load(Acquire, guard); if unsafe { next.as_ref().is_some() } { // if not, try to "help" by moving the tail pointer forward - let _ = self.tail.compare_and_set(onto, next, Release, scope); + let _ = self.tail.compare_and_set(onto, next, Release, guard); false } else { // looks like the actual tail; attempt to link in `n` let result = o.next - .compare_and_set(Ptr::null(), new, Release, scope) + .compare_and_set(Ptr::null(), new, Release, guard) .is_ok(); if result { // try to move the tail pointer forward - let _ = self.tail.compare_and_set(onto, new, Release, scope); + let _ = self.tail.compare_and_set(onto, new, Release, guard); } result } } /// Adds `t` to the back of the queue, possibly waking up threads blocked on `pop`. 
- pub fn push(&self, t: T, scope: &Scope) { + pub fn push(&self, t: T, guard: &Guard) { let new = Owned::new(Node { data: ManuallyDrop::new(t), next: Atomic::null(), }); - let new = Owned::into_ptr(new, scope); + let new = Owned::into_ptr(new, guard); loop { // We push onto the tail, so we'll start optimistically by looking there first. - let tail = self.tail.load(Acquire, scope); + let tail = self.tail.load(Acquire, guard); // Attempt to push onto the `tail` snapshot; fails if `tail.next` has changed. - if self.push_internal(tail, new, scope) { + if self.push_internal(tail, new, guard) { break; } } @@ -107,16 +107,16 @@ impl Queue { /// Attempts to pop a data node. `Ok(None)` if queue is empty; `Err(())` if lost race to pop. #[inline(always)] - fn pop_internal(&self, scope: &Scope) -> Result, ()> { - let head = self.head.load(Acquire, scope); + fn pop_internal(&self, guard: &Guard) -> Result, ()> { + let head = self.head.load(Acquire, guard); let h = unsafe { head.deref() }; - let next = h.next.load(Acquire, scope); + let next = h.next.load(Acquire, guard); match unsafe { next.as_ref() } { Some(n) => unsafe { self.head - .compare_and_set(head, next, Release, scope) + .compare_and_set(head, next, Release, guard) .map(|_| { - scope.defer(move || drop(head.into_owned())); + guard.defer(move || drop(head.into_owned())); Some(ManuallyDrop::into_inner(ptr::read(&n.data))) }) .map_err(|_| ()) @@ -128,20 +128,20 @@ impl Queue { /// Attempts to pop a data node, if the data satisfies the given condition. `Ok(None)` if queue /// is empty or the data does not satisfy the condition; `Err(())` if lost race to pop. #[inline(always)] - fn pop_if_internal(&self, condition: F, scope: &Scope) -> Result, ()> + fn pop_if_internal(&self, condition: F, guard: &Guard) -> Result, ()> where T: Sync, F: Fn(&T) -> bool, { - let head = self.head.load(Acquire, scope); + let head = self.head.load(Acquire, guard); let h = unsafe { head.deref() }; - let next = h.next.load(Acquire, scope); + let next = h.next.load(Acquire, guard); match unsafe { next.as_ref() } { Some(n) if condition(&n.data) => unsafe { self.head - .compare_and_set(head, next, Release, scope) + .compare_and_set(head, next, Release, guard) .map(|_| { - scope.defer(move || drop(head.into_owned())); + guard.defer(move || drop(head.into_owned())); Some(ManuallyDrop::into_inner(ptr::read(&n.data))) }) .map_err(|_| ()) @@ -153,9 +153,9 @@ impl Queue { /// Attempts to dequeue from the front. /// /// Returns `None` if the queue is observed to be empty. - pub fn try_pop(&self, scope: &Scope) -> Option { + pub fn try_pop(&self, guard: &Guard) -> Option { loop { - if let Ok(head) = self.pop_internal(scope) { + if let Ok(head) = self.pop_internal(guard) { return head; } } @@ -165,13 +165,13 @@ impl Queue { /// /// Returns `None` if the queue is observed to be empty, or the head does not satisfy the given /// condition. - pub fn try_pop_if(&self, condition: F, scope: &Scope) -> Option + pub fn try_pop_if(&self, condition: F, guard: &Guard) -> Option where T: Sync, F: Fn(&T) -> bool, { loop { - if let Ok(head) = self.pop_if_internal(&condition, scope) { + if let Ok(head) = self.pop_if_internal(&condition, guard) { return head; } } @@ -181,13 +181,13 @@ impl Queue { impl Drop for Queue { fn drop(&mut self) { unsafe { - unprotected(|scope| { - while let Some(_) = self.try_pop(scope) {} + let guard = &unprotected(); - // Destroy the remaining sentinel node. 
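
As a sketch of how `try_pop_if` above is meant to be used, a hypothetical helper in the same test module as the wrapper could look like this (`pin` is assumed to be in scope, as in the surrounding tests):

```
// Pop a value only when it satisfies a predicate, pinning for the duration of
// the call. `try_pop_if` returns `None` both for an empty queue and for a head
// that fails the predicate.
fn try_pop_even(q: &super::Queue<u64>) -> Option<u64> {
    let guard = &pin();
    q.try_pop_if(|&x| x % 2 == 0, guard)
}
```
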
- let sentinel = self.head.load(Relaxed, scope); - drop(sentinel.into_owned()); - }) + while let Some(_) = self.try_pop(guard) {} + + // Destroy the remaining sentinel node. + let sentinel = self.head.load(Relaxed, guard); + drop(sentinel.into_owned()); } } } @@ -205,23 +205,26 @@ mod test { impl Queue { pub fn new() -> Queue { - Queue { queue: super::Queue::new() } + Queue { + queue: super::Queue::new(), + } } pub fn push(&self, t: T) { - pin(|scope| self.queue.push(t, scope)) + let guard = &pin(); + self.queue.push(t, guard); } pub fn is_empty(&self) -> bool { - pin(|scope| { - let head = self.queue.head.load(Acquire, scope); - let h = unsafe { head.deref() }; - h.next.load(Acquire, scope).is_null() - }) + let guard = &pin(); + let head = self.queue.head.load(Acquire, guard); + let h = unsafe { head.deref() }; + h.next.load(Acquire, guard).is_null() } pub fn try_pop(&self) -> Option { - pin(|scope| self.queue.try_pop(scope)) + let guard = &pin(); + self.queue.try_pop(guard) } pub fn pop(&self) -> T { From 0b84d0c59317b767f36d4418b5655984c7bf3993 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Sun, 5 Nov 2017 15:01:16 +0100 Subject: [PATCH 2/5] Change the signature of unprotected() --- src/guard.rs | 76 +++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 55 insertions(+), 21 deletions(-) diff --git a/src/guard.rs b/src/guard.rs index a6df48b..33aa09d 100644 --- a/src/guard.rs +++ b/src/guard.rs @@ -46,8 +46,8 @@ use internal::Local; /// # Multiple guards /// /// Pinning is reentrant and it is perfectly legal to create multiple guards. In that case, the -/// thread will be pinned only when the first guard is created and unpinned when the last one is -/// dropped: +/// thread will actually be pinned only when the first guard is created and unpinned when the last +/// one is dropped: /// /// ``` /// use crossbeam_epoch as epoch; @@ -61,6 +61,15 @@ use internal::Local; /// assert!(!epoch::is_pinned()); /// ``` /// +/// The same can be achieved by cloning guards: +/// +/// ``` +/// use crossbeam_epoch as epoch; +/// +/// let guard1 = epoch::pin(); +/// let guard2 = guard1.clone(); +/// ``` +/// /// [`pin`]: fn.pin.html pub struct Guard { pub(crate) local: *const Local, @@ -194,34 +203,62 @@ impl Drop for Guard { } } -/// Creates a dummy guard that doesn't really pin the current thread. +impl Clone for Guard { + #[inline] + fn clone(&self) -> Guard { + match unsafe { self.local.as_ref() } { + None => Guard { local: ptr::null() }, + Some(local) => local.pin(), + } + } +} + +/// Returns a reference to a dummy guard that allows unprotected access to [`Atomic`]s. /// -/// This is a function for special uses only. It creates a guard that can be used for loading -/// [`Atomic`]s, but will not pin or unpin the current thread. Calling [`defer`] with a dummy guard -/// will simply execute the function immediately. +/// This guard should be used in special occasions only. Note that it doesn't actually keep any +/// thread pinned - it's just a fake guard that allows loading from [`Atomic`]s unsafely. +/// +/// Note that calling [`defer`] with a dummy guard will not defer the function - it will just +/// execute the function immediately. +/// +/// If necessary, it's possible to create more dummy guards by cloning: `unprotected().clone()`. 
/// /// # Safety /// -/// Loading and dereferencing data from an [`Atomic`] using a dummy guard is safe only if the +/// Loading and dereferencing data from an [`Atomic`] using this guard is safe only if the /// [`Atomic`] is not being concurrently modified by other threads. /// /// # Examples /// /// ``` -/// use crossbeam_epoch as epoch; +/// use crossbeam_epoch::{self as epoch, Atomic}; +/// use std::sync::atomic::Ordering::Relaxed; +/// +/// let a = Atomic::new(7); /// /// unsafe { -/// let guard = &epoch::unprotected(); -/// guard.defer(move || { +/// // Load `a` without pinning the current thread. +/// a.load(Relaxed, epoch::unprotected()); +/// +/// // It's possible to create more dummy guards by calling `clone()`. +/// let dummy = &epoch::unprotected().clone(); +/// +/// dummy.defer(move || { /// println!("This gets executed immediately."); /// }); +/// +/// // Dropping `dummy` doesn't affect the current thread - it's just a noop. /// } /// ``` /// /// The most common use of this function is when constructing or destructing a data structure. /// /// For example, we can use a dummy guard in the destructor of a Treiber stack because at that -/// point no other thread could concurrently modify the [`Atomic`]s we are accessing: +/// point no other thread could concurrently modify the [`Atomic`]s we are accessing. +/// +/// If we were to actually pin the current thread during destruction, that would just unnecessarily +/// delay garbage collection and incur some performance cost, so in cases like these `unprotected` +/// is very helpful. /// /// ``` /// use crossbeam_epoch::{self as epoch, Atomic}; @@ -240,13 +277,12 @@ impl Drop for Guard { /// impl Drop for Stack { /// fn drop(&mut self) { /// unsafe { -/// // Create a dummy guard. -/// let guard = &epoch::unprotected(); -/// -/// let mut node = self.head.load(Relaxed, guard); +/// // Unprotected load. +/// let mut node = self.head.load(Relaxed, epoch::unprotected()); /// /// while let Some(n) = node.as_ref() { -/// let next = n.next.load(Relaxed, guard); +/// // Unprotected load. +/// let next = n.next.load(Relaxed, epoch::unprotected()); /// /// // Take ownership of the node, then drop its data and deallocate it. /// let mut o = node.into_owned(); @@ -260,12 +296,10 @@ impl Drop for Guard { /// } /// ``` /// -/// Really pinning the current thread would only unnecessarily delay garbage collection and incur -/// some performance cost, so in cases like these `unprotected` is of great help. 
-/// /// [`Atomic`]: struct.Atomic.html /// [`defer`]: struct.Guard.html#method.defer #[inline] -pub unsafe fn unprotected() -> Guard { - Guard { local: ptr::null() } +pub unsafe fn unprotected() -> &'static Guard { + static UNPROTECTED: usize = 0; + &*(&UNPROTECTED as *const _ as *const Guard) } From e3d3bf44dc68806f5a56fd0e4be8314b6ce1f0ad Mon Sep 17 00:00:00 2001 From: Jeehoon Kang Date: Tue, 7 Nov 2017 10:52:39 +0900 Subject: [PATCH 3/5] Refactor intrusive linked list from src/internal.rs --- Cargo.toml | 1 + src/internal.rs | 157 +++++++++--------------- src/lib.rs | 2 + src/sync/list.rs | 314 +++++++++++++++++++++++++++++++++++++++++++++++ src/sync/mod.rs | 1 + 5 files changed, 378 insertions(+), 97 deletions(-) create mode 100644 src/sync/list.rs diff --git a/Cargo.toml b/Cargo.toml index d1d29d5..b8011d2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ strict_gc = [] arrayvec = "0.4" crossbeam-utils = "0.1" lazy_static = "0.2" +memoffset = "0.1.0" [dev-dependencies] rand = "0.3" diff --git a/src/internal.rs b/src/internal.rs index 28a8ed5..67f3551 100644 --- a/src/internal.rs +++ b/src/internal.rs @@ -22,21 +22,22 @@ use std::num::Wrapping; use std::ptr; use std::sync::Arc; use std::sync::atomic; -use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst}; +use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst}; use crossbeam_utils::cache_padded::CachePadded; -use atomic::{Atomic, Owned}; +use atomic::Owned; use collector::Handle; use epoch::{AtomicEpoch, Epoch}; use guard::{unprotected, Guard}; use garbage::{Bag, Garbage}; +use sync::list::{List, Entry, IterError, Container}; use sync::queue::Queue; /// The global data for a garbage collector. pub struct Global { - /// The head pointer of the list of `Local`s. - head: Atomic, + /// The intrusive linked list of `Local`s. + locals: List, /// The global queue of bags of deferred functions. queue: Queue<(Epoch, Bag)>, @@ -53,7 +54,7 @@ impl Global { #[inline] pub fn new() -> Self { Self { - head: Atomic::null(), + locals: List::new(), queue: Queue::new(), epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())), } @@ -110,45 +111,25 @@ impl Global { // TODO(stjepang): `Local`s are stored in a linked list because linked lists are fairly // easy to implement in a lock-free manner. However, traversal can be slow due to cache // misses and data dependencies. We should experiment with other data structures as well. - let mut pred = &self.head; - let mut curr = pred.load(Acquire, guard); - - while let Some(c) = unsafe { curr.as_ref() } { - let succ = c.next.load(Acquire, guard); - - if succ.tag() == 1 { - // This thread has exited. Try unlinking it from the list. - let succ = succ.with_tag(0); - - if pred.compare_and_set(curr, succ, AcqRel, guard).is_err() { + for local in self.locals.iter(&guard) { + match local { + Err(IterError::LostRace) => { // We lost the race to unlink the thread. Usually that means we should traverse - // the list again from the beginning, but since another thread trying to - // advance the epoch has won the race, we leave the job to that one. + // the list again from the beginning, but since another thread trying to advance + // the epoch has won the race, we leave the job to that one. return global_epoch; } + Ok(local) => { + let local_epoch = local.epoch.load(Relaxed); - // The unlinked entry can later be freed. - unsafe { - guard.defer(move || curr.into_owned()); - } - - // Move forward, but don't change the predecessor. 
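
The rule `try_advance` enforces can be stated compactly: the global epoch may advance only if every currently pinned participant is pinned in that same epoch. An illustrative model of the predicate (the real code reads `AtomicEpoch`s while walking the list of `Local`s):

```
/// Illustrative model: each participant is (is_pinned, epoch_it_was_pinned_in).
fn can_advance(global_epoch: usize, participants: &[(bool, usize)]) -> bool {
    participants
        .iter()
        .all(|&(pinned, epoch)| !pinned || epoch == global_epoch)
}

assert!(can_advance(4, &[(true, 4), (false, 3), (true, 4)]));
assert!(!can_advance(4, &[(true, 3)]));
```
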
- curr = succ; - } else { - let local_epoch = c.epoch.load(Relaxed); - - // If the participant was pinned in a different epoch, we cannot advance the global - // epoch just yet. - if local_epoch.is_pinned() && local_epoch.unpinned() != global_epoch { - return global_epoch; + // If the participant was pinned in a different epoch, we cannot advance the + // global epoch just yet. + if local_epoch.is_pinned() && local_epoch.unpinned() != global_epoch { + return global_epoch; + } } - - // Move one step forward. - pred = &c.next; - curr = succ; } } - atomic::fence(Acquire); // All pinned participants were pinned in the current global epoch. @@ -164,38 +145,19 @@ impl Global { } } -impl Drop for Global { - fn drop(&mut self) { - unsafe { - let guard = &unprotected(); - let mut curr = self.head.load(Relaxed, guard); - - while let Some(c) = curr.as_ref() { - let succ = c.next.load(Relaxed, guard); - debug_assert_eq!(succ.tag(), 1); - - let o = curr.into_owned(); - debug_assert!((*o.bag.get()).is_empty()); - drop(o); - - curr = succ; - } - } - } -} - /// Participant for garbage collection. pub struct Local { + /// A node in the intrusive linked list of `Local`s. + entry: Entry, + + /// The local epoch. + epoch: AtomicEpoch, + /// A reference to the global data. /// /// When all guards and handles get dropped, this reference is destroyed. global: UnsafeCell>>, - /// Pointer to the next entry in the linked list of registered participants. - /// - /// If an entry's `next` pointer is tagged with 1, it is considered to be deleted. - next: Atomic, - /// The local bag of deferred functions. pub(crate) bag: UnsafeCell, @@ -209,9 +171,6 @@ pub struct Local { /// /// This is just an auxilliary counter that sometimes kicks off collection. pin_count: Cell>, - - /// The local epoch. - epoch: AtomicEpoch, } unsafe impl Sync for Local {} @@ -225,34 +184,18 @@ impl Local { pub fn register(global: Arc) -> Handle { unsafe { // Since we dereference no pointers in this block, it is safe to use `unprotected`. - let guard = &unprotected(); - let mut new = Owned::new(Local { + let local = Owned::new(Local { + entry: Entry::default(), + epoch: AtomicEpoch::new(Epoch::starting()), global: UnsafeCell::new(ManuallyDrop::new(global.clone())), - next: Atomic::null(), bag: UnsafeCell::new(Bag::new()), guard_count: Cell::new(0), handle_count: Cell::new(1), pin_count: Cell::new(Wrapping(0)), - epoch: AtomicEpoch::new(Epoch::starting()), - }); - let mut head = global.head.load(Acquire, guard); - - loop { - new.next.store(head, Relaxed); - - // Try installing this thread's entry as the new head. - match global - .head - .compare_and_set_weak_owned(head, new, AcqRel, guard) - { - Ok(n) => return Handle { local: n.as_raw() }, - Err((h, n)) => { - head = h; - new = n; - } - } - } + }).into_ptr(&unprotected()); + global.locals.insert(local, &unprotected()); + Handle { local: local.as_raw() } } } @@ -348,7 +291,7 @@ impl Local { self.epoch.store(Epoch::starting(), Release); if self.handle_count.get() == 0 { - Self::finalize(self); + self.finalize(); } } } @@ -370,29 +313,49 @@ impl Local { self.handle_count.set(handle_count - 1); if guard_count == 0 && handle_count == 1 { - Self::finalize(self); + self.finalize(); } } /// Removes the `Local` from the global linked list. 
#[cold] - pub fn finalize(&self) { - assert_eq!(self.guard_count.get(), 0); - assert_eq!(self.handle_count.get(), 0); + fn finalize(&self) { + debug_assert_eq!(self.guard_count.get(), 0); + debug_assert_eq!(self.handle_count.get(), 0); unsafe { // Take the reference to the `Global` out of this `Local`. let global: Arc = ptr::read(&**self.global.get()); // Move the local bag into the global queue. - self.global().push_bag(&mut *self.bag.get(), &unprotected()); + global.push_bag(&mut *self.bag.get(), &unprotected()); + // Mark this node in the linked list as deleted. - self.next.fetch_or(1, SeqCst, &unprotected()); + self.entry.delete(&unprotected()); - // Finally, drop the reference to the global. - // Note that this might be the last reference to the `Global`. If so, the global data - // will be destroyed and all deferred functions in its queue will be executed. + // Finally, drop the reference to the global. Note that this might be the last + // reference to the `Global`. If so, the global data will be destroyed and all deferred + // functions in its queue will be executed. drop(global); } } } + +struct LocalContainer {} + +impl Container for LocalContainer { + fn container_of(entry: *const Entry) -> *const Local { + (entry as usize - offset_of!(Local, entry)) as *const _ + } + + fn entry_of(local: *const Local) -> *const Entry { + (local as usize + offset_of!(Local, entry)) as *const _ + } + + fn finalize(entry: *const Entry) { + let local = Self::container_of(entry); + unsafe { + drop(Box::from_raw(local as *mut Local)); + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 51f58f9..2866efc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -59,6 +59,8 @@ extern crate arrayvec; extern crate crossbeam_utils; #[macro_use] extern crate lazy_static; +#[macro_use] +extern crate memoffset; mod atomic; mod collector; diff --git a/src/sync/list.rs b/src/sync/list.rs new file mode 100644 index 0000000..18e92d5 --- /dev/null +++ b/src/sync/list.rs @@ -0,0 +1,314 @@ +//! Lock-free intrusive linked list. +//! +//! Ideas from Michael. High Performance Dynamic Lock-Free Hash Tables and List-Based Sets. SPAA +//! 2002. http://dl.acm.org/citation.cfm?id=564870.564881 + +use std::marker::PhantomData; +use std::sync::atomic::Ordering::{Acquire, Relaxed, Release}; + +use {Atomic, Ptr, Guard, unprotected}; + +/// An entry in a linked list. +/// +/// An Entry is accessed from multiple threads, so it would be beneficial to put it in a different +/// cache-line than thread-local data in terms of performance. +#[derive(Debug)] +pub struct Entry { + /// The next entry in the linked list. + /// If the tag is 1, this entry is marked as deleted. + next: Atomic, +} + +/// An evidence that the type `T` contains an entry. +/// +/// Suppose we'll maintain an intrusive linked list of type `T`. Since `List` and `Entry` types do +/// not provide any information on `T`, we need to specify how to interact with the containing +/// objects of type `T`. A struct of this trait provides such information. +/// +/// `container_of()` is given a pointer to an entry, and returns the pointer to its container. On +/// the other hand, `entry_of()` is given a pointer to a container, and returns the pointer to its +/// corresponding entry. `finalize()` is called when an element is actually removed from the list. 
+/// +/// # Example +/// +/// ```ignore +/// struct A { +/// entry: Entry, +/// data: usize, +/// } +/// +/// struct AEntry {} +/// +/// impl Container for AEntry { +/// fn container_of(entry: *const Entry) -> *const A { +/// ((entry as usize) - offset_of!(A, entry)) as *const _ +/// } +/// +/// fn entry_of(a: *const A) -> *const Entry { +/// ((a as usize) + offset_of!(A, entry)) as *const _ +/// } +/// +/// fn finalize(entry: *const Entry) { +/// // drop the box of the container +/// unsafe { drop(Box::from_raw(Self::container_of(entry) as *mut A)) } +/// } +/// } +/// ``` +/// +/// Note that there can be multiple structs that implement `Container`. In most cases, each +/// struct will represent a distinct entry in `T` so that the container can be inserted into +/// multiple lists. For example, we can insert the following struct into two lists using `entry1` +/// and `entry2` ans its entry: +/// +/// ```ignore +/// struct B { +/// entry1: Entry, +/// entry2: Entry, +/// data: usize, +/// } +/// ``` +pub trait Container { + fn container_of(*const Entry) -> *const T; + fn entry_of(*const T) -> *const Entry; + fn finalize(*const Entry); +} + +/// A lock-free, intrusive linked list of type `T`. +#[derive(Debug)] +pub struct List> { + /// The head of the linked list. + head: Atomic, + + /// The phantom data for using `T` and `E`. + _marker: PhantomData<(T, C)>, +} + +/// An auxiliary data for iterating over a linked list. +pub struct Iter<'g, T, C: Container> { + /// The guard that protects the iteration. + guard: &'g Guard, + + /// Pointer from the predecessor to the current entry. + pred: &'g Atomic, + + /// The current entry. + curr: Ptr<'g, Entry>, + + /// The phantom data for container. + _marker: PhantomData<(T, C)>, +} + +/// An enum for iteration error. +#[derive(PartialEq, Debug)] +pub enum IterError { + /// Iterator lost a race in deleting an entry by a concurrent iterator. + LostRace, +} + +impl Default for Entry { + /// Returns the empty entry. + fn default() -> Self { + Self { next: Atomic::null() } + } +} + +impl Entry { + /// Marks this entry as deleted, deferring the actual deallocation to a later iteration. + /// + /// # Safety + /// + /// The entry should be a member of a linked list, and it should not have been deleted. It + /// should be safe to call `C::finalize` on the entry after the `guard` is dropped, where `C` is + /// the associated helper for the linked list. + pub unsafe fn delete(&self, guard: &Guard) { + self.next.fetch_or(1, Release, guard); + } +} + +impl> List { + /// Returns a new, empty linked list. + pub fn new() -> Self { + Self { + head: Atomic::null(), + _marker: PhantomData, + } + } + + /// Inserts `entry` into the head of the list. + /// + /// # Safety + /// + /// You should guarantee that: + /// + /// - `C::entry_of()` properly accesses an `Entry` in the container; + /// - `C::container_of()` properly accesses the object containing the entry; + /// - `container` is immovable, e.g. inside a `Box`; + /// - An entry is not inserted twice; and + /// - The inserted object will be removed before the list is dropped. + pub unsafe fn insert<'g>(&'g self, container: Ptr<'g, T>, guard: &'g Guard) { + let to = &self.head; + let entry = &*C::entry_of(container.as_raw()); + let entry_ptr = Ptr::from_raw(entry); + let mut next = to.load(Relaxed, guard); + + loop { + entry.next.store(next, Relaxed); + match to.compare_and_set_weak(next, entry_ptr, Release, guard) { + Ok(_) => break, + Err(n) => next = n, + } + } + } + + /// Returns an iterator over all objects. 
+ /// + /// # Caveat + /// + /// Every object that is inserted at the moment this function is called and persists at least + /// until the end of iteration will be returned. Since this iterator traverses a lock-free + /// linked list that may be concurrently modified, some additional caveats apply: + /// + /// 1. If a new object is inserted during iteration, it may or may not be returned. + /// 2. If an object is deleted during iteration, it may or may not be returned. + /// 3. The iteration may be aborted when it lost in a race condition. In this case, the winning + /// thread will continue to iterate over the same list. + pub fn iter<'g>(&'g self, guard: &'g Guard) -> Iter<'g, T, C> { + let pred = &self.head; + let curr = pred.load(Acquire, guard); + Iter { + guard, + pred, + curr, + _marker: PhantomData, + } + } +} + +impl> Drop for List { + fn drop(&mut self) { + unsafe { + let guard = &unprotected(); + let mut curr = self.head.load(Relaxed, guard); + while let Some(c) = curr.as_ref() { + let succ = c.next.load(Relaxed, guard); + assert_eq!(succ.tag(), 1); + + C::finalize(curr.as_raw()); + curr = succ; + } + } + } +} + +impl<'g, T: 'g, C: Container> Iterator for Iter<'g, T, C> { + type Item = Result<&'g T, IterError>; + + fn next(&mut self) -> Option { + while let Some(c) = unsafe { self.curr.as_ref() } { + let succ = c.next.load(Acquire, self.guard); + + if succ.tag() == 1 { + // This entry was removed. Try unlinking it from the list. + let succ = succ.with_tag(0); + + match self.pred.compare_and_set_weak( + self.curr, + succ, + Acquire, + self.guard, + ) { + Ok(_) => { + unsafe { + // Deferred drop of `T` is scheduled here. + // This is okay because `.delete()` can be called only if `T: 'static`. + let p = self.curr; + self.guard.defer(move || C::finalize(p.as_raw())); + } + self.curr = succ; + } + Err(succ) => { + // We lost the race to delete the entry by a concurrent iterator. Set + // `self.curr` to the updated pointer, and report the lost. + self.curr = succ; + return Some(Err(IterError::LostRace)); + } + } + + continue; + } + + // Move one step forward. + self.pred = &c.next; + self.curr = succ; + + return Some(Ok(unsafe { &*C::container_of(c as *const _) })); + } + + // We reached the end of the list. 
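
The iterator unlinks entries whose `next` pointer carries the low tag bit that `delete` sets with `fetch_or`. The same tagging is available on any `Atomic` through the public API; a minimal illustration (orderings chosen arbitrarily):

```
use crossbeam_epoch::{self as epoch, Atomic};
use std::sync::atomic::Ordering::SeqCst;

let a = Atomic::new(5);
let guard = &epoch::pin();

// Set the lowest bit of the pointer; the pointee itself is untouched.
a.fetch_or(1, SeqCst, guard);

let p = a.load(SeqCst, guard);
assert_eq!(p.tag(), 1);

// Strip the mark to get back the original, untagged pointer.
let p = p.with_tag(0);
assert_eq!(unsafe { p.as_ref() }, Some(&5));
```
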
+ None + } +} + +#[cfg(test)] +mod tests { + use {Collector, Owned}; + use super::*; + + #[test] + fn insert_iter_delete_iter() { + let collector = Collector::new(); + let handle = collector.handle(); + let guard = handle.pin(); + + struct EntryContainer {} + + impl Container for EntryContainer { + fn container_of(entry: *const Entry) -> *const Entry { + entry + } + + fn entry_of(entry: *const Entry) -> *const Entry { + entry + } + + fn finalize(entry: *const Entry) { + unsafe { + drop(Box::from_raw(entry as *mut Entry)) + } + } + } + + let l: List = List::new(); + + let n1 = Owned::new(Entry::default()).into_ptr(&guard); + let n2 = Owned::new(Entry::default()).into_ptr(&guard); + let n3 = Owned::new(Entry::default()).into_ptr(&guard); + + unsafe { + l.insert(n3, &guard); + l.insert(n2, &guard); + l.insert(n1, &guard); + } + + let mut iter = l.iter(&guard); + assert!(iter.next().is_some()); + assert!(iter.next().is_some()); + assert!(iter.next().is_some()); + assert!(iter.next().is_none()); + + unsafe { n2.as_ref().unwrap().delete(&guard); } + + let mut iter = l.iter(&guard); + assert!(iter.next().is_some()); + assert!(iter.next().is_some()); + assert!(iter.next().is_none()); + + unsafe { + n1.as_ref().unwrap().delete(&guard); + n3.as_ref().unwrap().delete(&guard); + } + + let mut iter = l.iter(&guard); + assert!(iter.next().is_none()); + } +} diff --git a/src/sync/mod.rs b/src/sync/mod.rs index e96327b..f8eb259 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1,3 +1,4 @@ //! Synchronization primitives. +pub mod list; pub mod queue; From f4ee6e3da877eb465b024aaa00c46abd8b0ca256 Mon Sep 17 00:00:00 2001 From: Jeehoon Kang Date: Sun, 12 Nov 2017 23:06:19 +0900 Subject: [PATCH 4/5] Change the signature of PhantomData for list::Iter --- src/internal.rs | 7 +++---- src/sync/list.rs | 13 +++++++------ 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/internal.rs b/src/internal.rs index 67f3551..384f96e 100644 --- a/src/internal.rs +++ b/src/internal.rs @@ -113,10 +113,9 @@ impl Global { // misses and data dependencies. We should experiment with other data structures as well. for local in self.locals.iter(&guard) { match local { - Err(IterError::LostRace) => { - // We lost the race to unlink the thread. Usually that means we should traverse - // the list again from the beginning, but since another thread trying to advance - // the epoch has won the race, we leave the job to that one. + Err(IterError::Stalled) => { + // The iteration is stalled by another thread's iteration. Since that thread + // also tries to advance the epoch, we leave the job to that thread. return global_epoch; } Ok(local) => { diff --git a/src/sync/list.rs b/src/sync/list.rs index 18e92d5..220b62f 100644 --- a/src/sync/list.rs +++ b/src/sync/list.rs @@ -84,7 +84,7 @@ pub struct List> { } /// An auxiliary data for iterating over a linked list. -pub struct Iter<'g, T, C: Container> { +pub struct Iter<'g, T: 'g, C: Container> { /// The guard that protects the iteration. guard: &'g Guard, @@ -95,14 +95,15 @@ pub struct Iter<'g, T, C: Container> { curr: Ptr<'g, Entry>, /// The phantom data for container. - _marker: PhantomData<(T, C)>, + _marker: PhantomData<(&'g T, C)>, } /// An enum for iteration error. #[derive(PartialEq, Debug)] pub enum IterError { - /// Iterator lost a race in deleting an entry by a concurrent iterator. - LostRace, + /// Iterator was stalled by another iterator. Internally, the thread lost a race in deleting a + /// node by a concurrent thread. 
+ Stalled, } impl Default for Entry { @@ -228,9 +229,9 @@ impl<'g, T: 'g, C: Container> Iterator for Iter<'g, T, C> { } Err(succ) => { // We lost the race to delete the entry by a concurrent iterator. Set - // `self.curr` to the updated pointer, and report the lost. + // `self.curr` to the updated pointer, and report that we are stalled. self.curr = succ; - return Some(Err(IterError::LostRace)); + return Some(Err(IterError::Stalled)); } } From 076baa03df6c213bc6270eaf81bec73ee6c4813a Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Mon, 20 Nov 2017 13:59:05 +0100 Subject: [PATCH 5/5] Document the implementation of unprotected --- src/guard.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/guard.rs b/src/guard.rs index 33aa09d..c6e4d22 100644 --- a/src/guard.rs +++ b/src/guard.rs @@ -300,6 +300,12 @@ impl Clone for Guard { /// [`defer`]: struct.Guard.html#method.defer #[inline] pub unsafe fn unprotected() -> &'static Guard { + // HACK(stjepang): An unprotected guard is just a `Guard` with its field `local` set to null. + // Since this function returns a `'static` reference to a `Guard`, we must return a reference + // to a global guard. However, it's not possible to create a `static` `Guard` because it does + // not implement `Sync`. To get around the problem, we create a static `usize` initialized to + // zero and then transmute it into a `Guard`. This is safe because `usize` and `Guard` + // (consisting of a single pointer) have the same representation in memory. static UNPROTECTED: usize = 0; &*(&UNPROTECTED as *const _ as *const Guard) }
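
A quick way to restate the layout claim in that comment as a checkable assertion (not part of the patch):

```
use std::mem;
use crossbeam_epoch::Guard;

// `Guard` is a single `*const Local` field, so it matches `usize` in both
// size and alignment, which is what the `&UNPROTECTED` cast relies on.
assert_eq!(mem::size_of::<Guard>(), mem::size_of::<usize>());
assert_eq!(mem::align_of::<Guard>(), mem::align_of::<usize>());
```
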