Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use AtomicU64 for head/tail index in deque, channel, and queues #777

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ default-features = false
optional = true

[dependencies.crossbeam-utils]
version = "0.8.5"
version = "0.8.6"
path = "./crossbeam-utils"
default-features = false

Expand Down
3 changes: 2 additions & 1 deletion ci/no_atomic.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ cat >"${file}" <<EOF
// This file is @generated by $(basename "$0").
// It is not intended for manual editing.

#[allow(dead_code)] // Only crossbeam-{epoch,queue,skiplist,utils} use this.
const NO_ATOMIC_CAS: &[&str] = &[
EOF
for target in "${no_atomic_cas[@]}"; do
Expand All @@ -49,7 +50,7 @@ done
cat >>"${file}" <<EOF
];

#[allow(dead_code)] // Only crossbeam-utils uses this.
#[allow(dead_code)] // Only crossbeam-{channel,deque,queue,utils} use this.
const NO_ATOMIC_64: &[&str] = &[
EOF
for target in "${no_atomic_64[@]}"; do
Expand Down
2 changes: 1 addition & 1 deletion crossbeam-channel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ std = ["crossbeam-utils/std"]
cfg-if = "1"

[dependencies.crossbeam-utils]
version = "0.8"
version = "0.8.6"
path = "../crossbeam-utils"
default-features = false
optional = true
Expand Down
43 changes: 43 additions & 0 deletions crossbeam-channel/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// The rustc-cfgs listed below are considered public API, but they are
// *unstable* and outside of the normal semver guarantees:
//
// - `crossbeam_no_atomic_64`
//     Assume the target does *not* support AtomicU64/AtomicI64.
//     This is usually detected automatically by the build script, but you may
//     need to enable it manually when building for custom targets or using
//     non-cargo build systems that don't run the build script.
//
// With the exceptions mentioned above, the rustc-cfgs emitted by the build
// script are *not* public API.

#![warn(rust_2018_idioms)]

use std::env;

include!("no_atomic.rs");

/// Build-script entry point.
///
/// Reads the `TARGET` environment variable and, when that target appears in
/// `NO_ATOMIC_64`, emits the `crossbeam_no_atomic_64` cfg. If `TARGET` cannot
/// be read, a cargo warning is printed and nothing else is emitted.
fn main() {
    let triple = match env::var("TARGET") {
        Err(reason) => {
            println!(
                "cargo:warning={}: unable to get TARGET environment variable: {}",
                env!("CARGO_PKG_NAME"),
                reason
            );
            return;
        }
        Ok(triple) => triple,
    };

    // The cfg is deliberately negative (`no_*` rather than `has_*`): when the
    // build script never runs (e.g. under a non-cargo build system) no cfg is
    // set, which lets "max-atomic-width" be treated as 64 in that case.
    let lacks_atomic_64 = NO_ATOMIC_64.iter().any(|&listed| listed == triple.as_str());
    if lacks_atomic_64 {
        println!("cargo:rustc-cfg=crossbeam_no_atomic_64");
    }
    // Any target that is not listed is assumed to have
    // `"max-atomic-width" == 64` or `"max-atomic-width" == 128`.

    println!("cargo:rerun-if-changed=no_atomic.rs");
}
1 change: 1 addition & 0 deletions crossbeam-channel/no_atomic.rs
41 changes: 21 additions & 20 deletions crossbeam-channel/src/flavors/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,21 @@
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{self, AtomicUsize, Ordering};
use std::sync::atomic::{self, Ordering};
use std::time::Instant;

use crossbeam_utils::{Backoff, CachePadded};

use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::utils::AtomicU64;
use crate::waker::SyncWaker;

/// A slot in a channel.
struct Slot<T> {
/// The current stamp.
stamp: AtomicUsize,
stamp: AtomicU64,

/// The message in this slot.
msg: UnsafeCell<MaybeUninit<T>>,
Expand All @@ -37,7 +38,7 @@ pub(crate) struct ArrayToken {
slot: *const u8,

/// Stamp to store into the slot after reading or writing.
stamp: usize,
stamp: u64,
}

impl Default for ArrayToken {
Expand All @@ -55,20 +56,20 @@ pub(crate) struct Channel<T> {
/// The head of the channel.
///
/// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
/// packed into a single `usize`. The lower bits represent the index, while the upper bits
/// packed into a single `u64`. The lower bits represent the index, while the upper bits
/// represent the lap. The mark bit in the head is always zero.
///
/// Messages are popped from the head of the channel.
head: CachePadded<AtomicUsize>,
head: CachePadded<AtomicU64>,

/// The tail of the channel.
///
/// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
/// packed into a single `usize`. The lower bits represent the index, while the upper bits
/// packed into a single `u64`. The lower bits represent the index, while the upper bits
/// represent the lap. The mark bit indicates that the channel is disconnected.
///
/// Messages are pushed into the tail of the channel.
tail: CachePadded<AtomicUsize>,
tail: CachePadded<AtomicU64>,

/// The buffer holding slots.
buffer: Box<[Slot<T>]>,
Expand All @@ -77,10 +78,10 @@ pub(crate) struct Channel<T> {
cap: usize,

/// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
one_lap: usize,
one_lap: u64,

/// If this bit is set in the tail, that means the channel is disconnected.
mark_bit: usize,
mark_bit: u64,

/// Senders waiting while the channel is full.
senders: SyncWaker,
Expand All @@ -95,7 +96,7 @@ impl<T> Channel<T> {
assert!(cap > 0, "capacity must be positive");

// Compute constants `mark_bit` and `one_lap`.
let mark_bit = (cap + 1).next_power_of_two();
let mark_bit = (cap as u64 + 1).next_power_of_two();
let one_lap = mark_bit * 2;

// Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
Expand All @@ -105,11 +106,11 @@ impl<T> Channel<T> {

// Allocate a buffer of `cap` slots initialized
// with stamps.
let buffer: Box<[Slot<T>]> = (0..cap)
let buffer: Box<[Slot<T>]> = (0..cap as u64)
.map(|i| {
// Set the stamp to `{ lap: 0, mark: 0, index: i }`.
Slot {
stamp: AtomicUsize::new(i),
stamp: AtomicU64::new(i),
msg: UnsafeCell::new(MaybeUninit::uninit()),
}
})
Expand All @@ -120,8 +121,8 @@ impl<T> Channel<T> {
cap,
one_lap,
mark_bit,
head: CachePadded::new(AtomicUsize::new(head)),
tail: CachePadded::new(AtomicUsize::new(tail)),
head: CachePadded::new(AtomicU64::new(head)),
tail: CachePadded::new(AtomicU64::new(tail)),
senders: SyncWaker::new(),
receivers: SyncWaker::new(),
}
Expand Down Expand Up @@ -151,7 +152,7 @@ impl<T> Channel<T> {
}

// Deconstruct the tail.
let index = tail & (self.mark_bit - 1);
let index = (tail & (self.mark_bit - 1)) as usize;
let lap = tail & !(self.one_lap - 1);

// Inspect the corresponding slot.
Expand Down Expand Up @@ -234,7 +235,7 @@ impl<T> Channel<T> {

loop {
// Deconstruct the head.
let index = head & (self.mark_bit - 1);
let index = (head & (self.mark_bit - 1)) as usize;
let lap = head & !(self.one_lap - 1);

// Inspect the corresponding slot.
Expand Down Expand Up @@ -452,8 +453,8 @@ impl<T> Channel<T> {

// If the tail didn't change, we've got consistent values to work with.
if self.tail.load(Ordering::SeqCst) == tail {
let hix = head & (self.mark_bit - 1);
let tix = tail & (self.mark_bit - 1);
let hix = (head & (self.mark_bit - 1)) as usize;
let tix = (tail & (self.mark_bit - 1)) as usize;

return if hix < tix {
tix - hix
Expand Down Expand Up @@ -524,8 +525,8 @@ impl<T> Drop for Channel<T> {
let head = *self.head.get_mut();
let tail = *self.tail.get_mut();

let hix = head & (self.mark_bit - 1);
let tix = tail & (self.mark_bit - 1);
let hix = (head & (self.mark_bit - 1)) as usize;
let tix = (tail & (self.mark_bit - 1)) as usize;

let len = if hix < tix {
tix - hix
Expand Down
31 changes: 16 additions & 15 deletions crossbeam-channel/src/flavors/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crossbeam_utils::{Backoff, CachePadded};
use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::utils::AtomicU64;
use crate::waker::SyncWaker;

// TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the
Expand All @@ -29,15 +30,15 @@ const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
const LAP: u64 = 32;
// The maximum number of messages a block can hold.
const BLOCK_CAP: usize = LAP - 1;
const BLOCK_CAP: usize = LAP as usize - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
const SHIFT: u64 = 1;
// Has two different purposes:
// * If set in head, indicates that the block is not the last one.
// * If set in tail, indicates that the channel is disconnected.
const MARK_BIT: usize = 1;
const MARK_BIT: u64 = 1;

/// A slot in a block.
struct Slot<T> {
Expand Down Expand Up @@ -66,7 +67,7 @@ struct Block<T> {
next: AtomicPtr<Block<T>>,

/// Slots for messages.
slots: [Slot<T>; BLOCK_CAP],
slots: [Slot<T>; BLOCK_CAP as usize],
}

impl<T> Block<T> {
Expand Down Expand Up @@ -97,7 +98,7 @@ impl<T> Block<T> {
unsafe fn destroy(this: *mut Block<T>, start: usize) {
// It is not necessary to set the `DESTROY` bit in the last slot because that slot has
// begun destruction of the block.
for i in start..BLOCK_CAP - 1 {
for i in start..BLOCK_CAP as usize - 1 {
let slot = (*this).slots.get_unchecked(i);

// Mark the `DESTROY` bit if a thread is still using the slot.
Expand All @@ -118,7 +119,7 @@ impl<T> Block<T> {
#[derive(Debug)]
struct Position<T> {
/// The index in the channel.
index: AtomicUsize,
index: AtomicU64,

/// The block in the linked list.
block: AtomicPtr<Block<T>>,
Expand Down Expand Up @@ -171,11 +172,11 @@ impl<T> Channel<T> {
Channel {
head: CachePadded::new(Position {
block: AtomicPtr::new(ptr::null_mut()),
index: AtomicUsize::new(0),
index: AtomicU64::new(0),
}),
tail: CachePadded::new(Position {
block: AtomicPtr::new(ptr::null_mut()),
index: AtomicUsize::new(0),
index: AtomicU64::new(0),
}),
receivers: SyncWaker::new(),
_marker: PhantomData,
Expand Down Expand Up @@ -207,7 +208,7 @@ impl<T> Channel<T> {
}

// Calculate the offset of the index into the block.
let offset = (tail >> SHIFT) % LAP;
let offset = ((tail >> SHIFT) % LAP) as usize;

// If we reached the end of the block, wait until the next one is installed.
if offset == BLOCK_CAP {
Expand Down Expand Up @@ -302,7 +303,7 @@ impl<T> Channel<T> {

loop {
// Calculate the offset of the index into the block.
let offset = (head >> SHIFT) % LAP;
let offset = ((head >> SHIFT) % LAP) as usize;

// If we reached the end of the block, wait until the next one is installed.
if offset == BLOCK_CAP {
Expand Down Expand Up @@ -520,7 +521,7 @@ impl<T> Channel<T> {
head >>= SHIFT;

// Return the difference minus the number of blocks between tail and head.
return tail - head - tail / LAP;
return (tail - head - tail / LAP) as usize;
}
}
}
Expand Down Expand Up @@ -567,7 +568,7 @@ impl<T> Channel<T> {
let backoff = Backoff::new();
let mut tail = self.tail.index.load(Ordering::Acquire);
loop {
let offset = (tail >> SHIFT) % LAP;
let offset = ((tail >> SHIFT) % LAP) as usize;
if offset != BLOCK_CAP {
break;
}
Expand All @@ -585,7 +586,7 @@ impl<T> Channel<T> {
unsafe {
// Drop all messages between head and tail and deallocate the heap-allocated blocks.
while head >> SHIFT != tail >> SHIFT {
let offset = (head >> SHIFT) % LAP;
let offset = ((head >> SHIFT) % LAP) as usize;

if offset < BLOCK_CAP {
// Drop the message in the slot.
Expand Down Expand Up @@ -645,7 +646,7 @@ impl<T> Drop for Channel<T> {
unsafe {
// Drop all messages between head and tail and deallocate the heap-allocated blocks.
while head != tail {
let offset = (head >> SHIFT) % LAP;
let offset = ((head >> SHIFT) % LAP) as usize;

if offset < BLOCK_CAP {
// Drop the message in the slot.
Expand Down
40 changes: 40 additions & 0 deletions crossbeam-channel/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,46 @@ pub(crate) fn convert_timeout_to_deadline(timeout: Duration) -> Instant {
}
}

#[cfg(not(crossbeam_no_atomic_64))]
pub(crate) use core::sync::atomic::AtomicU64;

/// Fallback `AtomicU64` compiled in when the `crossbeam_no_atomic_64` cfg is
/// set, i.e. when `core::sync::atomic::AtomicU64` is not re-exported instead.
///
/// It wraps a `crossbeam_utils::atomic::AtomicCell<u64>` so that code written
/// against the `AtomicU64` name compiles under either cfg.
#[cfg(crossbeam_no_atomic_64)]
#[derive(Debug)]
#[repr(transparent)]
pub(crate) struct AtomicU64 {
// Backing cell; every operation on this type is forwarded to it.
inner: crossbeam_utils::atomic::AtomicCell<u64>,
}

#[cfg(crossbeam_no_atomic_64)]
impl AtomicU64 {
/// Creates a new `AtomicU64` holding `v` by wrapping it in an `AtomicCell`.
pub(crate) const fn new(v: u64) -> Self {
Self {
inner: crossbeam_utils::atomic::AtomicCell::new(v),
}
}
/// Returns the value currently stored in the inner `AtomicCell`.
///
/// `_order` exists only so call sites can be written the same way as for
/// `core::sync::atomic::AtomicU64`; it is ignored because `AtomicCell::load`
/// takes no ordering argument.
pub(crate) fn load(&self, _order: Ordering) -> u64 {
self.inner.load()
}
Comment on lines +72 to +91
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it preferable to define this in crossbeam_utils::atomic?

/// Stores `val` into the inner `AtomicCell`.
///
/// `_order` exists only so call sites can be written the same way as for
/// `core::sync::atomic::AtomicU64`; it is ignored because `AtomicCell::store`
/// takes no ordering argument.
pub(crate) fn store(&self, val: u64, _order: Ordering) {
self.inner.store(val);
}
/// Compare-and-exchange: forwards `current` and `new` to
/// `AtomicCell::compare_exchange` and returns its `Result` unchanged.
///
/// Both `_success` and `_failure` are ignored; they exist only so call sites
/// can be written the same way as for `core::sync::atomic::AtomicU64`.
///
/// NOTE(review): this delegates to the non-`weak` `compare_exchange`, so it
/// presumably never fails spuriously -- confirm against the `AtomicCell` docs.
pub(crate) fn compare_exchange_weak(
&self,
current: u64,
new: u64,
_success: Ordering,
_failure: Ordering,
) -> Result<u64, u64> {
self.inner.compare_exchange(current, new)
}
/// Forwards to `AtomicCell::fetch_add` with `val` and returns its result
/// (the previous value, per the `AtomicCell` API).
///
/// `_order` is ignored; `AtomicCell::fetch_add` takes no ordering argument.
pub(crate) fn fetch_add(&self, val: u64, _order: Ordering) -> u64 {
self.inner.fetch_add(val)
}
/// Forwards to `AtomicCell::fetch_or` with `val` and returns its result
/// (the previous value, per the `AtomicCell` API).
///
/// `_order` is ignored; `AtomicCell::fetch_or` takes no ordering argument.
pub(crate) fn fetch_or(&self, val: u64, _order: Ordering) -> u64 {
self.inner.fetch_or(val)
}
}

/// A simple spinlock.
pub(crate) struct Spinlock<T> {
flag: AtomicBool,
Expand Down
Loading