Skip to content

Commit

Permalink
util/epoch: Add preliminary support for loom
Browse files Browse the repository at this point in the history
This patch only adds support to parts of `utils` and to `epoch`. Some
parts of `utils` had to be left out, since they rely on
`AtomicUsize::new` being `const` (which it is not in `loom`). Other
parts had to be left out due to the lack of `thread::Thread` in `loom`.
All the parts needed for `epoch` were successfully moved to loom.

For this initial patch, there are two loom tests, both in `epoch`. One
is a simple test of defer_destroy while a pin is held, and the other is
the Triber stack example. They both pass loom with
`LOOM_MAX_PREEMPTIONS=3` and `LOOM_MAX_PREEMPTIONS=2`. The latter tests
fewer possible interleavings, but completes in 13 minutes on my laptop
rather than ~2 hours. I have added loom testing of `epoch` to CI as
well.

The minimal version bump to 1.30 is a little awkward, but is needed to
allow us to use paths for macro imports. Now, technically we already
rely on MSRV above 1.30 for a bunch of features (like `alloc`), but this
is a change that affects all of `crossbeam-epoch`.

Note that the uses of `UnsafeCell` in `utils` have not been moved to
`loom::cell::UnsafeCell`, as loom's `UnsafeCell` does not support `T:
?Sized`, which `AtomicCell` depends on.

Fixes #486.
  • Loading branch information
jonhoo committed May 19, 2020
1 parent de59f9b commit 9dc2226
Show file tree
Hide file tree
Showing 22 changed files with 381 additions and 47 deletions.
15 changes: 13 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
- crossbeam-skiplist
- crossbeam-utils
rust:
- 1.28.0
- 1.30.0
- nightly
steps:
- uses: actions/checkout@master
Expand All @@ -38,7 +38,7 @@ jobs:
rustup target add thumbv6m-none-eabi
# cfg-if 0.1.10 requires Rust 1.31+ so downgrade it.
- name: Downgrade dependencies
if: matrix.rust == '1.28.0'
if: matrix.rust == '1.30.0'
run: |
cargo generate-lockfile
cargo update -p cfg-if --precise 0.1.9
Expand Down Expand Up @@ -66,3 +66,14 @@ jobs:
run: rustup update stable && rustup default stable
- name: rustfmt
run: ./ci/rustfmt.sh

# Run loom tests.
loom:
name: loom
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
- name: Install Rust
run: rustup update stable && rustup default stable
- name: loom
run: ./ci/crossbeam-epoch-loom.sh
8 changes: 8 additions & 0 deletions ci/crossbeam-epoch-loom.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/bin/bash

cd "$(dirname "$0")"/../crossbeam-epoch
set -ex

export RUSTFLAGS="-D warnings --cfg=loom"

env LOOM_MAX_PREEMPTIONS=2 cargo test --test loom --features sanitize --release -- --nocapture
3 changes: 3 additions & 0 deletions crossbeam-epoch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ cfg-if = "0.1.2"
maybe-uninit = "2.0.0"
memoffset = "0.5"

[target.'cfg(loom)'.dependencies]
loom = "0.3.2"

[dependencies.crossbeam-utils]
version = "0.7"
path = "../crossbeam-utils"
Expand Down
17 changes: 12 additions & 5 deletions crossbeam-epoch/src/atomic.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use alloc::boxed::Box;
use concurrency::sync::atomic::{AtomicUsize, Ordering};
use core::borrow::{Borrow, BorrowMut};
use core::cmp;
use core::fmt;
use core::marker::PhantomData;
use core::mem;
use core::ops::{Deref, DerefMut};
use core::ptr;
use core::sync::atomic::{AtomicUsize, Ordering};

use crossbeam_utils::atomic::AtomicConsume;
use guard::Guard;
Expand Down Expand Up @@ -150,7 +150,7 @@ impl<T> Atomic<T> {
///
/// let a = Atomic::<i32>::null();
/// ```
#[cfg(not(has_min_const_fn))]
#[cfg(any(loom, not(has_min_const_fn)))]
pub fn null() -> Atomic<T> {
Self {
data: AtomicUsize::new(0),
Expand All @@ -167,7 +167,7 @@ impl<T> Atomic<T> {
///
/// let a = Atomic::<i32>::null();
/// ```
#[cfg(has_min_const_fn)]
#[cfg(all(not(loom), has_min_const_fn))]
pub const fn null() -> Atomic<T> {
Self {
data: AtomicUsize::new(0),
Expand Down Expand Up @@ -506,7 +506,14 @@ impl<T> Atomic<T> {
/// }
/// ```
pub unsafe fn into_owned(self) -> Owned<T> {
Owned::from_usize(self.data.into_inner())
#[cfg(loom)]
{
Owned::from_usize(self.data.unsync_load())
}
#[cfg(not(loom))]
{
Owned::from_usize(self.data.into_inner())
}
}
}

Expand Down Expand Up @@ -1185,7 +1192,7 @@ impl<'g, T> Default for Shared<'g, T> {
}
}

#[cfg(test)]
#[cfg(all(test, not(loom)))]
mod tests {
use super::Shared;

Expand Down
10 changes: 5 additions & 5 deletions crossbeam-epoch/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
///
/// handle.pin().flush();
/// ```
use alloc::sync::Arc;
use concurrency::sync::Arc;
use core::fmt;

use guard::Guard;
Expand Down Expand Up @@ -103,7 +103,7 @@ impl fmt::Debug for LocalHandle {
}
}

#[cfg(test)]
#[cfg(all(test, not(loom)))]
mod tests {
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
Expand Down Expand Up @@ -145,9 +145,9 @@ mod tests {
let a = Owned::new(7).into_shared(guard);
guard.defer_destroy(a);

assert!(!(*(*guard.local).bag.get()).is_empty());
assert!(!(*guard.local).bag.with(|b| (*b).is_empty()));

while !(*(*guard.local).bag.get()).is_empty() {
while !(*guard.local).bag.with(|b| (*b).is_empty()) {
guard.flush();
}
}
Expand All @@ -166,7 +166,7 @@ mod tests {
let a = Owned::new(7).into_shared(guard);
guard.defer_destroy(a);
}
assert!(!(*(*guard.local).bag.get()).is_empty());
assert!(!(*guard.local).bag.with(|b| (*b).is_empty()));
}
}

Expand Down
3 changes: 2 additions & 1 deletion crossbeam-epoch/src/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
//! destructed on thread exit, which in turn unregisters the thread.
use collector::{Collector, LocalHandle};
use concurrency::{lazy_static, thread_local};
use guard::Guard;

lazy_static! {
Expand Down Expand Up @@ -44,7 +45,7 @@ where
.unwrap_or_else(|_| f(&COLLECTOR.register()))
}

#[cfg(test)]
#[cfg(all(test, not(loom)))]
mod tests {
use crossbeam_utils::thread;

Expand Down
2 changes: 1 addition & 1 deletion crossbeam-epoch/src/deferred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl Deferred {
}
}

#[cfg(test)]
#[cfg(all(test, not(loom)))]
mod tests {
use super::Deferred;
use std::cell::Cell;
Expand Down
2 changes: 1 addition & 1 deletion crossbeam-epoch/src/epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//! If an object became garbage in some epoch, then we can be sure that after two advancements no
//! participant will hold a reference to it. That is the crux of safe memory reclamation.
use core::sync::atomic::{AtomicUsize, Ordering};
use concurrency::sync::atomic::{AtomicUsize, Ordering};

/// An epoch that can be marked as pinned or unpinned.
///
Expand Down
20 changes: 11 additions & 9 deletions crossbeam-epoch/src/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@
//! Ideally each instance of concurrent data structure may have its own queue that gets fully
//! destroyed as soon as the data structure gets dropped.
use core::cell::{Cell, UnsafeCell};
use concurrency::cell::UnsafeCell;
use concurrency::sync::atomic;
use concurrency::sync::atomic::Ordering;
use core::cell::Cell;
use core::mem::{self, ManuallyDrop};
use core::num::Wrapping;
use core::sync::atomic;
use core::sync::atomic::Ordering;
use core::{fmt, ptr};

use crossbeam_utils::CachePadded;
Expand Down Expand Up @@ -411,7 +412,7 @@ impl Local {
/// Returns a reference to the `Collector` in which this `Local` resides.
#[inline]
pub fn collector(&self) -> &Collector {
unsafe { &**self.collector.get() }
self.collector.with(|c| unsafe { &**c })
}

/// Returns `true` if the current participant is pinned.
Expand All @@ -426,7 +427,7 @@ impl Local {
///
/// It should be safe for another thread to execute the given function.
pub unsafe fn defer(&self, mut deferred: Deferred, guard: &Guard) {
let bag = &mut *self.bag.get();
let bag = self.bag.with_mut(|b| &mut *b);

while let Err(d) = bag.try_push(deferred) {
self.global().push_bag(bag, guard);
Expand All @@ -435,7 +436,7 @@ impl Local {
}

pub fn flush(&self, guard: &Guard) {
let bag = unsafe { &mut *self.bag.get() };
let bag = self.bag.with_mut(|b| unsafe { &mut *b });

if !bag.is_empty() {
self.global().push_bag(bag, guard);
Expand Down Expand Up @@ -573,7 +574,8 @@ impl Local {
// Pin and move the local bag into the global queue. It's important that `push_bag`
// doesn't defer destruction on any new garbage.
let guard = &self.pin();
self.global().push_bag(&mut *self.bag.get(), guard);
self.global()
.push_bag(self.bag.with_mut(|b| &mut *b), guard);
}
// Revert the handle count back to zero.
self.handle_count.set(0);
Expand All @@ -582,7 +584,7 @@ impl Local {
// Take the reference to the `Global` out of this `Local`. Since we're not protected
// by a guard at this time, it's crucial that the reference is read before marking the
// `Local` as deleted.
let collector: Collector = ptr::read(&*(*self.collector.get()));
let collector: Collector = ptr::read(self.collector.with(|c| &*(*c)));

// Mark this node in the linked list as deleted.
self.entry.delete(&unprotected());
Expand Down Expand Up @@ -613,7 +615,7 @@ impl IsElement<Local> for Local {
}
}

#[cfg(test)]
#[cfg(all(test, not(loom)))]
mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};

Expand Down
82 changes: 81 additions & 1 deletion crossbeam-epoch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,87 @@ extern crate core;

extern crate maybe_uninit;

#[cfg(loom)]
extern crate loom;

#[cfg(loom)]
#[allow(unused_imports, dead_code)]
pub(crate) mod concurrency {
pub(crate) mod cell {
pub(crate) use loom::cell::UnsafeCell;
}
pub(crate) mod sync {
pub(crate) mod atomic {
pub(crate) use loom::sync::atomic::{AtomicUsize, Ordering};
pub(crate) fn fence(ord: Ordering) {
if let Ordering::Acquire = ord {
} else {
// FIXME: loom only supports acquire fences at the moment.
// https://github.com/tokio-rs/loom/issues/117
// let's at least not panic...
// this may generate some false positives (`SeqCst` is stronger than `Acquire`
// for example), and some false negatives (`Relaxed` is weaker than `Acquire`),
// but it's the best we can do for the time being.
}
loom::sync::atomic::fence(Ordering::Acquire)
}

// FIXME: loom does not support compiler_fence at the moment.
// https://github.com/tokio-rs/loom/issues/117
// we use fence as a stand-in for compiler_fence for the time being.
// this may miss some races since fence is stronger than compiler_fence,
// but it's the best we can do for the time being.
pub(crate) use self::fence as compiler_fence;
}
pub(crate) use loom::sync::Arc;
}
pub(crate) use loom::lazy_static;
pub(crate) use loom::thread_local;
}
#[cfg(not(loom))]
#[allow(unused_imports, dead_code)]
pub(crate) mod concurrency {
#[cfg(any(feature = "alloc", feature = "std"))]
pub(crate) mod cell {
#[derive(Debug)]
#[repr(transparent)]
pub(crate) struct UnsafeCell<T>(::core::cell::UnsafeCell<T>);

impl<T> UnsafeCell<T> {
#[inline]
pub(crate) fn new(data: T) -> UnsafeCell<T> {
UnsafeCell(::core::cell::UnsafeCell::new(data))
}

#[inline]
pub(crate) fn with<R>(&self, f: impl FnOnce(*const T) -> R) -> R {
f(self.0.get())
}

#[inline]
pub(crate) fn with_mut<R>(&self, f: impl FnOnce(*mut T) -> R) -> R {
f(self.0.get())
}
}
}
#[cfg(any(feature = "alloc", feature = "std"))]
pub(crate) mod sync {
pub(crate) mod atomic {
pub(crate) use core::sync::atomic::compiler_fence;
pub(crate) use core::sync::atomic::fence;
pub(crate) use core::sync::atomic::{AtomicUsize, Ordering};
}
#[cfg_attr(feature = "nightly", cfg(target_has_atomic = "ptr"))]
pub(crate) use alloc::sync::Arc;
}

#[cfg(feature = "std")]
pub(crate) use std::thread_local;

#[cfg(feature = "std")]
pub(crate) use lazy_static::lazy_static;
}

cfg_if! {
if #[cfg(feature = "alloc")] {
extern crate alloc;
Expand Down Expand Up @@ -99,7 +180,6 @@ cfg_if! {

cfg_if! {
if #[cfg(feature = "std")] {
#[macro_use]
extern crate lazy_static;

mod default;
Expand Down
4 changes: 2 additions & 2 deletions crossbeam-epoch/src/sync/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
//! Ideas from Michael. High Performance Dynamic Lock-Free Hash Tables and List-Based Sets. SPAA
//! 2002. http://dl.acm.org/citation.cfm?id=564870.564881
use concurrency::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use core::marker::PhantomData;
use core::sync::atomic::Ordering::{Acquire, Relaxed, Release};

use {unprotected, Atomic, Guard, Shared};

Expand Down Expand Up @@ -295,7 +295,7 @@ impl<'g, T: 'g, C: IsElement<T>> Iterator for Iter<'g, T, C> {
}
}

#[cfg(test)]
#[cfg(all(test, not(loom)))]
mod tests {
use super::*;
use crossbeam_utils::thread;
Expand Down
4 changes: 2 additions & 2 deletions crossbeam-epoch/src/sync/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
//! Simon Doherty, Lindsay Groves, Victor Luchangco, and Mark Moir. 2004b. Formal Verification of a
//! Practical Lock-Free Queue Algorithm. https://doi.org/10.1007/978-3-540-30232-2_7
use core::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use concurrency::sync::atomic::Ordering::{Acquire, Relaxed, Release};

use crossbeam_utils::CachePadded;

Expand Down Expand Up @@ -203,7 +203,7 @@ impl<T> Drop for Queue<T> {
}
}

#[cfg(test)]
#[cfg(all(test, not(loom)))]
mod test {
use super::*;
use crossbeam_utils::thread;
Expand Down
Loading

0 comments on commit 9dc2226

Please sign in to comment.