feat(#1, guard): add a real loom test / add Guard::accelerate
wvwwvwwv committed Jul 16, 2024
1 parent bc1177b commit e4f1b6a
Showing 6 changed files with 164 additions and 191 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,13 @@
# Changelog

1.6.0

* Add `Guard::accelerate`.

1.5.0

* Fix `Guard::epoch` to return the correct epoch value.

1.4.0

* `Epoch` is now a 4-state type (3 -> 4).
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -2,7 +2,7 @@
name = "sdd"
description = "Scalable lock-free delayed memory reclaimer"
documentation = "https://docs.rs/sdd"
version = "1.5.0"
version = "1.6.0"
authors = ["wvwwvwwv <wvwwvwwv@me.com>"]
edition = "2021"
rust-version = "1.65.0"
183 changes: 104 additions & 79 deletions src/collector.rs
@@ -1,14 +1,18 @@
use super::augmented::fence as augmented_fence;
use super::augmented::thread_local as augmented_thread_local;
use super::augmented::AtomicPtr as AugmentedAtomicPtr;
use super::augmented::AtomicU8 as AugmentedAtomicU8;
use super::exit_guard::ExitGuard;
use super::hidden::{fence, AtomicPtr, AtomicU8};
use super::{Collectible, Epoch, Guard, Tag};
use super::{Collectible, Epoch, Tag};
use std::ptr::{self, NonNull};
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst};

/// [`Collector`] is a garbage collector that reclaims thread-locally unreachable instances
/// when they are globally unreachable.
#[derive(Debug)]
pub(super) struct Collector {
state: AtomicU8,
state: AugmentedAtomicU8,
announcement: Epoch,
next_epoch_update: u8,
has_garbage: bool,
@@ -55,7 +59,7 @@ impl Collector {
} else {
// What will happen after the fence strictly happens after the fence.
self.state.store(new_epoch.into(), Relaxed);
fence(SeqCst);
augmented_fence(SeqCst);
}
if self.announcement != new_epoch {
self.announcement = new_epoch;
@@ -76,11 +80,18 @@
}
}

#[inline]
/// Accelerates garbage collection.
pub(super) fn accelerate(&mut self) {
mark_scan_enforced();
self.next_epoch_update = 0;
}

/// Acknowledges an existing [`Guard`] being dropped.
#[inline]
pub(super) fn end_guard(&mut self) {
debug_assert_eq!(self.state.load(Relaxed) & Self::INACTIVE, 0);
debug_assert_eq!(self.state.load(Relaxed), self.announcement.into());
debug_assert_eq!(self.state.load(Relaxed), u8::from(self.announcement));

if self.num_readers == 1 {
if self.next_epoch_update == 0 {
@@ -168,7 +179,7 @@ impl Collector {
/// Acknowledges a new global epoch.
pub(super) fn epoch_updated(&mut self) {
debug_assert_eq!(self.state.load(Relaxed) & Self::INACTIVE, 0);
debug_assert_eq!(self.state.load(Relaxed), self.announcement.into());
debug_assert_eq!(self.state.load(Relaxed), u8::from(self.announcement));

let mut garbage_link = self.next_instance_link.take();
self.next_instance_link = self.previous_instance_link.take();
@@ -197,47 +208,10 @@
}
}

/// Allocates a new [`Collector`].
fn alloc() -> *mut Collector {
let boxed = Box::new(Collector {
state: AtomicU8::new(Self::INACTIVE),
announcement: Epoch::default(),
next_epoch_update: Self::CADENCE,
has_garbage: false,
num_readers: 0,
previous_instance_link: None,
current_instance_link: None,
next_instance_link: None,
next_link: AtomicPtr::default(),
link: None,
});
let ptr = Box::into_raw(boxed);
let mut current = global_anchor().load(Relaxed);
loop {
unsafe {
(*ptr)
.next_link
.store(Tag::unset_tag(current).cast_mut(), Relaxed);
}

// It keeps the tag intact.
let tag = Tag::into_tag(current);
let new = Tag::update_tag(ptr, tag).cast_mut();
if let Err(actual) =
global_anchor().compare_exchange_weak(current, new, Release, Relaxed)
{
current = actual;
} else {
break;
}
}
ptr
}

/// Tries to scan the [`Collector`] instances to update the global epoch.
fn try_scan(&mut self) {
pub(super) fn try_scan(&mut self) -> bool {
debug_assert_eq!(self.state.load(Relaxed) & Self::INACTIVE, 0);
debug_assert_eq!(self.state.load(Relaxed), self.announcement.into());
debug_assert_eq!(self.state.load(Relaxed), u8::from(self.announcement));

// Only one thread that acquires the anchor lock is allowed to scan the thread-local
// collectors.
@@ -322,10 +296,50 @@

if update_global_epoch {
// It is a new era; a fence is required.
fence(SeqCst);
augmented_fence(SeqCst);
epoch().store(Epoch::from_u8(known_epoch).next().into(), Relaxed);
return true;
}
}

false
}

/// Allocates a new [`Collector`].
fn alloc() -> *mut Collector {
let boxed = Box::new(Collector {
state: AugmentedAtomicU8::new(Self::INACTIVE),
announcement: Epoch::default(),
next_epoch_update: Self::CADENCE,
has_garbage: false,
num_readers: 0,
previous_instance_link: None,
current_instance_link: None,
next_instance_link: None,
next_link: AtomicPtr::default(),
link: None,
});
let ptr = Box::into_raw(boxed);
let mut current = global_anchor().load(Relaxed);
loop {
unsafe {
(*ptr)
.next_link
.store(Tag::unset_tag(current).cast_mut(), Relaxed);
}

// It keeps the tag intact.
let tag = Tag::into_tag(current);
let new = Tag::update_tag(ptr, tag).cast_mut();
if let Err(actual) =
global_anchor().compare_exchange_weak(current, new, Release, Relaxed)
{
current = actual;
} else {
break;
}
}
ptr
}
}

@@ -367,6 +381,7 @@ impl Drop for CollectorAnchor {
}

/// Marks `ANCHOR` that there is a potentially unreachable `Collector`.
#[inline]
fn mark_scan_enforced() {
// `Tag::Second` indicates that there is a garbage `Collector`.
let _result = global_anchor().fetch_update(Release, Relaxed, |p| {
@@ -385,70 +400,80 @@ fn mark_scan_enforced() {
///
/// The function is safe to call only when the thread is being joined.
unsafe fn try_drop_local_collector() {
let collector_ptr = LOCAL_COLLECTOR.with(|local_collector| local_collector.load(Relaxed));
if collector_ptr.is_null() {
#[cfg(all(loom, test))]
{
// Access to `loom` types is prohibited outside `loom` models.
return;
}
let mut anchor_ptr = global_anchor().load(Relaxed);
if Tag::into_tag(anchor_ptr) == Tag::Second {
// Another thread was joined before, and has yet to be cleaned up.
let guard = Guard::new_for_drop(collector_ptr);
(*collector_ptr).try_scan();
drop(guard);
anchor_ptr = global_anchor().load(Relaxed);
}
if (*collector_ptr).next_link.load(Relaxed).is_null()
&& ptr::eq(collector_ptr, anchor_ptr)
&& global_anchor()
.compare_exchange(anchor_ptr, ptr::null_mut(), Relaxed, Relaxed)
.is_ok()
#[cfg(not(all(loom, test)))]
{
// If it is the head, and the only `Collector` in the global list, drop it here.
while (*collector_ptr).has_garbage {
let guard = Guard::new_for_drop(collector_ptr);
(*collector_ptr).epoch_updated();
let collector_ptr = LOCAL_COLLECTOR.with(|local_collector| local_collector.load(Relaxed));
if collector_ptr.is_null() {
return;
}
let mut anchor_ptr = global_anchor().load(Relaxed);
if Tag::into_tag(anchor_ptr) == Tag::Second {
// Another thread was joined before, and has yet to be cleaned up.
let guard = super::Guard::new_for_drop(collector_ptr);
(*collector_ptr).try_scan();
drop(guard);
anchor_ptr = global_anchor().load(Relaxed);
}
drop(Box::from_raw(collector_ptr));
return;
if (*collector_ptr).next_link.load(Relaxed).is_null()
&& ptr::eq(collector_ptr, anchor_ptr)
&& global_anchor()
.compare_exchange(anchor_ptr, ptr::null_mut(), Relaxed, Relaxed)
.is_ok()
{
// If it is the head, and the only `Collector` in the global list, drop it here.
while (*collector_ptr).has_garbage {
let guard = super::Guard::new_for_drop(collector_ptr);
(*collector_ptr).epoch_updated();
drop(guard);
}
drop(Box::from_raw(collector_ptr));
return;
}
(*collector_ptr).state.fetch_or(Collector::INVALID, Release);
mark_scan_enforced();
}
(*collector_ptr).state.fetch_or(Collector::INVALID, Release);
mark_scan_enforced();
}

thread_local! {
static COLLECTOR_ANCHOR: CollectorAnchor = const { CollectorAnchor };
augmented_thread_local! {
#[allow(clippy::thread_local_initializer_can_be_made_const)]
static COLLECTOR_ANCHOR: CollectorAnchor = CollectorAnchor;
static LOCAL_COLLECTOR: AtomicPtr<Collector> = AtomicPtr::default();
}

/// The global epoch.
///
/// The global epoch can have one of 0, 1, or 2, and a difference in the local announcement of
/// a thread and the global is considered to be an epoch change to the thread.
fn epoch() -> &'static AtomicU8 {
fn epoch() -> &'static AugmentedAtomicU8 {
#[cfg(not(all(loom, test)))]
{
static EPOCH: AtomicU8 = AtomicU8::new(0);
static EPOCH: AugmentedAtomicU8 = AugmentedAtomicU8::new(0);
&EPOCH
}
#[cfg(all(loom, test))]
{
static EPOCH: std::sync::OnceLock<AtomicU8> = std::sync::OnceLock::new();
EPOCH.get_or_init(|| AtomicU8::new(0))
static EPOCH: std::sync::OnceLock<AugmentedAtomicU8> = std::sync::OnceLock::new();
EPOCH.get_or_init(|| AugmentedAtomicU8::new(0))
}
}

/// The global anchor for thread-local instances of [`Collector`].
fn global_anchor() -> &'static AtomicPtr<Collector> {
fn global_anchor() -> &'static AugmentedAtomicPtr<Collector> {
#[cfg(not(all(loom, test)))]
{
static GLOBAL_ANCHOR: AtomicPtr<Collector> = AtomicPtr::new(ptr::null_mut());
static GLOBAL_ANCHOR: AugmentedAtomicPtr<Collector> =
AugmentedAtomicPtr::new(ptr::null_mut());
&GLOBAL_ANCHOR
}
#[cfg(all(loom, test))]
{
static GLOBAL_ANCHOR: std::sync::OnceLock<AtomicPtr<Collector>> =
static GLOBAL_ANCHOR: std::sync::OnceLock<AugmentedAtomicPtr<Collector>> =
std::sync::OnceLock::new();
GLOBAL_ANCHOR.get_or_init(|| AtomicPtr::new(ptr::null_mut()))
GLOBAL_ANCHOR.get_or_init(|| AugmentedAtomicPtr::new(ptr::null_mut()))
}
}
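
The cfg-gated accessors above (`epoch()` and `global_anchor()`) illustrate a recurring workaround in this commit: `loom` atomics cannot be constructed in a `const` context, so they cannot back a plain `static` and are instead initialized lazily through `std::sync::OnceLock`. A condensed sketch of the pattern, using a hypothetical `counter` global rather than anything from this crate:

```rust
use std::sync::atomic::AtomicU8;

#[cfg(not(all(loom, test)))]
fn counter() -> &'static AtomicU8 {
    // `std` atomics are `const`-constructible, so a plain static works.
    static COUNTER: AtomicU8 = AtomicU8::new(0);
    &COUNTER
}

#[cfg(all(loom, test))]
fn counter() -> &'static loom::sync::atomic::AtomicU8 {
    // `loom` atomics must be created at run time, so a lazily
    // initialized `OnceLock` stands in for the `const` static.
    static COUNTER: std::sync::OnceLock<loom::sync::atomic::AtomicU8> =
        std::sync::OnceLock::new();
    COUNTER.get_or_init(|| loom::sync::atomic::AtomicU8::new(0))
}
```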
23 changes: 23 additions & 0 deletions src/guard.rs
@@ -98,6 +98,29 @@ impl Guard {
Collector::current_epoch()
}

/// Forces the [`Guard`] to try to start a new epoch when it is dropped.
///
/// # Examples
///
/// ```
/// use sdd::Guard;
///
/// let guard = Guard::new();
///
/// let epoch = guard.epoch();
/// guard.accelerate();
///
/// drop(guard);
///
/// assert_ne!(epoch, Guard::new().epoch());
/// ```
#[inline]
pub fn accelerate(&self) {
unsafe {
(*self.collector_ptr).accelerate();
}
}

/// Defers dropping and memory reclamation of the supplied [`Box`] of a type implementing
/// [`Collectible`].
///
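Beyond the doc-test above, `accelerate` is aimed at workloads that retire a burst of garbage and want the epoch advanced sooner than the collector's regular update cadence. A rough usage sketch; the batch loop is illustrative and not taken from this commit:

```rust
use sdd::Guard;

// Ask for an epoch bump after each burst of retirements instead of
// waiting for the collector's internal update cadence.
for _batch in 0..4 {
    let guard = Guard::new();
    // ... retire objects while `guard` is alive ...
    guard.accelerate();
    drop(guard); // the drop now tries to start a new epoch immediately
}
```
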
7 changes: 4 additions & 3 deletions src/lib.rs
@@ -53,7 +53,6 @@ mod ref_counted;
/// ```
#[inline]
pub fn prepare() {
// TODO: this needs to incorporate `allocator_api`.
let guard = Guard::new();
drop(guard);
}
@@ -90,13 +89,15 @@ pub fn suspend() -> bool {
}

#[cfg(not(all(test, loom)))]
mod hidden {
mod augmented {
pub(crate) use std::sync::atomic::{fence, AtomicPtr, AtomicU8};
pub(crate) use std::thread_local;
}

#[cfg(all(test, loom))]
mod hidden {
mod augmented {
pub(crate) use loom::sync::atomic::{fence, AtomicPtr, AtomicU8};
pub(crate) use loom::thread_local;
}

#[cfg(test)]
(The diff for the sixth changed file, presumably the new loom test, did not load and is not shown here.)
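
As a point of reference, a `loom` model exhaustively explores thread interleavings of the closure passed to `loom::model`. The sketch below shows the general shape of such a test; it is a generic illustration, not the test added by this commit:

```rust
#[cfg(all(test, loom))]
mod loom_model {
    use std::sync::atomic::Ordering::{Acquire, Release};

    #[test]
    fn store_is_visible_after_join() {
        // `loom::model` re-runs the closure under every feasible
        // interleaving of the spawned threads.
        loom::model(|| {
            let flag = loom::sync::Arc::new(loom::sync::atomic::AtomicU8::new(0));
            let writer = {
                let flag = flag.clone();
                loom::thread::spawn(move || flag.store(1, Release))
            };
            writer.join().unwrap();
            // After a join, the spawned thread's store must be visible.
            assert_eq!(flag.load(Acquire), 1);
        });
    }
}
```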