Skip to content

Commit

Permalink
Merge #789 #797
Browse files Browse the repository at this point in the history
789: Add fn force_push to ArrayQueue  r=taiki-e a=brunocodutra

This is an attempt to implement a straightforward MPMC ring-buffer and close #680.

This proposal adds a new method `push_or_swap` to `ArrayQueue`, that atomically swaps the oldest element when the queue is full, instead of returning `Err` back to the caller like `push` does. As such, `push_or_swap` never fails to insert the element into the queue.

I couldn't find any benchmarks I could run, (am I missing anything obvious?), however I did run benchmarks from [ring-channel](https://github.com/brunocodutra/ring-channel) where I compared this implementation against an emulation of a ring-buffer, that keeps popping elements until pushing succeeds, i.e. something like the following:


```
while let Err(v) = q.push(value) {
   q.pop();
   value = v;
}
```

I got the results below on my machine, which show that `push_or_swap` fares much better when capacity is low and the probability that pushing fails is high (the baseline was set to the `push`-based implementation).


* **Note 1:** the relevant metric in the benchmarks below is the throughput, which is scaled by the "channel efficiency", defined as `total_number_of_messages_received / total_number_of_messages_sent`.
* **Note 2:** benchmark names are suffixed by `SB/PxR/C`, where S is the size of the element in bytes, P is the number of threads producing, R the number of threads consuming, and C is the capacity of the ring-buffer: 
* **Note 3:** the source code for the benchmarks is [available here](https://github.com/brunocodutra/ring-channel/blob/master/benches/throughput.rs#L18-L48).

```
Benchmarking mpmc/Block/32B/8x8/1: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 7.2s, enable flat sampling, or reduce sample count to 50.
mpmc/Block/32B/8x8/1    time:   [1.3466 ms 1.3982 ms 1.4507 ms]                                  
                        thrpt:  [1.4117 Melem/s 1.4647 Melem/s 1.5209 Melem/s]
                 change:
                        time:   [-33.037% -28.797% -24.494%] (p = 0.00 < 0.05)
                        thrpt:  [+32.440% +40.443% +49.337%]
                        Performance has improved.
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) low mild
  1 (1.00%) high mild
mpmc/Block/32B/8x8/16   time:   [367.57 us 374.55 us 382.12 us]                                  
                        thrpt:  [5.3596 Melem/s 5.4679 Melem/s 5.5717 Melem/s]
                 change:
                        time:   [-2.1237% +0.3288% +2.6459%] (p = 0.79 > 0.05)
                        thrpt:  [-2.5777% -0.3277% +2.1698%]
                        No change in performance detected.
Found 3 outliers among 100 measurements (3.00%)
  3 (3.00%) high mild

Benchmarking mpsc/Block/32B/15x1/1: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 9.9s, enable flat sampling, or reduce sample count to 50.
mpsc/Block/32B/15x1/1   time:   [3.5292 ms 3.7286 ms 3.9535 ms]                                   
                        thrpt:  [971.28 Kelem/s 1.0299 Melem/s 1.0881 Melem/s]
                 change:
                        time:   [-51.773% -43.940% -34.318%] (p = 0.00 < 0.05)
                        thrpt:  [+52.248% +78.380% +107.35%]
                        Performance has improved.
Found 5 outliers among 100 measurements (5.00%)
  4 (4.00%) high mild
  1 (1.00%) high severe
mpsc/Block/32B/15x1/16  time:   [853.29 us 873.07 us 895.27 us]                                   
                        thrpt:  [4.2892 Melem/s 4.3983 Melem/s 4.5003 Melem/s]
                 change:
                        time:   [-8.3188% +0.1727% +9.3995%] (p = 0.97 > 0.05)
                        thrpt:  [-8.5919% -0.1724% +9.0736%]
                        No change in performance detected.
Found 5 outliers among 100 measurements (5.00%)
  3 (3.00%) high mild
  2 (2.00%) high severe

spmc/Block/32B/1x15/1   time:   [163.94 us 169.05 us 173.89 us]                                  
                        thrpt:  [1.4722 Melem/s 1.5144 Melem/s 1.5616 Melem/s]
                 change:
                        time:   [-6.0575% -1.4457% +3.5710%] (p = 0.55 > 0.05)
                        thrpt:  [-3.4479% +1.4669% +6.4481%]
                        No change in performance detected.
Found 7 outliers among 100 measurements (7.00%)
  6 (6.00%) low mild
  1 (1.00%) high mild
spmc/Block/32B/1x15/16  time:   [49.955 us 53.012 us 56.021 us]                                    
                        thrpt:  [4.5697 Melem/s 4.8291 Melem/s 5.1246 Melem/s]
                 change:
                        time:   [-9.8603% -3.6168% +3.6703%] (p = 0.31 > 0.05)
                        thrpt:  [-3.5403% +3.7526% +10.939%]
                        No change in performance detected.

spsc/Block/32B/1x1/1    time:   [92.707 us 98.294 us 103.02 us]                                 
                        thrpt:  [2.4851 Melem/s 2.6044 Melem/s 2.7614 Melem/s]
                 change:
                        time:   [-13.073% -5.2960% +2.5130%] (p = 0.21 > 0.05)
                        thrpt:  [-2.4514% +5.5922% +15.039%]
                        No change in performance detected.
spsc/Block/32B/1x1/2    time:   [79.525 us 87.271 us 94.110 us]                                 
                        thrpt:  [2.7202 Melem/s 2.9334 Melem/s 3.2191 Melem/s]
                 change:
                        time:   [-18.141% -8.7754% +0.3419%] (p = 0.07 > 0.05)
                        thrpt:  [-0.3407% +9.6196% +22.162%]
                        No change in performance detected.
```

797: Update to stabilized const_fn_trait_bound r=taiki-e a=taiki-e

const_fn_trait_bound has been stabilized on Rust 1.61.

Co-authored-by: Bruno Dutra <brunocodutra@gmail.com>
Co-authored-by: Taiki Endo <te316e89@gmail.com>
  • Loading branch information
3 people authored Mar 9, 2022
3 parents b11f1a8 + bd75c3c + 7b2d65f commit 0081fcc
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 42 deletions.
9 changes: 7 additions & 2 deletions crossbeam-epoch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,26 @@ std = ["alloc", "crossbeam-utils/std", "lazy_static"]
# NOTE: Disabling both `std` *and* `alloc` features is not supported yet.
alloc = []

# These features are no longer used.
# TODO: remove in the next major version.
# Enable to use of unstable functionality.
# This is disabled by default and requires recent nightly compiler.
#
# NOTE: This feature is outside of the normal semver guarantees and minor or
# patch versions of crossbeam may make breaking changes to them at any time.
nightly = ["crossbeam-utils/nightly", "const_fn"]
nightly = ["crossbeam-utils/nightly"]

# Enable the use of loom for concurrency testing.
#
# NOTE: This feature is outside of the normal semver guarantees and minor or
# patch versions of crossbeam may make breaking changes to them at any time.
loom = ["loom-crate", "crossbeam-utils/loom"]

[build-dependencies]
autocfg = "1"

[dependencies]
cfg-if = "1"
const_fn = { version = "0.4.4", optional = true }
memoffset = "0.6"

# Enable the use of loom for concurrency testing.
Expand All @@ -67,3 +71,4 @@ default-features = false

[dev-dependencies]
rand = "0.8"
rustversion = "1"
17 changes: 17 additions & 0 deletions crossbeam-epoch/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ fn main() {
}
};

