Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync ops for queue #899

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
332 changes: 249 additions & 83 deletions crossbeam-queue/src/array_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use alloc::boxed::Box;
use core::cell::UnsafeCell;
use core::fmt;
use core::mem::MaybeUninit;
use core::mem::{ManuallyDrop, MaybeUninit};
use core::sync::atomic::{self, AtomicUsize, Ordering};

use crossbeam_utils::{Backoff, CachePadded};
Expand Down Expand Up @@ -91,35 +91,32 @@ impl<T> ArrayQueue<T> {
/// let q = ArrayQueue::<i32>::new(100);
/// ```
pub fn new(cap: usize) -> ArrayQueue<T> {
assert!(cap > 0, "capacity must be non-zero");
let mut new = ArrayQueue {
buffer: Box::new([]),
cap: 0,
one_lap: 1,
head: CachePadded::new(AtomicUsize::new(0)),
tail: CachePadded::new(AtomicUsize::new(0)),
};
new.resize(cap);
new
}

// Head is initialized to `{ lap: 0, index: 0 }`.
// Tail is initialized to `{ lap: 0, index: 0 }`.
let head = 0;
let tail = 0;
fn index(&self, stamp: usize) -> (usize, usize) {
// Deconstruct the tail.
let index = stamp & (self.one_lap - 1);
let lap = stamp & !(self.one_lap - 1);

// Allocate a buffer of `cap` slots initialized
// with stamps.
let buffer: Box<[Slot<T>]> = (0..cap)
.map(|i| {
// Set the stamp to `{ lap: 0, index: i }`.
Slot {
stamp: AtomicUsize::new(i),
value: UnsafeCell::new(MaybeUninit::uninit()),
}
})
.collect();

// One lap is the smallest power of two greater than `cap`.
let one_lap = (cap + 1).next_power_of_two();

ArrayQueue {
buffer,
cap,
one_lap,
head: CachePadded::new(AtomicUsize::new(head)),
tail: CachePadded::new(AtomicUsize::new(tail)),
}
let new = if index + 1 < self.cap {
// Same lap, incremented index.
// Set to `{ lap: lap, index: index + 1 }`.
stamp + 1
} else {
// One lap forward, index wraps around to zero.
// Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
lap.wrapping_add(self.one_lap)
};
(index, new)
}

fn push_or_else<F>(&self, mut value: T, f: F) -> Result<(), T>
Expand All @@ -130,19 +127,7 @@ impl<T> ArrayQueue<T> {
let mut tail = self.tail.load(Ordering::Relaxed);

loop {
// Deconstruct the tail.
let index = tail & (self.one_lap - 1);
let lap = tail & !(self.one_lap - 1);

let new_tail = if index + 1 < self.cap {
// Same lap, incremented index.
// Set to `{ lap: lap, index: index + 1 }`.
tail + 1
} else {
// One lap forward, index wraps around to zero.
// Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
lap.wrapping_add(self.one_lap)
};
let (index, new_tail) = self.index(tail);

// Inspect the corresponding slot.
debug_assert!(index < self.buffer.len());
Expand Down Expand Up @@ -277,9 +262,7 @@ impl<T> ArrayQueue<T> {
let mut head = self.head.load(Ordering::Relaxed);

loop {
// Deconstruct the head.
let index = head & (self.one_lap - 1);
let lap = head & !(self.one_lap - 1);
let (index, new_head) = self.index(head);

// Inspect the corresponding slot.
debug_assert!(index < self.buffer.len());
Expand All @@ -288,20 +271,10 @@ impl<T> ArrayQueue<T> {

// If the the stamp is ahead of the head by 1, we may attempt to pop.
if head + 1 == stamp {
let new = if index + 1 < self.cap {
// Same lap, incremented index.
// Set to `{ lap: lap, index: index + 1 }`.
head + 1
} else {
// One lap forward, index wraps around to zero.
// Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
lap.wrapping_add(self.one_lap)
};

// Try moving the head.
match self.head.compare_exchange_weak(
head,
new,
new_head,
Ordering::SeqCst,
Ordering::Relaxed,
) {
Expand Down Expand Up @@ -336,6 +309,226 @@ impl<T> ArrayQueue<T> {
}
}

fn try_push_sync(&mut self, value: T) -> Result<(), (T, usize, usize)> {
let tail = *self.tail.get_mut();
let (index, new_tail) = self.index(tail);

// Inspect the corresponding slot.
debug_assert!(index < self.buffer.len());
let slot = unsafe { self.buffer.get_unchecked_mut(index) };
let stamp = *slot.stamp.get_mut();

// If the tail and the stamp match, we may push.
if tail == stamp {
*self.tail.get_mut() = new_tail;
unsafe { slot.value.get().write(MaybeUninit::new(value)) }
*slot.stamp.get_mut() = tail + 1;
Ok(())
} else {
Err((value, new_tail, index))
}
}

/// Attempts to push an element into the queue.
///
/// If the queue is full, the element is returned back as an error.
///
/// # Examples
///
/// ```
/// use crossbeam_queue::ArrayQueue;
///
/// let mut q = ArrayQueue::new(1);
///
/// assert_eq!(q.push_sync(10), Ok(()));
/// assert_eq!(q.push_sync(20), Err(20));
/// ```
pub fn push_sync(&mut self, value: T) -> Result<(), T> {
self.try_push_sync(value).map_err(|(value, ..)| value)
}

/// Pushes an element into the queue, replacing the oldest element if necessary.
///
/// If the queue is full, the oldest element is replaced and returned,
/// otherwise `None` is returned.
///
/// # Examples
///
/// ```
/// use crossbeam_queue::ArrayQueue;
///
/// let mut q = ArrayQueue::new(2);
///
/// assert_eq!(q.force_push_sync(10), None);
/// assert_eq!(q.force_push_sync(20), None);
/// assert_eq!(q.force_push_sync(30), Some(10));
/// assert_eq!(q.pop_sync(), Some(20));
/// ```
pub fn force_push_sync(&mut self, value: T) -> Option<T> {
match self.try_push_sync(value) {
Ok(()) => None,
Err((v, new_tail, index)) => {
// move the head and tail
*self.head.get_mut() = new_tail.wrapping_sub(self.one_lap);
let tail = *self.tail.get_mut();
*self.tail.get_mut() = new_tail;

// Inspect the corresponding slot.
debug_assert!(index < self.buffer.len());
let slot = unsafe { self.buffer.get_unchecked_mut(index) };

// Swap the previous value.
let old = unsafe { slot.value.get().replace(MaybeUninit::new(v)).assume_init() };

// Update the stamp.
*slot.stamp.get_mut() = tail + 1;

Some(old)
}
}
}

/// Attempts to pop an element from the queue.
///
/// If the queue is empty, `None` is returned.
///
/// # Examples
///
/// ```
/// use crossbeam_queue::ArrayQueue;
///
/// let mut q = ArrayQueue::new(1);
/// assert_eq!(q.push_sync(10), Ok(()));
///
/// assert_eq!(q.pop_sync(), Some(10));
/// assert!(q.pop_sync().is_none());
/// ```
pub fn pop_sync(&mut self) -> Option<T> {
let head = *self.head.get_mut();
if head != *self.tail.get_mut() {
let (index, new_head) = self.index(head);

// update head
*self.head.get_mut() = new_head;

// SAFETY: We have mutable access to this, so we can read without
// worrying about concurrency. Furthermore, we know this is
// initialized because it is the value pointed at by `value.head`
// and this is a non-empty queue.
debug_assert!(index < self.buffer.len());
let slot = unsafe { self.buffer.get_unchecked_mut(index) };

// get value
let val = unsafe { slot.value.get().read().assume_init() };

// update stamp
*slot.stamp.get_mut() = head.wrapping_add(self.one_lap);
Option::Some(val)
} else {
Option::None
}
}

/// Resets the buffer such that all the laps are set to 0 and the head is at index 0
fn reset(&mut self) {
let tail = self.tail.get_mut();
let head = self.head.get_mut();

let hix = *head & (self.one_lap - 1);
let tix = *tail & (self.one_lap - 1);

// rotate head to the very front
self.buffer.rotate_left(hix);

// reset all the slot stamps to `{ lap: 0, index: i }`
for (i, slot) in self.buffer.iter_mut().enumerate() {
*slot.stamp.get_mut() = i;
}

// get the new stamps for the head and tail
*head = 0;
*tail = if hix < tix {
tix - hix
} else if hix > tix {
self.cap - hix + tix
} else if *tail == *head {
0
} else {
self.cap
};
}

/// Resizes the [`ArrayQueue`] to support more entries.
///
/// # Examples
///
/// ```
/// use crossbeam_queue::ArrayQueue;
///
/// let mut q = ArrayQueue::new(1);
/// assert_eq!(q.push_sync(10), Ok(()));
/// // not enough space
/// assert_eq!(q.push_sync(20), Err(20));
///
/// q.resize(2);
/// assert_eq!(q.push_sync(20), Ok(()));
/// assert_eq!(q.pop_sync(), Some(10));
///
/// q.resize(1);
/// assert_eq!(q.pop_sync(), Some(20));
/// ```
pub fn resize(&mut self, cap: usize) {
assert!(cap > 0, "capacity must be non-zero");

self.reset();
let tail = self.tail.get_mut();
self.one_lap = (cap + 1).next_power_of_two();
self.cap = cap;

// get our buffer as a vec (so we can resize it). Replacing it with an empty slice for now
let mut v = core::mem::replace(&mut self.buffer, Box::new([])).into_vec();

// if we want more space, reserve it and initialise
// else, truncate and shrink
if cap > v.len() {
// reserve_exact to optimise the `into_boxed_slice` call later
v.reserve_exact(cap - v.len());
let mut stamp = v.len();
v.resize_with(cap, || {
let i = stamp;
stamp += 1;
Slot {
// Set the stamp to `{ lap: 0, index: i }`.
stamp: AtomicUsize::new(i),
value: UnsafeCell::new(MaybeUninit::uninit()),
}
});
} else {
if *tail > cap {
// drop values
unsafe {
for slot in v.get_unchecked_mut(cap..*tail) {
(*slot.value.get()).as_mut_ptr().drop_in_place();
}
}
*tail = self.one_lap; // tail wraps around
}

v.truncate(cap);
v.shrink_to_fit();
}

// we have used `reserve_exact` and `shrink_to_fit` which guarantee to call
// the allocator with the `cap` we gave it.
// This means that we are safe to give that memory to a box, because we can dealloc using that same cap
let ptr = ManuallyDrop::new(v).as_mut_ptr();
let b = unsafe { Box::from_raw(core::slice::from_raw_parts_mut(ptr, cap)) };

// move our new box back into the buffer, leaking the placeholder
// (LLVM will probably assume it's not-empty)
Box::leak(core::mem::replace(&mut self.buffer, b));
}

/// Returns the capacity of the queue.
///
/// # Examples
Expand Down Expand Up @@ -504,33 +697,6 @@ impl<T> Iterator for IntoIter<T> {
type Item = T;

fn next(&mut self) -> Option<Self::Item> {
let value = &mut self.value;
let head = *value.head.get_mut();
if value.head.get_mut() != value.tail.get_mut() {
let index = head & (value.one_lap - 1);
let lap = head & !(value.one_lap - 1);
// SAFETY: We have mutable access to this, so we can read without
// worrying about concurrency. Furthermore, we know this is
// initialized because it is the value pointed at by `value.head`
// and this is a non-empty queue.
let val = unsafe {
debug_assert!(index < value.buffer.len());
let slot = value.buffer.get_unchecked_mut(index);
slot.value.get().read().assume_init()
};
let new = if index + 1 < value.cap {
// Same lap, incremented index.
// Set to `{ lap: lap, index: index + 1 }`.
head + 1
} else {
// One lap forward, index wraps around to zero.
// Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
lap.wrapping_add(value.one_lap)
};
*value.head.get_mut() = new;
Option::Some(val)
} else {
Option::None
}
self.value.pop_sync()
}
}
Loading