diff --git a/tokio/src/io/split.rs b/tokio/src/io/split.rs
index fd3273ee28a..732eb3b3aa0 100644
--- a/tokio/src/io/split.rs
+++ b/tokio/src/io/split.rs
@@ -131,7 +131,11 @@ impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {
 
 impl<T> Inner<T> {
     fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<Guard<'_, T>> {
-        if !self.locked.compare_and_swap(false, true, Acquire) {
+        if self
+            .locked
+            .compare_exchange(false, true, Acquire, Acquire)
+            .is_ok()
+        {
             Poll::Ready(Guard { inner: self })
         } else {
             // Spin... but investigate a better strategy
diff --git a/tokio/src/loom/std/atomic_u64.rs b/tokio/src/loom/std/atomic_u64.rs
index 206954fcc38..a86a195b1d2 100644
--- a/tokio/src/loom/std/atomic_u64.rs
+++ b/tokio/src/loom/std/atomic_u64.rs
@@ -24,8 +24,8 @@ mod imp {
     }
 
     impl AtomicU64 {
-        pub(crate) fn new(val: u64) -> AtomicU64 {
-            AtomicU64 {
+        pub(crate) fn new(val: u64) -> Self {
+            Self {
                 inner: Mutex::new(val),
             }
         }
@@ -45,16 +45,31 @@ mod imp {
             prev
         }
 
-        pub(crate) fn compare_and_swap(&self, old: u64, new: u64, _: Ordering) -> u64 {
+        pub(crate) fn compare_exchange(
+            &self,
+            current: u64,
+            new: u64,
+            _success: Ordering,
+            _failure: Ordering,
+        ) -> Result<u64, u64> {
             let mut lock = self.inner.lock().unwrap();
-            let prev = *lock;
 
-            if prev != old {
-                return prev;
+            if *lock == current {
+                *lock = new;
+                Ok(current)
+            } else {
+                Err(*lock)
             }
+        }
 
-            *lock = new;
-            prev
+        pub(crate) fn compare_exchange_weak(
+            &self,
+            current: u64,
+            new: u64,
+            success: Ordering,
+            failure: Ordering,
+        ) -> Result<u64, u64> {
+            self.compare_exchange(current, new, success, failure)
         }
     }
 }
diff --git a/tokio/src/runtime/queue.rs b/tokio/src/runtime/queue.rs
index cdf4009c08b..0fcaad8d7d2 100644
--- a/tokio/src/runtime/queue.rs
+++ b/tokio/src/runtime/queue.rs
@@ -8,7 +8,7 @@ use crate::runtime::task;
 use std::marker::PhantomData;
 use std::mem::MaybeUninit;
 use std::ptr::{self, NonNull};
-use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
 
 /// Producer handle. May only be used from a single thread.
 pub(super) struct Local<T: 'static> {
@@ -194,13 +194,17 @@ impl<T> Local<T> {
         // work. This is because all tasks are pushed into the queue from the
         // current thread (or memory has been acquired if the local queue handle
         // moved).
-        let actual = self.inner.head.compare_and_swap(
-            prev,
-            pack(head.wrapping_add(n), head.wrapping_add(n)),
-            Release,
-        );
-
-        if actual != prev {
+        if self
+            .inner
+            .head
+            .compare_exchange(
+                prev,
+                pack(head.wrapping_add(n), head.wrapping_add(n)),
+                Release,
+                Relaxed,
+            )
+            .is_err()
+        {
             // We failed to claim the tasks, losing the race. Return out of
             // this function and try the full `push` routine again. The queue
             // may not be full anymore.
diff --git a/tokio/src/sync/mpsc/block.rs b/tokio/src/sync/mpsc/block.rs
index e062f2b7303..6bef7946e5c 100644
--- a/tokio/src/sync/mpsc/block.rs
+++ b/tokio/src/sync/mpsc/block.rs
@@ -258,13 +258,15 @@ impl<T> Block<T> {
     pub(crate) unsafe fn try_push(
         &self,
         block: &mut NonNull<Block<T>>,
-        ordering: Ordering,
+        success: Ordering,
+        failure: Ordering,
     ) -> Result<(), NonNull<Block<T>>> {
         block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP);
 
         let next_ptr = self
             .next
-            .compare_and_swap(ptr::null_mut(), block.as_ptr(), ordering);
+            .compare_exchange(ptr::null_mut(), block.as_ptr(), success, failure)
+            .unwrap_or_else(|x| x);
 
         match NonNull::new(next_ptr) {
             Some(next_ptr) => Err(next_ptr),
@@ -306,11 +308,11 @@ impl<T> Block<T> {
         //
         // `Release` ensures that the newly allocated block is available to
         // other threads acquiring the next pointer.
-        let next = NonNull::new(self.next.compare_and_swap(
-            ptr::null_mut(),
-            new_block.as_ptr(),
-            AcqRel,
-        ));
+        let next = NonNull::new(
+            self.next
+                .compare_exchange(ptr::null_mut(), new_block.as_ptr(), AcqRel, Acquire)
+                .unwrap_or_else(|x| x),
+        );
 
         let next = match next {
             Some(next) => next,
@@ -333,7 +335,7 @@ impl<T> Block<T> {
 
         // TODO: Should this iteration be capped?
         loop {
-            let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel) };
+            let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel, Acquire) };
 
             curr = match actual {
                 Ok(_) => {
diff --git a/tokio/src/sync/mpsc/list.rs b/tokio/src/sync/mpsc/list.rs
index 2f4c532a7d7..5dad2babfaf 100644
--- a/tokio/src/sync/mpsc/list.rs
+++ b/tokio/src/sync/mpsc/list.rs
@@ -140,11 +140,11 @@ impl<T> Tx<T> {
         //
         // Acquire is not needed as any "actual" value is not accessed.
         // At this point, the linked list is walked to acquire blocks.
-        let actual =
-            self.block_tail
-                .compare_and_swap(block_ptr, next_block.as_ptr(), Release);
-
-        if actual == block_ptr {
+        if self
+            .block_tail
+            .compare_exchange(block_ptr, next_block.as_ptr(), Release, Relaxed)
+            .is_ok()
+        {
             // Synchronize with any senders
             let tail_position = self.tail_position.fetch_add(0, Release);
 
@@ -191,7 +191,7 @@ impl<T> Tx<T> {
 
         // TODO: Unify this logic with Block::grow
         for _ in 0..3 {
-            match curr.as_ref().try_push(&mut block, AcqRel) {
+            match curr.as_ref().try_push(&mut block, AcqRel, Acquire) {
                 Ok(_) => {
                     reused = true;
                     break;
diff --git a/tokio/src/sync/task/atomic_waker.rs b/tokio/src/sync/task/atomic_waker.rs
index ae4cac7c247..06d70cf8ecc 100644
--- a/tokio/src/sync/task/atomic_waker.rs
+++ b/tokio/src/sync/task/atomic_waker.rs
@@ -171,7 +171,11 @@ impl AtomicWaker {
     where
         W: WakerRef,
     {
-        match self.state.compare_and_swap(WAITING, REGISTERING, Acquire) {
+        match self
+            .state
+            .compare_exchange(WAITING, REGISTERING, Acquire, Acquire)
+            .unwrap_or_else(|x| x)
+        {
             WAITING => {
                 unsafe {
                     // Locked acquired, update the waker cell
diff --git a/tokio/src/time/driver/entry.rs b/tokio/src/time/driver/entry.rs
index bcad988ef16..11366d2ada3 100644
--- a/tokio/src/time/driver/entry.rs
+++ b/tokio/src/time/driver/entry.rs
@@ -53,6 +53,7 @@
 //! refuse to mark the timer as pending.
 
 use crate::loom::cell::UnsafeCell;
+use crate::loom::sync::atomic::AtomicU64;
 use crate::loom::sync::atomic::Ordering;
 
 use crate::sync::AtomicWaker;
@@ -71,79 +72,6 @@ const STATE_DEREGISTERED: u64 = u64::max_value();
 const STATE_PENDING_FIRE: u64 = STATE_DEREGISTERED - 1;
 const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE;
 
-/// Not all platforms support 64-bit compare-and-swap. This hack replaces the
-/// AtomicU64 with a mutex around a u64 on platforms that don't. This is slow,
-/// unfortunately, but 32-bit platforms are a bit niche so it'll do for now.
-///
-/// Note: We use "x86 or 64-bit pointers" as the condition here because
-/// target_has_atomic is not stable.
-#[cfg(all(
-    not(tokio_force_time_entry_locked),
-    any(target_arch = "x86", target_pointer_width = "64")
-))]
-type AtomicU64 = crate::loom::sync::atomic::AtomicU64;
-
-#[cfg(not(all(
-    not(tokio_force_time_entry_locked),
-    any(target_arch = "x86", target_pointer_width = "64")
-)))]
-#[derive(Debug)]
-struct AtomicU64 {
-    inner: crate::loom::sync::Mutex<u64>,
-}
-
-#[cfg(not(all(
-    not(tokio_force_time_entry_locked),
-    any(target_arch = "x86", target_pointer_width = "64")
-)))]
-impl AtomicU64 {
-    fn new(v: u64) -> Self {
-        Self {
-            inner: crate::loom::sync::Mutex::new(v),
-        }
-    }
-
-    fn load(&self, _order: Ordering) -> u64 {
-        debug_assert_ne!(_order, Ordering::SeqCst); // we only provide AcqRel with the lock
-        *self.inner.lock()
-    }
-
-    fn store(&self, v: u64, _order: Ordering) {
-        debug_assert_ne!(_order, Ordering::SeqCst); // we only provide AcqRel with the lock
-        *self.inner.lock() = v;
-    }
-
-    fn compare_exchange(
-        &self,
-        current: u64,
-        new: u64,
-        _success: Ordering,
-        _failure: Ordering,
-    ) -> Result<u64, u64> {
-        debug_assert_ne!(_success, Ordering::SeqCst); // we only provide AcqRel with the lock
-        debug_assert_ne!(_failure, Ordering::SeqCst);
-
-        let mut lock = self.inner.lock();
-
-        if *lock == current {
-            *lock = new;
-            Ok(current)
-        } else {
-            Err(*lock)
-        }
-    }
-
-    fn compare_exchange_weak(
-        &self,
-        current: u64,
-        new: u64,
-        success: Ordering,
-        failure: Ordering,
-    ) -> Result<u64, u64> {
-        self.compare_exchange(current, new, success, failure)
-    }
-}
-
 /// This structure holds the current shared state of the timer - its scheduled
 /// time (if registered), or otherwise the result of the timer completing, as
 /// well as the registered waker.
@@ -300,7 +228,7 @@ impl StateCell {
     /// expiration time.
     ///
     /// While this function is memory-safe, it should only be called from a
-    /// context holding both `&mut TimerEntry` and the driver lock.
+    /// context holding both `&mut TimerEntry` and the driver lock.
     fn set_expiration(&self, timestamp: u64) {
         debug_assert!(timestamp < STATE_MIN_VALUE);
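
Note on the migration pattern used throughout this patch: the deprecated
`compare_and_swap(current, new, ordering)` returns the previous value
unconditionally, while `compare_exchange(current, new, success, failure)`
returns `Ok(previous)` on success and `Err(actual)` on failure. Call sites
that still need the raw previous value recover it with `.unwrap_or_else(|x| x)`;
call sites that only branch on the outcome use `.is_ok()`/`.is_err()`.
A minimal standalone sketch of the equivalence (illustrative only, not part
of the patch; per the std migration table, an `AcqRel` success ordering pairs
with `Acquire` as the failure ordering):

    use std::sync::atomic::{
        AtomicUsize,
        Ordering::{AcqRel, Acquire},
    };

    fn main() {
        let state = AtomicUsize::new(0);

        // Deprecated form (returns the previous value unconditionally):
        //     let prev = state.compare_and_swap(0, 1, AcqRel);

        // Replacement: Ok(prev) on success, Err(actual) on failure.
        // `unwrap_or_else(|x| x)` collapses both cases back into the
        // previous value, reproducing compare_and_swap's return contract.
        let prev = state
            .compare_exchange(0, 1, AcqRel, Acquire)
            .unwrap_or_else(|x| x);

        assert_eq!(prev, 0);
        assert_eq!(state.load(Acquire), 1);
    }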