let cfg = match autocfg::AutoCfg::new() {
Ok(cfg) => cfg,
Err(e) => {
println!(
"cargo:warning={}: unable to determine rustc version: {}",
env!("CARGO_PKG_NAME"),
e
);
return;
}
};

// Note that this is `no_*`, not `has_*`. This allows treating
// `cfg(target_has_atomic = "ptr")` as true when the build script doesn't
// run. This is needed for compatibility with non-cargo build systems that
Expand All @@ -37,5 +49,10 @@ fn main() {
println!("cargo:rustc-cfg=crossbeam_no_atomic_cas");
}

if cfg.probe_rustc_version(1, 61) {
// TODO: invert cfg once Rust 1.61 became stable.
println!("cargo:rustc-cfg=crossbeam_const_fn_trait_bound");
}

println!("cargo:rerun-if-changed=no_atomic.rs");
}
14 changes: 11 additions & 3 deletions crossbeam-epoch/src/atomic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,16 @@ impl<T: ?Sized + Pointable> Atomic<T> {
///
/// let a = Atomic::<i32>::null();
/// ```
///
#[cfg_attr(all(feature = "nightly", not(crossbeam_loom)), const_fn::const_fn)]
#[cfg(all(crossbeam_const_fn_trait_bound, not(crossbeam_loom)))]
pub const fn null() -> Atomic<T> {
Self {
data: AtomicUsize::new(0),
_marker: PhantomData,
}
}

/// Returns a new null atomic pointer.
#[cfg(not(all(crossbeam_const_fn_trait_bound, not(crossbeam_loom))))]
pub fn null() -> Atomic<T> {
Self {
data: AtomicUsize::new(0),
Expand Down Expand Up @@ -1594,7 +1602,7 @@ mod tests {
Shared::<i64>::null().with_tag(7);
}

#[cfg(feature = "nightly")]
#[rustversion::since(1.61)]
#[test]
fn const_atomic_null() {
use super::Atomic;
Expand Down
1 change: 0 additions & 1 deletion crossbeam-epoch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
unreachable_pub
)]
#![cfg_attr(not(feature = "std"), no_std)]
#![cfg_attr(feature = "nightly", feature(const_fn_trait_bound))]

#[cfg(crossbeam_loom)]
extern crate loom_crate as loom;
Expand Down
127 changes: 92 additions & 35 deletions crossbeam-queue/src/array_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ struct Slot<T> {
///
/// This queue allocates a fixed-capacity buffer on construction, which is used to store pushed
/// elements. The queue cannot hold more elements than the buffer allows. Attempting to push an
/// element into a full queue will fail. Having a buffer allocated upfront makes this queue a bit
/// faster than [`SegQueue`].
/// element into a full queue will fail. Alternatively, [`force_push`] makes it possible for
/// this queue to be used as a ring-buffer. Having a buffer allocated upfront makes this queue
/// a bit faster than [`SegQueue`].
///
/// [`force_push`]: ArrayQueue::force_push
/// [`SegQueue`]: super::SegQueue
///
/// # Examples
Expand Down Expand Up @@ -120,21 +122,10 @@ impl<T> ArrayQueue<T> {
}
}

/// Attempts to push an element into the queue.
///
/// If the queue is full, the element is returned back as an error.
///
/// # Examples
///
/// ```
/// use crossbeam_queue::ArrayQueue;
///
/// let q = ArrayQueue::new(1);
///
/// assert_eq!(q.push(10), Ok(()));
/// assert_eq!(q.push(20), Err(20));
/// ```
pub fn push(&self, value: T) -> Result<(), T> {
fn push_or_else<F>(&self, mut value: T, f: F) -> Result<(), T>
where
F: Fn(T, usize, usize, &Slot<T>) -> Result<T, T>,
{
let backoff = Backoff::new();
let mut tail = self.tail.load(Ordering::Relaxed);

Expand All @@ -143,23 +134,23 @@ impl<T> ArrayQueue<T> {
let index = tail & (self.one_lap - 1);
let lap = tail & !(self.one_lap - 1);

let new_tail = if index + 1 < self.cap {
// Same lap, incremented index.
// Set to `{ lap: lap, index: index + 1 }`.
tail + 1
} else {
// One lap forward, index wraps around to zero.
// Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
lap.wrapping_add(self.one_lap)
};

// Inspect the corresponding slot.
debug_assert!(index < self.buffer.len());
let slot = unsafe { self.buffer.get_unchecked(index) };
let stamp = slot.stamp.load(Ordering::Acquire);

// If the tail and the stamp match, we may attempt to push.
if tail == stamp {
let new_tail = if index + 1 < self.cap {
// Same lap, incremented index.
// Set to `{ lap: lap, index: index + 1 }`.
tail + 1
} else {
// One lap forward, index wraps around to zero.
// Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
lap.wrapping_add(self.one_lap)
};

// Try moving the tail.
match self.tail.compare_exchange_weak(
tail,
Expand All @@ -182,14 +173,7 @@ impl<T> ArrayQueue<T> {
}
} else if stamp.wrapping_add(self.one_lap) == tail + 1 {
atomic::fence(Ordering::SeqCst);
let head = self.head.load(Ordering::Relaxed);

// If the head lags one lap behind the tail as well...
if head.wrapping_add(self.one_lap) == tail {
// ...then the queue is full.
return Err(value);
}

value = f(value, tail, new_tail, slot)?;
backoff.spin();
tail = self.tail.load(Ordering::Relaxed);
} else {
Expand All @@ -200,6 +184,79 @@ impl<T> ArrayQueue<T> {
}
}

/// Attempts to push an element into the queue.
///
/// If the queue is full, the element is returned back as an error.
///
/// # Examples
///
/// ```
/// use crossbeam_queue::ArrayQueue;
///
/// let q = ArrayQueue::new(1);
///
/// assert_eq!(q.push(10), Ok(()));
/// assert_eq!(q.push(20), Err(20));
/// ```
pub fn push(&self, value: T) -> Result<(), T> {
self.push_or_else(value, |v, tail, _, _| {
let head = self.head.load(Ordering::Relaxed);

// If the head lags one lap behind the tail as well...
if head.wrapping_add(self.one_lap) == tail {
// ...then the queue is full.
Err(v)
} else {
Ok(v)
}
})
}

/// Pushes an element into the queue, replacing the oldest element if necessary.
///
/// If the queue is full, the oldest element is replaced and returned,
/// otherwise `None` is returned.
///
/// # Examples
///
/// ```
/// use crossbeam_queue::ArrayQueue;
///
/// let q = ArrayQueue::new(2);
///
/// assert_eq!(q.force_push(10), None);
/// assert_eq!(q.force_push(20), None);
/// assert_eq!(q.force_push(30), Some(10));
/// assert_eq!(q.pop(), Some(20));
/// ```
pub fn force_push(&self, value: T) -> Option<T> {
self.push_or_else(value, |v, tail, new_tail, slot| {
let head = tail.wrapping_sub(self.one_lap);
let new_head = new_tail.wrapping_sub(self.one_lap);

// Try moving the head.
if self
.head
.compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Relaxed)
.is_ok()
{
// Move the tail.
self.tail.store(new_tail, Ordering::SeqCst);

// Swap the previous value.
let old = unsafe { slot.value.get().replace(MaybeUninit::new(v)).assume_init() };

// Update the stamp.
slot.stamp.store(tail + 1, Ordering::Release);

Err(old)
} else {
Ok(v)
}
})
.err()
}

/// Attempts to pop an element from the queue.
///
/// If the queue is empty, `None` is returned.
Expand Down
93 changes: 92 additions & 1 deletion crossbeam-queue/tests/array_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,45 @@ fn spsc() {
.unwrap();
}

#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn spsc_ring_buffer() {
const COUNT: usize = 100_000;

let t = AtomicUsize::new(1);
let q = ArrayQueue::<usize>::new(3);
let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();

scope(|scope| {
scope.spawn(|_| loop {
match t.load(Ordering::SeqCst) {
0 if q.is_empty() => break,

_ => {
while let Some(n) = q.pop() {
v[n].fetch_add(1, Ordering::SeqCst);
}
}
}
});

scope.spawn(|_| {
for i in 0..COUNT {
if let Some(n) = q.force_push(i) {
v[n].fetch_add(1, Ordering::SeqCst);
}
}

t.fetch_sub(1, Ordering::SeqCst);
});
})
.unwrap();

for c in v {
assert_eq!(c.load(Ordering::SeqCst), 1);
}
}

#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn mpmc() {
Expand Down Expand Up @@ -181,6 +220,50 @@ fn mpmc() {
}
}

#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn mpmc_ring_buffer() {
const COUNT: usize = 25_000;
const THREADS: usize = 4;

let t = AtomicUsize::new(THREADS);
let q = ArrayQueue::<usize>::new(3);
let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();

scope(|scope| {
for _ in 0..THREADS {
scope.spawn(|_| loop {
match t.load(Ordering::SeqCst) {
0 if q.is_empty() => break,

_ => {
while let Some(n) = q.pop() {
v[n].fetch_add(1, Ordering::SeqCst);
}
}
}
});
}

for _ in 0..THREADS {
scope.spawn(|_| {
for i in 0..COUNT {
if let Some(n) = q.force_push(i) {
v[n].fetch_add(1, Ordering::SeqCst);
}
}

t.fetch_sub(1, Ordering::SeqCst);
});
}
})
.unwrap();

for c in v {
assert_eq!(c.load(Ordering::SeqCst), THREADS);
}
}

#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn drops() {
Expand Down Expand Up @@ -244,13 +327,21 @@ fn linearizable() {
let q = ArrayQueue::new(THREADS);

scope(|scope| {
for _ in 0..THREADS {
for _ in 0..THREADS / 2 {
scope.spawn(|_| {
for _ in 0..COUNT {
while q.push(0).is_err() {}
q.pop().unwrap();
}
});

scope.spawn(|_| {
for _ in 0..COUNT {
if q.force_push(0).is_none() {
q.pop().unwrap();
}
}
});
}
})
.unwrap();
Expand Down

0 comments on commit 0081fcc

Please sign in to comment.