Skip to content

Commit

Permalink
Atomically replace hold flag
Browse files Browse the repository at this point in the history
  • Loading branch information
agerasev committed Dec 11, 2023
1 parent 12937ee commit 447a156
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 32 deletions.
14 changes: 8 additions & 6 deletions async/src/rb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,16 @@ impl<S: Storage> Consumer for AsyncRb<S> {
}
impl<S: Storage> RingBuffer for AsyncRb<S> {
#[inline]
unsafe fn hold_read(&self, flag: bool) {
self.base.hold_read(flag);
self.read.wake()
unsafe fn hold_read(&self, flag: bool) -> bool {
let old = self.base.hold_read(flag);
self.read.wake();
old
}
#[inline]
unsafe fn hold_write(&self, flag: bool) {
self.base.hold_write(flag);
self.write.wake()
unsafe fn hold_write(&self, flag: bool) -> bool {
let old = self.base.hold_write(flag);
self.write.wake();
old
}
}

Expand Down
10 changes: 6 additions & 4 deletions blocking/src/rb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,15 @@ impl<S: Storage, X: Semaphore> Consumer for BlockingRb<S, X> {
}
}
impl<S: Storage, X: Semaphore> RingBuffer for BlockingRb<S, X> {
unsafe fn hold_read(&self, flag: bool) {
self.base.hold_read(flag);
unsafe fn hold_read(&self, flag: bool) -> bool {
let old = self.base.hold_read(flag);
self.read.give();
old
}
unsafe fn hold_write(&self, flag: bool) {
self.base.hold_write(flag);
unsafe fn hold_write(&self, flag: bool) -> bool {
let old = self.base.hold_write(flag);
self.write.give();
old
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/rb/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@ impl<S: Storage> Consumer for LocalRb<S> {

impl<S: Storage> RingBuffer for LocalRb<S> {
#[inline]
unsafe fn hold_read(&self, flag: bool) {
self.read.held.set(flag)
unsafe fn hold_read(&self, flag: bool) -> bool {
self.read.held.replace(flag)
}
#[inline]
unsafe fn hold_write(&self, flag: bool) {
self.write.held.set(flag)
unsafe fn hold_write(&self, flag: bool) -> bool {
self.write.held.replace(flag)
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/rb/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,12 @@ impl<S: Storage> Consumer for SharedRb<S> {

impl<S: Storage> RingBuffer for SharedRb<S> {
#[inline]
unsafe fn hold_read(&self, flag: bool) {
self.read_held.store(flag, Ordering::Relaxed)
unsafe fn hold_read(&self, flag: bool) -> bool {
self.read_held.swap(flag, Ordering::Relaxed)
}
#[inline]
unsafe fn hold_write(&self, flag: bool) {
self.write_held.store(flag, Ordering::Relaxed)
unsafe fn hold_write(&self, flag: bool) -> bool {
self.write_held.swap(flag, Ordering::Relaxed)
}
}

Expand Down
16 changes: 10 additions & 6 deletions src/traits/ring_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@ use super::{
pub trait RingBuffer: Observer + Consumer + Producer {
/// Tell whether read end of the ring buffer is held by consumer or not.
///
/// Returns old value.
///
/// # Safety
///
/// Must not be set to `false` while consumer exists.
unsafe fn hold_read(&self, flag: bool);
unsafe fn hold_read(&self, flag: bool) -> bool;
/// Tell whether write end of the ring buffer is held by producer or not.
///
/// Returns old value.
///
/// # Safety
///
/// Must not be set to `false` while producer exists.
unsafe fn hold_write(&self, flag: bool);
unsafe fn hold_write(&self, flag: bool) -> bool;

/// Pushes an item to the ring buffer overwriting the latest item if the buffer is full.
///
Expand Down Expand Up @@ -78,11 +82,11 @@ impl<D: DelegateRingBuffer> RingBuffer for D
where
D::Base: RingBuffer,
{
unsafe fn hold_read(&self, flag: bool) {
self.base().hold_read(flag);
unsafe fn hold_read(&self, flag: bool) -> bool {
self.base().hold_read(flag)
}
unsafe fn hold_write(&self, flag: bool) {
self.base().hold_write(flag);
unsafe fn hold_write(&self, flag: bool) -> bool {
self.base().hold_write(flag)
}

#[inline]
Expand Down
6 changes: 2 additions & 4 deletions src/wrap/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,10 @@ impl<R: RbRef, const P: bool, const C: bool> Direct<R, P, C> {
/// There must be no more than one wrapper with the same parameter being `true`.
pub fn new(rb: R) -> Self {
if P {
assert!(!rb.rb().write_is_held());
unsafe { rb.rb().hold_write(true) };
assert!(!unsafe { rb.rb().hold_write(true) });
}
if C {
assert!(!rb.rb().read_is_held());
unsafe { rb.rb().hold_read(true) };
assert!(!unsafe { rb.rb().hold_read(true) });
}
Self { rb }
}
Expand Down
6 changes: 2 additions & 4 deletions src/wrap/frozen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,10 @@ impl<R: RbRef, const P: bool, const C: bool> Frozen<R, P, C> {
/// Create new ring buffer cache.
pub fn new(rb: R) -> Self {
if P {
assert!(!rb.rb().write_is_held());
unsafe { rb.rb().hold_write(true) };
assert!(!unsafe { rb.rb().hold_write(true) });
}
if C {
assert!(!rb.rb().read_is_held());
unsafe { rb.rb().hold_read(true) };
assert!(!unsafe { rb.rb().hold_read(true) });
}
unsafe { Self::new_unchecked(rb) }
}
Expand Down

0 comments on commit 447a156

Please sign in to comment.