diff --git a/Cargo.toml b/Cargo.toml
index 8a8dc4abf..581fa21dc 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -68,7 +68,7 @@ default-features = false
 optional = true
 
 [dependencies.crossbeam-utils]
-version = "0.8.5"
+version = "0.8.6"
 path = "./crossbeam-utils"
 default-features = false
diff --git a/ci/no_atomic.sh b/ci/no_atomic.sh
index 232140f63..2cdb943b9 100755
--- a/ci/no_atomic.sh
+++ b/ci/no_atomic.sh
@@ -41,6 +41,7 @@ cat >"${file}" <<EOF
 // This file is @generated by no_atomic.sh.
 // It is not intended for manual editing.
 
+#[allow(dead_code)] // Only crossbeam-{epoch,queue,skiplist,utils} use this.
 const NO_ATOMIC_CAS: &[&str] = &[
 EOF
 cat >>"${file}" <<EOF
diff --git a/crossbeam-channel/build.rs b/crossbeam-channel/build.rs
new file mode 100644
index 000000000..40e995c51
--- /dev/null
+++ b/crossbeam-channel/build.rs
@@ -0,0 +1,43 @@
+// The rustc-cfg listed below are considered public API, but it is *unstable*
+// and outside of the normal semver guarantees:
+//
+// - `crossbeam_no_atomic_64`
+//   Assume the target does *not* support AtomicU64/AtomicI64.
+//   This is usually detected automatically by the build script, but you may
+//   need to enable it manually when building for custom targets or using
+//   non-cargo build systems that don't run the build script.
+//
+// With the exceptions mentioned above, the rustc-cfg emitted by the build
+// script are *not* public API.
+
+#![warn(rust_2018_idioms)]
+
+use std::env;
+
+include!("no_atomic.rs");
+
+fn main() {
+    let target = match env::var("TARGET") {
+        Ok(target) => target,
+        Err(e) => {
+            println!(
+                "cargo:warning={}: unable to get TARGET environment variable: {}",
+                env!("CARGO_PKG_NAME"),
+                e
+            );
+            return;
+        }
+    };
+
+    // Note that this is `no_*`, not `has_*`. This allows treating
+    // "max-atomic-width" as 64 when the build script doesn't run. This is
+    // needed for compatibility with non-cargo build systems that don't run the
+    // build script.
+    if NO_ATOMIC_64.contains(&&*target) {
+        println!("cargo:rustc-cfg=crossbeam_no_atomic_64");
+    } else {
+        // Otherwise, assuming `"max-atomic-width" == 64` or `"max-atomic-width" == 128`.
+    }
+
+    println!("cargo:rerun-if-changed=no_atomic.rs");
+}
diff --git a/crossbeam-channel/no_atomic.rs b/crossbeam-channel/no_atomic.rs
new file mode 120000
index 000000000..417886bb7
--- /dev/null
+++ b/crossbeam-channel/no_atomic.rs
@@ -0,0 +1 @@
+../no_atomic.rs
\ No newline at end of file
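A small reading aid for the build scripts above: `env::var` yields a `String`, while `NO_ATOMIC_64` is a `&[&str]`, whose `contains` expects a `&&str`. That is why the lookup is spelled `NO_ATOMIC_64.contains(&&*target)`. A minimal stand-alone sketch of the borrow dance (the target list here is an illustrative subset, not the generated one):

const NO_ATOMIC_64: &[&str] = &["riscv32i-unknown-none-elf"]; // illustrative subset

fn main() {
    let target = String::from("riscv32i-unknown-none-elf");
    // `<[&str]>::contains` takes `&&str`: `*target` derefs the String to
    // `str`, the first `&` makes `&str`, the second makes `&&str`.
    assert!(NO_ATOMIC_64.contains(&&*target));
}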
diff --git a/crossbeam-channel/src/flavors/array.rs b/crossbeam-channel/src/flavors/array.rs
index 73557d385..9a1b51d51 100644
--- a/crossbeam-channel/src/flavors/array.rs
+++ b/crossbeam-channel/src/flavors/array.rs
@@ -11,7 +11,7 @@
 use std::cell::UnsafeCell;
 use std::mem::MaybeUninit;
 use std::ptr;
-use std::sync::atomic::{self, AtomicUsize, Ordering};
+use std::sync::atomic::{self, Ordering};
 use std::time::Instant;
 
 use crossbeam_utils::{Backoff, CachePadded};
@@ -19,12 +19,13 @@ use crossbeam_utils::{Backoff, CachePadded};
 use crate::context::Context;
 use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
 use crate::select::{Operation, SelectHandle, Selected, Token};
+use crate::utils::AtomicU64;
 use crate::waker::SyncWaker;
 
 /// A slot in a channel.
 struct Slot<T> {
     /// The current stamp.
-    stamp: AtomicUsize,
+    stamp: AtomicU64,
 
     /// The message in this slot.
     msg: UnsafeCell<MaybeUninit<T>>,
 }
@@ -37,7 +38,7 @@ pub(crate) struct ArrayToken {
     slot: *const u8,
 
     /// Stamp to store into the slot after reading or writing.
-    stamp: usize,
+    stamp: u64,
 }
 
 impl Default for ArrayToken {
@@ -55,20 +56,20 @@ pub(crate) struct Channel<T> {
     /// The head of the channel.
     ///
     /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
-    /// packed into a single `usize`. The lower bits represent the index, while the upper bits
+    /// packed into a single `u64`. The lower bits represent the index, while the upper bits
     /// represent the lap. The mark bit in the head is always zero.
     ///
     /// Messages are popped from the head of the channel.
-    head: CachePadded<AtomicUsize>,
+    head: CachePadded<AtomicU64>,
 
     /// The tail of the channel.
     ///
     /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
-    /// packed into a single `usize`. The lower bits represent the index, while the upper bits
+    /// packed into a single `u64`. The lower bits represent the index, while the upper bits
     /// represent the lap. The mark bit indicates that the channel is disconnected.
     ///
     /// Messages are pushed into the tail of the channel.
-    tail: CachePadded<AtomicUsize>,
+    tail: CachePadded<AtomicU64>,
 
     /// The buffer holding slots.
     buffer: Box<[Slot<T>]>,
@@ -77,10 +78,10 @@ pub(crate) struct Channel<T> {
     cap: usize,
 
     /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
-    one_lap: usize,
+    one_lap: u64,
 
     /// If this bit is set in the tail, that means the channel is disconnected.
-    mark_bit: usize,
+    mark_bit: u64,
 
     /// Senders waiting while the channel is full.
     senders: SyncWaker,
@@ -95,7 +96,7 @@ impl<T> Channel<T> {
         assert!(cap > 0, "capacity must be positive");
 
         // Compute constants `mark_bit` and `one_lap`.
-        let mark_bit = (cap + 1).next_power_of_two();
+        let mark_bit = (cap as u64 + 1).next_power_of_two();
         let one_lap = mark_bit * 2;
 
         // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
@@ -105,11 +106,11 @@ impl<T> Channel<T> {
         // Allocate a buffer of `cap` slots initialized
         // with stamps.
-        let buffer: Box<[Slot<T>]> = (0..cap)
+        let buffer: Box<[Slot<T>]> = (0..cap as u64)
             .map(|i| {
                 // Set the stamp to `{ lap: 0, mark: 0, index: i }`.
                 Slot {
-                    stamp: AtomicUsize::new(i),
+                    stamp: AtomicU64::new(i),
                     msg: UnsafeCell::new(MaybeUninit::uninit()),
                 }
             })
             .collect();
@@ -120,8 +121,8 @@ impl<T> Channel<T> {
             cap,
             one_lap,
             mark_bit,
-            head: CachePadded::new(AtomicUsize::new(head)),
-            tail: CachePadded::new(AtomicUsize::new(tail)),
+            head: CachePadded::new(AtomicU64::new(head)),
+            tail: CachePadded::new(AtomicU64::new(tail)),
             senders: SyncWaker::new(),
             receivers: SyncWaker::new(),
         }
     }
@@ -151,7 +152,7 @@ impl<T> Channel<T> {
         }
 
         // Deconstruct the tail.
-        let index = tail & (self.mark_bit - 1);
+        let index = (tail & (self.mark_bit - 1)) as usize;
         let lap = tail & !(self.one_lap - 1);
 
         // Inspect the corresponding slot.
@@ -234,7 +235,7 @@ impl<T> Channel<T> {
         loop {
             // Deconstruct the head.
-            let index = head & (self.mark_bit - 1);
+            let index = (head & (self.mark_bit - 1)) as usize;
             let lap = head & !(self.one_lap - 1);
 
             // Inspect the corresponding slot.
@@ -452,8 +453,8 @@ impl<T> Channel<T> {
             // If the tail didn't change, we've got consistent values to work with.
             if self.tail.load(Ordering::SeqCst) == tail {
-                let hix = head & (self.mark_bit - 1);
-                let tix = tail & (self.mark_bit - 1);
+                let hix = (head & (self.mark_bit - 1)) as usize;
+                let tix = (tail & (self.mark_bit - 1)) as usize;
 
                 return if hix < tix {
                     tix - hix
@@ -524,8 +525,8 @@ impl<T> Drop for Channel<T> {
         let head = *self.head.get_mut();
         let tail = *self.tail.get_mut();
 
-        let hix = head & (self.mark_bit - 1);
-        let tix = tail & (self.mark_bit - 1);
+        let hix = (head & (self.mark_bit - 1)) as usize;
+        let tix = (tail & (self.mark_bit - 1)) as usize;
 
         let len = if hix < tix {
             tix - hix
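The array-flavor hunks keep the existing stamp layout and only widen it: the low bits below `mark_bit` are the buffer index, `mark_bit` itself flags disconnection, and everything from `one_lap` upward is the lap counter. A minimal stand-alone sketch of that packing (the pack/unpack code below is illustrative, not part of the patch, but the deconstruction lines mirror `start_send`/`start_recv`):

fn main() {
    // Mirrors the patch's constants for cap = 5:
    let cap: u64 = 5;
    let mark_bit = (cap + 1).next_power_of_two(); // 8
    let one_lap = mark_bit * 2; // 16

    // A stamp for `{ lap: 3, mark: 1, index: 4 }`:
    let stamp: u64 = 3 * one_lap + mark_bit + 4;

    let index = (stamp & (mark_bit - 1)) as usize; // low bits: buffer index
    let mark = stamp & mark_bit; // disconnect flag
    let lap = stamp & !(one_lap - 1); // high bits: lap
    assert_eq!((index, mark, lap), (4, 8, 48));
}

Note that only `index`, which actually indexes the slot buffer, is narrowed to `usize`; the stamp itself stays `u64` everywhere.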
diff --git a/crossbeam-channel/src/flavors/list.rs b/crossbeam-channel/src/flavors/list.rs
index 9bda6d1cc..795a86ba6 100644
--- a/crossbeam-channel/src/flavors/list.rs
+++ b/crossbeam-channel/src/flavors/list.rs
@@ -12,6 +12,7 @@ use crossbeam_utils::{Backoff, CachePadded};
 use crate::context::Context;
 use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
 use crate::select::{Operation, SelectHandle, Selected, Token};
+use crate::utils::AtomicU64;
 use crate::waker::SyncWaker;
 
 // TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the
@@ -29,15 +30,15 @@ const READ: usize = 2;
 const DESTROY: usize = 4;
 
 // Each block covers one "lap" of indices.
-const LAP: usize = 32;
+const LAP: u64 = 32;
 // The maximum number of messages a block can hold.
-const BLOCK_CAP: usize = LAP - 1;
+const BLOCK_CAP: usize = LAP as usize - 1;
 // How many lower bits are reserved for metadata.
-const SHIFT: usize = 1;
+const SHIFT: u64 = 1;
 // Has two different purposes:
 // * If set in head, indicates that the block is not the last one.
 // * If set in tail, indicates that the channel is disconnected.
-const MARK_BIT: usize = 1;
+const MARK_BIT: u64 = 1;
 
 /// A slot in a block.
 struct Slot<T> {
@@ -66,7 +67,7 @@ struct Block<T> {
     next: AtomicPtr<Block<T>>,
 
     /// Slots for messages.
-    slots: [Slot<T>; BLOCK_CAP],
+    slots: [Slot<T>; BLOCK_CAP as usize],
 }
 
impl<T> Block<T> {
@@ -97,7 +98,7 @@ impl<T> Block<T> {
     unsafe fn destroy(this: *mut Block<T>, start: usize) {
         // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
         // begun destruction of the block.
-        for i in start..BLOCK_CAP - 1 {
+        for i in start..BLOCK_CAP as usize - 1 {
             let slot = (*this).slots.get_unchecked(i);
 
             // Mark the `DESTROY` bit if a thread is still using the slot.
@@ -118,7 +119,7 @@ impl<T> Block<T> {
 #[derive(Debug)]
 struct Position<T> {
     /// The index in the channel.
-    index: AtomicUsize,
+    index: AtomicU64,
 
     /// The block in the linked list.
     block: AtomicPtr<Block<T>>,
@@ -171,11 +172,11 @@ impl<T> Channel<T> {
         Channel {
             head: CachePadded::new(Position {
                 block: AtomicPtr::new(ptr::null_mut()),
-                index: AtomicUsize::new(0),
+                index: AtomicU64::new(0),
             }),
             tail: CachePadded::new(Position {
                 block: AtomicPtr::new(ptr::null_mut()),
-                index: AtomicUsize::new(0),
+                index: AtomicU64::new(0),
             }),
             receivers: SyncWaker::new(),
             _marker: PhantomData,
@@ -207,7 +208,7 @@ impl<T> Channel<T> {
         }
 
         // Calculate the offset of the index into the block.
-        let offset = (tail >> SHIFT) % LAP;
+        let offset = ((tail >> SHIFT) % LAP) as usize;
 
         // If we reached the end of the block, wait until the next one is installed.
         if offset == BLOCK_CAP {
@@ -302,7 +303,7 @@ impl<T> Channel<T> {
         loop {
             // Calculate the offset of the index into the block.
-            let offset = (head >> SHIFT) % LAP;
+            let offset = ((head >> SHIFT) % LAP) as usize;
 
             // If we reached the end of the block, wait until the next one is installed.
             if offset == BLOCK_CAP {
@@ -520,7 +521,7 @@ impl<T> Channel<T> {
             head >>= SHIFT;
 
             // Return the difference minus the number of blocks between tail and head.
-            return tail - head - tail / LAP;
+            return (tail - head - tail / LAP) as usize;
         }
     }
 }
@@ -567,7 +568,7 @@ impl<T> Channel<T> {
         let backoff = Backoff::new();
         let mut tail = self.tail.index.load(Ordering::Acquire);
         loop {
-            let offset = (tail >> SHIFT) % LAP;
+            let offset = ((tail >> SHIFT) % LAP) as usize;
             if offset != BLOCK_CAP {
                 break;
             }
@@ -585,7 +586,7 @@ impl<T> Channel<T> {
         unsafe {
             // Drop all messages between head and tail and deallocate the heap-allocated blocks.
             while head >> SHIFT != tail >> SHIFT {
-                let offset = (head >> SHIFT) % LAP;
+                let offset = ((head >> SHIFT) % LAP) as usize;
 
                 if offset < BLOCK_CAP {
                     // Drop the message in the slot.
@@ -645,7 +646,7 @@ impl<T> Drop for Channel<T> {
         unsafe {
             // Drop all messages between head and tail and deallocate the heap-allocated blocks.
             while head != tail {
-                let offset = (head >> SHIFT) % LAP;
+                let offset = ((head >> SHIFT) % LAP) as usize;
 
                 if offset < BLOCK_CAP {
                     // Drop the message in the slot.
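The list flavor's index scheme is unchanged by the widening: an index advances by `1 << SHIFT` per message, `(index >> SHIFT) % LAP` is the offset inside the current block, and offset `BLOCK_CAP` (= `LAP - 1`) is reserved as the end-of-block marker. The `as usize` casts exist because the running index is now `u64` while the offset indexes a Rust array, which requires `usize`. A small self-contained check of that arithmetic (values chosen for illustration):

const LAP: u64 = 32;
const BLOCK_CAP: usize = LAP as usize - 1; // 31; offset 31 means "end of block"
const SHIFT: u64 = 1; // the low bit carries metadata (MARK_BIT)
const MARK_BIT: u64 = 1;

fn main() {
    // An index whose low bit carries MARK_BIT and whose payload is
    // "slot 33 since the start", i.e. lap 1, offset 1:
    let index: u64 = (33 << SHIFT) | MARK_BIT;
    let offset = ((index >> SHIFT) % LAP) as usize;
    assert_eq!(offset, 1);
    assert!(offset < BLOCK_CAP); // offset 31 would be the reserved marker
}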
diff --git a/crossbeam-channel/src/utils.rs b/crossbeam-channel/src/utils.rs
index d87c2408c..c9cc8a928 100644
--- a/crossbeam-channel/src/utils.rs
+++ b/crossbeam-channel/src/utils.rs
@@ -69,6 +69,46 @@ pub(crate) fn convert_timeout_to_deadline(timeout: Duration) -> Instant {
     }
 }
 
+#[cfg(not(crossbeam_no_atomic_64))]
+pub(crate) use core::sync::atomic::AtomicU64;
+
+#[cfg(crossbeam_no_atomic_64)]
+#[derive(Debug)]
+#[repr(transparent)]
+pub(crate) struct AtomicU64 {
+    inner: crossbeam_utils::atomic::AtomicCell<u64>,
+}
+
+#[cfg(crossbeam_no_atomic_64)]
+impl AtomicU64 {
+    pub(crate) const fn new(v: u64) -> Self {
+        Self {
+            inner: crossbeam_utils::atomic::AtomicCell::new(v),
+        }
+    }
+    pub(crate) fn load(&self, _order: Ordering) -> u64 {
+        self.inner.load()
+    }
+    pub(crate) fn store(&self, val: u64, _order: Ordering) {
+        self.inner.store(val);
+    }
+    pub(crate) fn compare_exchange_weak(
+        &self,
+        current: u64,
+        new: u64,
+        _success: Ordering,
+        _failure: Ordering,
+    ) -> Result<u64, u64> {
+        self.inner.compare_exchange(current, new)
+    }
+    pub(crate) fn fetch_add(&self, val: u64, _order: Ordering) -> u64 {
+        self.inner.fetch_add(val)
+    }
+    pub(crate) fn fetch_or(&self, val: u64, _order: Ordering) -> u64 {
+        self.inner.fetch_or(val)
+    }
+}
+
 /// A simple spinlock.
 pub(crate) struct Spinlock<T> {
     flag: AtomicBool,
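The wrapper above deliberately mirrors the subset of `core::sync::atomic::AtomicU64`'s API that the channel uses, so every call site compiles unchanged whichever cfg branch is active. The `Ordering` arguments are accepted and ignored because `AtomicCell` does not take orderings (on targets without native 64-bit atomics it falls back to a lock-based implementation inside crossbeam-utils), and `compare_exchange_weak` delegates to the strong `AtomicCell::compare_exchange`, which satisfies the weak contract. A hedged illustration of the call-site symmetry, exercised here against the std type only:

use std::sync::atomic::{AtomicU64, Ordering};

// The same call shape works for both the std type and the patch's wrapper.
fn bump(counter: &AtomicU64) -> u64 {
    let mut cur = counter.load(Ordering::Relaxed);
    loop {
        match counter.compare_exchange_weak(cur, cur + 1, Ordering::SeqCst, Ordering::Relaxed) {
            Ok(prev) => return prev,
            Err(actual) => cur = actual,
        }
    }
}

fn main() {
    let c = AtomicU64::new(41);
    assert_eq!(bump(&c), 41);
    assert_eq!(c.load(Ordering::Relaxed), 42);
}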
diff --git a/crossbeam-deque/Cargo.toml b/crossbeam-deque/Cargo.toml
index f303a2f15..3074d1bc5 100644
--- a/crossbeam-deque/Cargo.toml
+++ b/crossbeam-deque/Cargo.toml
@@ -33,7 +33,7 @@ default-features = false
 optional = true
 
 [dependencies.crossbeam-utils]
-version = "0.8"
+version = "0.8.6"
 path = "../crossbeam-utils"
 default-features = false
 optional = true
diff --git a/crossbeam-deque/build.rs b/crossbeam-deque/build.rs
new file mode 100644
index 000000000..40e995c51
--- /dev/null
+++ b/crossbeam-deque/build.rs
@@ -0,0 +1,43 @@
+// The rustc-cfg listed below are considered public API, but it is *unstable*
+// and outside of the normal semver guarantees:
+//
+// - `crossbeam_no_atomic_64`
+//   Assume the target does *not* support AtomicU64/AtomicI64.
+//   This is usually detected automatically by the build script, but you may
+//   need to enable it manually when building for custom targets or using
+//   non-cargo build systems that don't run the build script.
+//
+// With the exceptions mentioned above, the rustc-cfg emitted by the build
+// script are *not* public API.
+
+#![warn(rust_2018_idioms)]
+
+use std::env;
+
+include!("no_atomic.rs");
+
+fn main() {
+    let target = match env::var("TARGET") {
+        Ok(target) => target,
+        Err(e) => {
+            println!(
+                "cargo:warning={}: unable to get TARGET environment variable: {}",
+                env!("CARGO_PKG_NAME"),
+                e
+            );
+            return;
+        }
+    };
+
+    // Note that this is `no_*`, not `has_*`. This allows treating
+    // "max-atomic-width" as 64 when the build script doesn't run. This is
+    // needed for compatibility with non-cargo build systems that don't run the
+    // build script.
+    if NO_ATOMIC_64.contains(&&*target) {
+        println!("cargo:rustc-cfg=crossbeam_no_atomic_64");
+    } else {
+        // Otherwise, assuming `"max-atomic-width" == 64` or `"max-atomic-width" == 128`.
+    }
+
+    println!("cargo:rerun-if-changed=no_atomic.rs");
+}
diff --git a/crossbeam-deque/no_atomic.rs b/crossbeam-deque/no_atomic.rs
new file mode 120000
index 000000000..417886bb7
--- /dev/null
+++ b/crossbeam-deque/no_atomic.rs
@@ -0,0 +1 @@
+../no_atomic.rs
\ No newline at end of file
diff --git a/crossbeam-deque/src/deque.rs b/crossbeam-deque/src/deque.rs
index 802a2fef5..4919823a4 100644
--- a/crossbeam-deque/src/deque.rs
+++ b/crossbeam-deque/src/deque.rs
@@ -8,8 +8,10 @@ use std::ptr;
 use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
 use std::sync::Arc;
 
+use crossbeam_utils::{Backoff, CachePadded};
+
 use crate::epoch::{self, Atomic, Owned};
-use crate::utils::{Backoff, CachePadded};
+use crate::utils::AtomicU64;
 
 // Minimum buffer capacity.
 const MIN_CAP: usize = 64;
@@ -1105,13 +1107,13 @@ const READ: usize = 2;
 const DESTROY: usize = 4;
 
 // Each block covers one "lap" of indices.
-const LAP: usize = 64;
+const LAP: u64 = 64;
 // The maximum number of values a block can hold.
-const BLOCK_CAP: usize = LAP - 1;
+const BLOCK_CAP: usize = LAP as usize - 1;
 // How many lower bits are reserved for metadata.
-const SHIFT: usize = 1;
+const SHIFT: u64 = 1;
 // Indicates that the block is not the last one.
-const HAS_NEXT: usize = 1;
+const HAS_NEXT: u64 = 1;
 
 /// A slot in a block.
 struct Slot<T> {
@@ -1191,7 +1193,7 @@ impl<T> Block<T> {
 /// A position in a queue.
 struct Position<T> {
     /// The index in the queue.
-    index: AtomicUsize,
+    index: AtomicU64,
 
     /// The block in the linked list.
     block: AtomicPtr<Block<T>>,
@@ -1235,11 +1237,11 @@ impl<T> Default for Injector<T> {
         Self {
             head: CachePadded::new(Position {
                 block: AtomicPtr::new(block),
-                index: AtomicUsize::new(0),
+                index: AtomicU64::new(0),
             }),
             tail: CachePadded::new(Position {
                 block: AtomicPtr::new(block),
-                index: AtomicUsize::new(0),
+                index: AtomicU64::new(0),
             }),
             _marker: PhantomData,
         }
@@ -1279,7 +1281,7 @@ impl<T> Injector<T> {
         loop {
             // Calculate the offset of the index into the block.
-            let offset = (tail >> SHIFT) % LAP;
+            let offset = ((tail >> SHIFT) % LAP) as usize;
 
             // If we reached the end of the block, wait until the next one is installed.
             if offset == BLOCK_CAP {
@@ -1357,7 +1359,7 @@ impl<T> Injector<T> {
             block = self.head.block.load(Ordering::Acquire);
 
             // Calculate the offset of the index into the block.
-            offset = (head >> SHIFT) % LAP;
+            offset = ((head >> SHIFT) % LAP) as usize;
 
             // If we reached the end of the block, wait until the next one is installed.
             if offset == BLOCK_CAP {
@@ -1456,7 +1458,7 @@ impl<T> Injector<T> {
             block = self.head.block.load(Ordering::Acquire);
 
             // Calculate the offset of the index into the block.
-            offset = (head >> SHIFT) % LAP;
+            offset = ((head >> SHIFT) % LAP) as usize;
 
             // If we reached the end of the block, wait until the next one is installed.
             if offset == BLOCK_CAP {
@@ -1485,7 +1487,7 @@ impl<T> Injector<T> {
                 // We can steal all tasks till the end of the block.
                 advance = (BLOCK_CAP - offset).min(MAX_BATCH);
             } else {
-                let len = (tail - head) >> SHIFT;
+                let len = ((tail - head) >> SHIFT) as usize;
                 // Steal half of the available tasks.
                 advance = ((len + 1) / 2).min(MAX_BATCH);
             }
@@ -1494,7 +1496,7 @@ impl<T> Injector<T> {
             advance = (BLOCK_CAP - offset).min(MAX_BATCH);
         }
 
-        new_head += advance << SHIFT;
+        new_head += (advance as u64) << SHIFT;
         let new_offset = offset + advance;
 
         // Try moving the head index forward.
@@ -1615,7 +1617,7 @@ impl<T> Injector<T> {
             block = self.head.block.load(Ordering::Acquire);
 
             // Calculate the offset of the index into the block.
-            offset = (head >> SHIFT) % LAP;
+            offset = ((head >> SHIFT) % LAP) as usize;
 
             // If we reached the end of the block, wait until the next one is installed.
             if offset == BLOCK_CAP {
@@ -1643,7 +1645,7 @@ impl<T> Injector<T> {
                 // We can steal all tasks till the end of the block.
                 advance = (BLOCK_CAP - offset).min(MAX_BATCH + 1);
             } else {
-                let len = (tail - head) >> SHIFT;
+                let len = ((tail - head) >> SHIFT) as usize;
                 // Steal half of the available tasks.
                 advance = ((len + 1) / 2).min(MAX_BATCH + 1);
             }
@@ -1652,7 +1654,7 @@ impl<T> Injector<T> {
             advance = (BLOCK_CAP - offset).min(MAX_BATCH + 1);
         }
 
-        new_head += advance << SHIFT;
+        new_head += (advance as u64) << SHIFT;
         let new_offset = offset + advance;
 
         // Try moving the head index forward.
@@ -1812,7 +1814,7 @@ impl<T> Injector<T> {
             head >>= SHIFT;
 
             // Return the difference minus the number of blocks between tail and head.
-            return tail - head - tail / LAP;
+            return (tail - head - tail / LAP) as usize;
         }
     }
 }
@@ -1831,7 +1833,7 @@ impl<T> Drop for Injector<T> {
         unsafe {
             // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
             while head != tail {
-                let offset = (head >> SHIFT) % LAP;
+                let offset = ((head >> SHIFT) % LAP) as usize;
 
                 if offset < BLOCK_CAP {
                     // Drop the task in the slot.
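The batch-steal hunks above only widen the index type; the halving policy is untouched. With `len` tasks between `head` and `tail`, a stealer takes `min((len + 1) / 2, MAX_BATCH)` of them and advances the head by that many slots, which is `advance << SHIFT` in index units (now cast to `u64` before shifting). A toy check of the arithmetic (the `MAX_BATCH` value here is illustrative; the real constant lives elsewhere in deque.rs):

const SHIFT: u64 = 1;
const MAX_BATCH: usize = 32; // illustrative; see deque.rs for the real value

fn main() {
    let head: u64 = 10 << SHIFT;
    let tail: u64 = 25 << SHIFT;
    let len = ((tail - head) >> SHIFT) as usize; // 15 tasks available
    let advance = ((len + 1) / 2).min(MAX_BATCH); // steal 8, leave 7
    let new_head = head + ((advance as u64) << SHIFT);
    assert_eq!((len, advance, new_head >> SHIFT), (15, 8, 18));
}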
diff --git a/crossbeam-deque/src/lib.rs b/crossbeam-deque/src/lib.rs
index b696b5f96..b7f810a95 100644
--- a/crossbeam-deque/src/lib.rs
+++ b/crossbeam-deque/src/lib.rs
@@ -103,9 +103,10 @@ use cfg_if::cfg_if;
 cfg_if! {
     if #[cfg(feature = "std")] {
         use crossbeam_epoch as epoch;
-        use crossbeam_utils as utils;
 
         mod deque;
+        mod utils;
+
         pub use crate::deque::{Injector, Steal, Stealer, Worker};
     }
 }
diff --git a/crossbeam-deque/src/utils.rs b/crossbeam-deque/src/utils.rs
new file mode 100644
index 000000000..ef25ba62c
--- /dev/null
+++ b/crossbeam-deque/src/utils.rs
@@ -0,0 +1,38 @@
+//! Miscellaneous utilities.
+
+#[cfg(not(crossbeam_no_atomic_64))]
+pub(crate) use core::sync::atomic::AtomicU64;
+
+#[cfg(crossbeam_no_atomic_64)]
+use core::sync::atomic::Ordering;
+
+#[cfg(crossbeam_no_atomic_64)]
+#[derive(Debug)]
+#[repr(transparent)]
+pub(crate) struct AtomicU64 {
+    inner: crossbeam_utils::atomic::AtomicCell<u64>,
+}
+
+#[cfg(crossbeam_no_atomic_64)]
+impl AtomicU64 {
+    pub(crate) const fn new(v: u64) -> Self {
+        Self {
+            inner: crossbeam_utils::atomic::AtomicCell::new(v),
+        }
+    }
+    pub(crate) fn load(&self, _order: Ordering) -> u64 {
+        self.inner.load()
+    }
+    pub(crate) fn store(&self, val: u64, _order: Ordering) {
+        self.inner.store(val);
+    }
+    pub(crate) fn compare_exchange_weak(
+        &self,
+        current: u64,
+        new: u64,
+        _success: Ordering,
+        _failure: Ordering,
+    ) -> Result<u64, u64> {
+        self.inner.compare_exchange(current, new)
+    }
+}
diff --git a/crossbeam-epoch/Cargo.toml b/crossbeam-epoch/Cargo.toml
index fe111e720..47a9fa940 100644
--- a/crossbeam-epoch/Cargo.toml
+++ b/crossbeam-epoch/Cargo.toml
@@ -59,7 +59,7 @@ scopeguard = { version = "1.1", default-features = false }
 loom-crate = { package = "loom", version = "0.5", optional = true }
 
 [dependencies.crossbeam-utils]
-version = "0.8.5"
+version = "0.8.6"
 path = "../crossbeam-utils"
 default-features = false
diff --git a/crossbeam-queue/Cargo.toml b/crossbeam-queue/Cargo.toml
index 19b1044a9..ca2d62968 100644
--- a/crossbeam-queue/Cargo.toml
+++ b/crossbeam-queue/Cargo.toml
@@ -40,7 +40,7 @@ nightly = ["crossbeam-utils/nightly"]
 cfg-if = "1"
 
 [dependencies.crossbeam-utils]
-version = "0.8.5"
+version = "0.8.6"
 path = "../crossbeam-utils"
 default-features = false
diff --git a/crossbeam-queue/build.rs b/crossbeam-queue/build.rs
index 587e0580b..7e8b8d963 100644
--- a/crossbeam-queue/build.rs
+++ b/crossbeam-queue/build.rs
@@ -7,6 +7,12 @@
 // need to enable it manually when building for custom targets or using
 // non-cargo build systems that don't run the build script.
 //
+// - `crossbeam_no_atomic_64`
+//   Assume the target does *not* support AtomicU64/AtomicI64.
+//   This is usually detected automatically by the build script, but you may
+//   need to enable it manually when building for custom targets or using
+//   non-cargo build systems that don't run the build script.
+//
 // With the exceptions mentioned above, the rustc-cfg emitted by the build
 // script are *not* public API.
 
@@ -36,6 +42,11 @@ fn main() {
     if NO_ATOMIC_CAS.contains(&&*target) {
         println!("cargo:rustc-cfg=crossbeam_no_atomic_cas");
     }
+    if NO_ATOMIC_64.contains(&&*target) {
+        println!("cargo:rustc-cfg=crossbeam_no_atomic_64");
+    } else {
+        // Otherwise, assuming `"max-atomic-width" == 64` or `"max-atomic-width" == 128`.
+    }
 
     println!("cargo:rerun-if-changed=no_atomic.rs");
 }
diff --git a/crossbeam-queue/src/array_queue.rs b/crossbeam-queue/src/array_queue.rs
index c34f589e7..eeff19724 100644
--- a/crossbeam-queue/src/array_queue.rs
+++ b/crossbeam-queue/src/array_queue.rs
@@ -7,17 +7,19 @@
 use alloc::boxed::Box;
 use core::cell::UnsafeCell;
 use core::fmt;
 use core::mem::MaybeUninit;
-use core::sync::atomic::{self, AtomicUsize, Ordering};
+use core::sync::atomic::{self, Ordering};
 
 use crossbeam_utils::{Backoff, CachePadded};
 
+use crate::utils::AtomicU64;
+
 /// A slot in a queue.
 struct Slot<T> {
     /// The current stamp.
     ///
     /// If the stamp equals the tail, this node will be next written to. If it equals head + 1,
     /// this node will be next read from.
-    stamp: AtomicUsize,
+    stamp: AtomicU64,
 
     /// The value in this slot.
     value: UnsafeCell<MaybeUninit<T>>,
 }
@@ -50,18 +52,18 @@ pub struct ArrayQueue<T> {
     /// The head of the queue.
     ///
     /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
-    /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
+    /// single `u64`. The lower bits represent the index, while the upper bits represent the lap.
     ///
     /// Elements are popped from the head of the queue.
-    head: CachePadded<AtomicUsize>,
+    head: CachePadded<AtomicU64>,
 
     /// The tail of the queue.
     ///
     /// This value is a "stamp" consisting of an index into the buffer and a lap, but packed into a
-    /// single `usize`. The lower bits represent the index, while the upper bits represent the lap.
+    /// single `u64`. The lower bits represent the index, while the upper bits represent the lap.
     ///
     /// Elements are pushed into the tail of the queue.
-    tail: CachePadded<AtomicUsize>,
+    tail: CachePadded<AtomicU64>,
 
     /// The buffer holding slots.
     buffer: Box<[Slot<T>]>,
@@ -70,7 +72,7 @@ pub struct ArrayQueue<T> {
     cap: usize,
 
     /// A stamp with the value of `{ lap: 1, index: 0 }`.
-    one_lap: usize,
+    one_lap: u64,
 }
 
 unsafe impl<T: Send> Sync for ArrayQueue<T> {}
@@ -100,38 +102,38 @@ impl<T> ArrayQueue<T> {
         // Allocate a buffer of `cap` slots initialized
         // with stamps.
-        let buffer: Box<[Slot<T>]> = (0..cap)
+        let buffer: Box<[Slot<T>]> = (0..cap as u64)
             .map(|i| {
                 // Set the stamp to `{ lap: 0, index: i }`.
                 Slot {
-                    stamp: AtomicUsize::new(i),
+                    stamp: AtomicU64::new(i),
                     value: UnsafeCell::new(MaybeUninit::uninit()),
                 }
             })
             .collect();
 
         // One lap is the smallest power of two greater than `cap`.
-        let one_lap = (cap + 1).next_power_of_two();
+        let one_lap = (cap as u64 + 1).next_power_of_two();
 
         ArrayQueue {
             buffer,
             cap,
             one_lap,
-            head: CachePadded::new(AtomicUsize::new(head)),
-            tail: CachePadded::new(AtomicUsize::new(tail)),
+            head: CachePadded::new(AtomicU64::new(head)),
+            tail: CachePadded::new(AtomicU64::new(tail)),
         }
     }
 
     fn push_or_else<F>(&self, mut value: T, f: F) -> Result<(), T>
     where
-        F: Fn(T, usize, usize, &Slot<T>) -> Result<T, T>,
+        F: Fn(T, u64, u64, &Slot<T>) -> Result<T, T>,
     {
         let backoff = Backoff::new();
         let mut tail = self.tail.load(Ordering::Relaxed);
 
         loop {
             // Deconstruct the tail.
-            let index = tail & (self.one_lap - 1);
+            let index = (tail & (self.one_lap - 1)) as usize;
             let lap = tail & !(self.one_lap - 1);
 
             let new_tail = if index + 1 < self.cap {
@@ -278,7 +280,7 @@ impl<T> ArrayQueue<T> {
         loop {
             // Deconstruct the head.
-            let index = head & (self.one_lap - 1);
+            let index = (head & (self.one_lap - 1)) as usize;
             let lap = head & !(self.one_lap - 1);
 
             // Inspect the corresponding slot.
@@ -424,8 +426,8 @@ impl<T> ArrayQueue<T> {
             // If the tail didn't change, we've got consistent values to work with.
             if self.tail.load(Ordering::SeqCst) == tail {
-                let hix = head & (self.one_lap - 1);
-                let tix = tail & (self.one_lap - 1);
+                let hix = (head & (self.one_lap - 1)) as usize;
+                let tix = (tail & (self.one_lap - 1)) as usize;
 
                 return if hix < tix {
                     tix - hix
@@ -444,7 +446,7 @@ impl<T> ArrayQueue<T> {
 impl<T> Drop for ArrayQueue<T> {
     fn drop(&mut self) {
         // Get the index of the head.
-        let hix = self.head.load(Ordering::Relaxed) & (self.one_lap - 1);
+        let hix = (self.head.load(Ordering::Relaxed) & (self.one_lap - 1)) as usize;
 
         // Loop over all slots that hold a message and drop them.
         for i in 0..self.len() {
@@ -491,9 +493,9 @@ impl<T> Iterator for IntoIter<T> {
     fn next(&mut self) -> Option<T> {
         let value = &mut self.value;
-        let head = *value.head.get_mut();
-        if value.head.get_mut() != value.tail.get_mut() {
-            let index = head & (value.one_lap - 1);
+        let head = value.head.load(Ordering::Relaxed);
+        if head != value.tail.load(Ordering::Relaxed) {
+            let index = (head & (value.one_lap - 1)) as usize;
             let lap = head & !(value.one_lap - 1);
             // SAFETY: We have mutable access to this, so we can read without
             // worrying about concurrency. Furthermore, we know this is
@@ -513,7 +515,7 @@ impl<T> Iterator for IntoIter<T> {
                 // Set to `{ lap: lap.wrapping_add(1), index: 0 }`.
                 lap.wrapping_add(value.one_lap)
             };
-            *value.head.get_mut() = new;
+            value.head.store(new, Ordering::Relaxed);
             Option::Some(val)
         } else {
             Option::None
diff --git a/crossbeam-queue/src/lib.rs b/crossbeam-queue/src/lib.rs
index 846d7c2e1..439a34f2b 100644
--- a/crossbeam-queue/src/lib.rs
+++ b/crossbeam-queue/src/lib.rs
@@ -27,6 +27,7 @@ cfg_if::cfg_if! {
 
     mod array_queue;
     mod seg_queue;
+    mod utils;
 
     pub use self::array_queue::ArrayQueue;
     pub use self::seg_queue::SegQueue;
diff --git a/crossbeam-queue/src/seg_queue.rs b/crossbeam-queue/src/seg_queue.rs
index 1767775d1..ba381c57d 100644
--- a/crossbeam-queue/src/seg_queue.rs
+++ b/crossbeam-queue/src/seg_queue.rs
@@ -8,6 +8,8 @@ use core::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
 
 use crossbeam_utils::{Backoff, CachePadded};
 
+use crate::utils::AtomicU64;
+
 // Bits indicating the state of a slot:
 // * If a value has been written into the slot, `WRITE` is set.
 // * If a value has been read from the slot, `READ` is set.
@@ -17,13 +19,13 @@ const READ: usize = 2;
 const DESTROY: usize = 4;
 
 // Each block covers one "lap" of indices.
-const LAP: usize = 32;
+const LAP: u64 = 32;
 // The maximum number of values a block can hold.
-const BLOCK_CAP: usize = LAP - 1;
+const BLOCK_CAP: usize = LAP as usize - 1;
 // How many lower bits are reserved for metadata.
-const SHIFT: usize = 1;
+const SHIFT: u64 = 1;
 // Indicates that the block is not the last one.
-const HAS_NEXT: usize = 1;
+const HAS_NEXT: u64 = 1;
 
 /// A slot in a block.
 struct Slot<T> {
@@ -52,7 +54,7 @@ struct Block<T> {
     next: AtomicPtr<Block<T>>,
 
     /// Slots for values.
-    slots: [Slot<T>; BLOCK_CAP],
+    slots: [Slot<T>; BLOCK_CAP as usize],
 }
 
 impl<T> Block<T> {
@@ -83,7 +85,7 @@ impl<T> Block<T> {
     unsafe fn destroy(this: *mut Block<T>, start: usize) {
         // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
         // begun destruction of the block.
-        for i in start..BLOCK_CAP - 1 {
+        for i in start..BLOCK_CAP as usize - 1 {
             let slot = (*this).slots.get_unchecked(i);
 
             // Mark the `DESTROY` bit if a thread is still using the slot.
@@ -103,7 +105,7 @@ impl<T> Block<T> {
 /// A position in a queue.
 struct Position<T> {
     /// The index in the queue.
-    index: AtomicUsize,
+    index: AtomicU64,
 
     /// The block in the linked list.
     block: AtomicPtr<Block<T>>,
@@ -160,11 +162,11 @@ impl<T> SegQueue<T> {
         SegQueue {
             head: CachePadded::new(Position {
                 block: AtomicPtr::new(ptr::null_mut()),
-                index: AtomicUsize::new(0),
+                index: AtomicU64::new(0),
             }),
             tail: CachePadded::new(Position {
                 block: AtomicPtr::new(ptr::null_mut()),
-                index: AtomicUsize::new(0),
+                index: AtomicU64::new(0),
             }),
             _marker: PhantomData,
         }
     }
@@ -190,7 +192,7 @@ impl<T> SegQueue<T> {
         loop {
             // Calculate the offset of the index into the block.
-            let offset = (tail >> SHIFT) % LAP;
+            let offset = ((tail >> SHIFT) % LAP) as usize;
 
             // If we reached the end of the block, wait until the next one is installed.
             if offset == BLOCK_CAP {
@@ -284,7 +286,7 @@ impl<T> SegQueue<T> {
         loop {
             // Calculate the offset of the index into the block.
-            let offset = (head >> SHIFT) % LAP;
+            let offset = ((head >> SHIFT) % LAP) as usize;
 
             // If we reached the end of the block, wait until the next one is installed.
             if offset == BLOCK_CAP {
@@ -429,7 +431,7 @@ impl<T> SegQueue<T> {
             head >>= SHIFT;
 
             // Return the difference minus the number of blocks between tail and head.
-            return tail - head - tail / LAP;
+            return (tail - head - tail / LAP) as usize;
         }
     }
 }
@@ -448,7 +450,7 @@ impl<T> Drop for SegQueue<T> {
         unsafe {
             // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
             while head != tail {
-                let offset = (head >> SHIFT) % LAP;
+                let offset = ((head >> SHIFT) % LAP) as usize;
 
                 if offset < BLOCK_CAP {
                     // Drop the value in the slot.
@@ -505,13 +507,13 @@ impl<T> Iterator for IntoIter<T> {
     fn next(&mut self) -> Option<T> {
         let value = &mut self.value;
-        let head = *value.head.index.get_mut();
-        let tail = *value.tail.index.get_mut();
+        let head = value.head.index.load(Ordering::Relaxed);
+        let tail = value.tail.index.load(Ordering::Relaxed);
         if head >> SHIFT == tail >> SHIFT {
             None
         } else {
             let block = *value.head.block.get_mut();
-            let offset = (head >> SHIFT) % LAP;
+            let offset = ((head >> SHIFT) % LAP) as usize;
 
             // SAFETY: We have mutable access to this, so we can read without
             // worrying about concurrency. Furthermore, we know this is
@@ -533,11 +535,17 @@ impl<T> Iterator for IntoIter<T> {
                     *value.head.block.get_mut() = next;
                 }
                 // The last value in a block is empty, so skip it
-                *value.head.index.get_mut() = head.wrapping_add(2 << SHIFT);
+                value
+                    .head
+                    .index
+                    .store(head.wrapping_add(2 << SHIFT), Ordering::Relaxed);
                 // Double-check that we're pointing to the first item in a block.
-                debug_assert_eq!((*value.head.index.get_mut() >> SHIFT) % LAP, 0);
+                debug_assert_eq!((value.head.index.load(Ordering::Relaxed) >> SHIFT) % LAP, 0);
             } else {
-                *value.head.index.get_mut() = head.wrapping_add(1 << SHIFT);
+                value
+                    .head
+                    .index
+                    .store(head.wrapping_add(1 << SHIFT), Ordering::Relaxed);
             }
             Some(item)
         }
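A worked example of the `len` arithmetic that several of these hunks cast to `usize`: after the metadata bits are shifted out and the indices rotated so `head` falls into the first block, every block spans `LAP` index values but holds only `BLOCK_CAP = LAP - 1` slots, so `tail / LAP` counts exactly the reserved end-of-block indices that must be subtracted (values below are illustrative):

const LAP: u64 = 32;

fn main() {
    // Already rotated and shifted, as at the end of `SegQueue::len`:
    let head: u64 = 5;
    let tail: u64 = 40;
    let len = (tail - head - tail / LAP) as usize;
    // Indices 5..40 span 35 values, one of which (31) is the reserved
    // end-of-block marker, leaving 34 stored values.
    assert_eq!(len, 34);
}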
diff --git a/crossbeam-queue/src/utils.rs b/crossbeam-queue/src/utils.rs
new file mode 100644
index 000000000..ef25ba62c
--- /dev/null
+++ b/crossbeam-queue/src/utils.rs
@@ -0,0 +1,38 @@
+//! Miscellaneous utilities.
+
+#[cfg(not(crossbeam_no_atomic_64))]
+pub(crate) use core::sync::atomic::AtomicU64;
+
+#[cfg(crossbeam_no_atomic_64)]
+use core::sync::atomic::Ordering;
+
+#[cfg(crossbeam_no_atomic_64)]
+#[derive(Debug)]
+#[repr(transparent)]
+pub(crate) struct AtomicU64 {
+    inner: crossbeam_utils::atomic::AtomicCell<u64>,
+}
+
+#[cfg(crossbeam_no_atomic_64)]
+impl AtomicU64 {
+    pub(crate) const fn new(v: u64) -> Self {
+        Self {
+            inner: crossbeam_utils::atomic::AtomicCell::new(v),
+        }
+    }
+    pub(crate) fn load(&self, _order: Ordering) -> u64 {
+        self.inner.load()
+    }
+    pub(crate) fn store(&self, val: u64, _order: Ordering) {
+        self.inner.store(val);
+    }
+    pub(crate) fn compare_exchange_weak(
+        &self,
+        current: u64,
+        new: u64,
+        _success: Ordering,
+        _failure: Ordering,
+    ) -> Result<u64, u64> {
+        self.inner.compare_exchange(current, new)
+    }
+}
diff --git a/crossbeam-skiplist/Cargo.toml b/crossbeam-skiplist/Cargo.toml
index 0e0053882..02114655b 100644
--- a/crossbeam-skiplist/Cargo.toml
+++ b/crossbeam-skiplist/Cargo.toml
@@ -37,7 +37,7 @@ default-features = false
 optional = true
 
 [dependencies.crossbeam-utils]
-version = "0.8.5"
+version = "0.8.6"
 path = "../crossbeam-utils"
 default-features = false
diff --git a/no_atomic.rs b/no_atomic.rs
index 390019ebd..4797b28f7 100644
--- a/no_atomic.rs
+++ b/no_atomic.rs
@@ -1,6 +1,7 @@
 // This file is @generated by no_atomic.sh.
 // It is not intended for manual editing.
 
+#[allow(dead_code)] // Only crossbeam-{epoch,queue,skiplist,utils} use this.
 const NO_ATOMIC_CAS: &[&str] = &[
     "avr-unknown-gnu-atmega328",
     "bpfeb-unknown-none",
@@ -13,7 +14,7 @@ const NO_ATOMIC_CAS: &[&str] = &[
     "thumbv6m-none-eabi",
 ];
 
-#[allow(dead_code)] // Only crossbeam-utils uses this.
+#[allow(dead_code)] // Only crossbeam-{channel,deque,queue,utils} use this.
 const NO_ATOMIC_64: &[&str] = &[
     "arm-linux-androideabi",
     "armebv7r-none-eabi",
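Why widen everything to `u64`? The patch does not say so explicitly, but the arithmetic suggests overflow protection: these stamps and indices only ever grow, and on a 32-bit target a `usize` counter is exhaustible in practice, after which laps alias and a stamp comparison can mistake a stale slot state for a fresh one (an ABA-style hazard). A back-of-envelope check under that assumption (32-bit `usize`, one metadata bit as in the segment queues):

fn main() {
    // A 32-bit index with SHIFT = 1 wraps after 2^31 operations. At a
    // modest 10 million operations per second that is under four minutes:
    let wrap_ops = (u32::MAX as u64 + 1) / 2;
    println!("32-bit wrap after {} ops (~{} s at 10M ops/s)", wrap_ops, wrap_ops / 10_000_000);
    // The same budget in u64 lasts on the order of tens of thousands of years:
    let wrap_years = (u64::MAX / 2) / 10_000_000 / 60 / 60 / 24 / 365;
    println!("64-bit wrap after ~{} years at 10M ops/s", wrap_years);
}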