Skip to content

Commit

Permalink
Add force_push to ArrayQueue
Browse files Browse the repository at this point in the history
force_push makes it possible for ArrayQueue to be used as a ring-buffer.
  • Loading branch information
brunocodutra committed Mar 3, 2022
1 parent b11f1a8 commit bd75c3c
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 36 deletions.
127 changes: 92 additions & 35 deletions crossbeam-queue/src/array_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ struct Slot<T> {
///
/// This queue allocates a fixed-capacity buffer on construction, which is used to store pushed
/// elements. The queue cannot hold more elements than the buffer allows. Attempting to push an
/// element into a full queue will fail. Having a buffer allocated upfront makes this queue a bit
/// faster than [`SegQueue`].
/// element into a full queue will fail. Alternatively, [`force_push`] makes it possible for
/// this queue to be used as a ring-buffer. Having a buffer allocated upfront makes this queue
/// a bit faster than [`SegQueue`].
///
/// [`force_push`]: ArrayQueue::force_push
/// [`SegQueue`]: super::SegQueue
///
/// # Examples
Expand Down Expand Up @@ -120,21 +122,10 @@ impl<T> ArrayQueue<T> {
}
}

/// Attempts to push an element into the queue.
///
/// If the queue is full, the element is returned back as an error.
///
/// # Examples
///
/// ```
/// use crossbeam_queue::ArrayQueue;
///
/// let q = ArrayQueue::new(1);
///
/// assert_eq!(q.push(10), Ok(()));
/// assert_eq!(q.push(20), Err(20));
/// ```
pub fn push(&self, value: T) -> Result<(), T> {
fn push_or_else<F>(&self, mut value: T, f: F) -> Result<(), T>
where
F: Fn(T, usize, usize, &Slot<T>) -> Result<T, T>,
{
let backoff = Backoff::new();
let mut tail = self.tail.load(Ordering::Relaxed);

Expand All @@ -143,23 +134,23 @@ impl<T> ArrayQueue<T> {
let index = tail & (self.one_lap - 1);
let lap = tail & !(self.one_lap - 1);

let new_tail = if index + 1 < self.cap {
// Same lap, incremented index.
// Set to `{ lap: lap, index: index + 1 }`.
tail + 1
} else {
// One lap forward, index wraps around to zero.
// Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
lap.wrapping_add(self.one_lap)
};

// Inspect the corresponding slot.
debug_assert!(index < self.buffer.len());
let slot = unsafe { self.buffer.get_unchecked(index) };
let stamp = slot.stamp.load(Ordering::Acquire);

// If the tail and the stamp match, we may attempt to push.
if tail == stamp {
let new_tail = if index + 1 < self.cap {
// Same lap, incremented index.
// Set to `{ lap: lap, index: index + 1 }`.
tail + 1
} else {
// One lap forward, index wraps around to zero.
// Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
lap.wrapping_add(self.one_lap)
};

// Try moving the tail.
match self.tail.compare_exchange_weak(
tail,
Expand All @@ -182,14 +173,7 @@ impl<T> ArrayQueue<T> {
}
} else if stamp.wrapping_add(self.one_lap) == tail + 1 {
atomic::fence(Ordering::SeqCst);
let head = self.head.load(Ordering::Relaxed);

// If the head lags one lap behind the tail as well...
if head.wrapping_add(self.one_lap) == tail {
// ...then the queue is full.
return Err(value);
}

value = f(value, tail, new_tail, slot)?;
backoff.spin();
tail = self.tail.load(Ordering::Relaxed);
} else {
Expand All @@ -200,6 +184,79 @@ impl<T> ArrayQueue<T> {
}
}

/// Attempts to push an element into the queue.
///
/// If the queue is full, the element is returned back as an error.
///
/// # Examples
///
/// ```
/// use crossbeam_queue::ArrayQueue;
///
/// let q = ArrayQueue::new(1);
///
/// assert_eq!(q.push(10), Ok(()));
/// assert_eq!(q.push(20), Err(20));
/// ```
pub fn push(&self, value: T) -> Result<(), T> {
self.push_or_else(value, |v, tail, _, _| {
let head = self.head.load(Ordering::Relaxed);

// If the head lags one lap behind the tail as well...
if head.wrapping_add(self.one_lap) == tail {
// ...then the queue is full.
Err(v)
} else {
Ok(v)
}
})
}

/// Pushes an element into the queue, replacing the oldest element if necessary.
///
/// If the queue is full, the oldest element is replaced and returned,
/// otherwise `None` is returned.
///
/// # Examples
///
/// ```
/// use crossbeam_queue::ArrayQueue;
///
/// let q = ArrayQueue::new(2);
///
/// assert_eq!(q.force_push(10), None);
/// assert_eq!(q.force_push(20), None);
/// assert_eq!(q.force_push(30), Some(10));
/// assert_eq!(q.pop(), Some(20));
/// ```
pub fn force_push(&self, value: T) -> Option<T> {
self.push_or_else(value, |v, tail, new_tail, slot| {
let head = tail.wrapping_sub(self.one_lap);
let new_head = new_tail.wrapping_sub(self.one_lap);

// Try moving the head.
if self
.head
.compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Relaxed)
.is_ok()
{
// Move the tail.
self.tail.store(new_tail, Ordering::SeqCst);

// Swap the previous value.
let old = unsafe { slot.value.get().replace(MaybeUninit::new(v)).assume_init() };

// Update the stamp.
slot.stamp.store(tail + 1, Ordering::Release);

Err(old)
} else {
Ok(v)
}
})
.err()
}

/// Attempts to pop an element from the queue.
///
/// If the queue is empty, `None` is returned.
Expand Down
93 changes: 92 additions & 1 deletion crossbeam-queue/tests/array_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,45 @@ fn spsc() {
.unwrap();
}

#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn spsc_ring_buffer() {
const COUNT: usize = 100_000;

let t = AtomicUsize::new(1);
let q = ArrayQueue::<usize>::new(3);
let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();

scope(|scope| {
scope.spawn(|_| loop {
match t.load(Ordering::SeqCst) {
0 if q.is_empty() => break,

_ => {
while let Some(n) = q.pop() {
v[n].fetch_add(1, Ordering::SeqCst);
}
}
}
});

scope.spawn(|_| {
for i in 0..COUNT {
if let Some(n) = q.force_push(i) {
v[n].fetch_add(1, Ordering::SeqCst);
}
}

t.fetch_sub(1, Ordering::SeqCst);
});
})
.unwrap();

for c in v {
assert_eq!(c.load(Ordering::SeqCst), 1);
}
}

#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn mpmc() {
Expand Down Expand Up @@ -181,6 +220,50 @@ fn mpmc() {
}
}

#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn mpmc_ring_buffer() {
const COUNT: usize = 25_000;
const THREADS: usize = 4;

let t = AtomicUsize::new(THREADS);
let q = ArrayQueue::<usize>::new(3);
let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();

scope(|scope| {
for _ in 0..THREADS {
scope.spawn(|_| loop {
match t.load(Ordering::SeqCst) {
0 if q.is_empty() => break,

_ => {
while let Some(n) = q.pop() {
v[n].fetch_add(1, Ordering::SeqCst);
}
}
}
});
}

for _ in 0..THREADS {
scope.spawn(|_| {
for i in 0..COUNT {
if let Some(n) = q.force_push(i) {
v[n].fetch_add(1, Ordering::SeqCst);
}
}

t.fetch_sub(1, Ordering::SeqCst);
});
}
})
.unwrap();

for c in v {
assert_eq!(c.load(Ordering::SeqCst), THREADS);
}
}

#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn drops() {
Expand Down Expand Up @@ -244,13 +327,21 @@ fn linearizable() {
let q = ArrayQueue::new(THREADS);

scope(|scope| {
for _ in 0..THREADS {
for _ in 0..THREADS / 2 {
scope.spawn(|_| {
for _ in 0..COUNT {
while q.push(0).is_err() {}
q.pop().unwrap();
}
});

scope.spawn(|_| {
for _ in 0..COUNT {
if q.force_push(0).is_none() {
q.pop().unwrap();
}
}
});
}
})
.unwrap();
Expand Down

0 comments on commit bd75c3c

Please sign in to comment.