Support unsized storage
agerasev committed Jan 20, 2024
1 parent e972ab0 commit bba11ad
Showing 10 changed files with 192 additions and 54 deletions.
src/rb/local.rs (60 changes: 43 additions & 17 deletions)
@@ -11,7 +11,7 @@ use crate::{
     wrap::{Cons, Prod},
 };
 #[cfg(feature = "alloc")]
-use alloc::rc::Rc;
+use alloc::{boxed::Box, rc::Rc};
 use core::{
     cell::Cell,
     mem::{ManuallyDrop, MaybeUninit},
@@ -36,10 +36,10 @@ impl End {
 /// Ring buffer for single-threaded use only.
 ///
 /// Slightly faster than multi-threaded version because it doesn't synchronize cache.
-pub struct LocalRb<S: Storage> {
-    storage: Shared<S>,
+pub struct LocalRb<S: Storage + ?Sized> {
     read: End,
     write: End,
+    storage: Shared<S>,
 }
 
 impl<S: Storage> LocalRb<S> {
@@ -49,25 +49,31 @@ impl<S: Storage> LocalRb<S> {
     ///
     /// The items in storage inside `read..write` range must be initialized, items outside this range must be uninitialized.
     /// `read` and `write` positions must be valid (see implementation details).
-    pub unsafe fn from_raw_parts(storage: S, read: usize, write: usize) -> Self {
+    pub unsafe fn from_raw_parts(storage: S, read: usize, write: usize) -> Self
+    where
+        S::Internal: Sized,
+    {
         Self {
-            storage: Shared::new(storage),
             read: End::new(read),
             write: End::new(write),
+            storage: Shared::new(storage),
         }
     }
     /// Destructures ring buffer into underlying storage and `read` and `write` indices.
     ///
     /// # Safety
     ///
     /// Initialized contents of the storage must be properly dropped.
-    pub unsafe fn into_raw_parts(self) -> (S, usize, usize) {
+    pub unsafe fn into_raw_parts(self) -> (S, usize, usize)
+    where
+        S::Internal: Sized,
+    {
         let this = ManuallyDrop::new(self);
         (ptr::read(&this.storage).into_inner(), this.read_index(), this.write_index())
     }
 }
 
-impl<S: Storage> Observer for LocalRb<S> {
+impl<S: Storage + ?Sized> Observer for LocalRb<S> {
     type Item = S::Item;
 
     #[inline]
@@ -99,21 +105,21 @@ impl<S: Storage> Observer for LocalRb<S> {
     }
 }
 
-impl<S: Storage> Producer for LocalRb<S> {
+impl<S: Storage + ?Sized> Producer for LocalRb<S> {
     #[inline]
     unsafe fn set_write_index(&self, value: usize) {
         self.write.index.set(value);
     }
 }
 
-impl<S: Storage> Consumer for LocalRb<S> {
+impl<S: Storage + ?Sized> Consumer for LocalRb<S> {
     #[inline]
     unsafe fn set_read_index(&self, value: usize) {
         self.read.index.set(value);
     }
 }
 
-impl<S: Storage> RingBuffer for LocalRb<S> {
+impl<S: Storage + ?Sized> RingBuffer for LocalRb<S> {
     #[inline]
     unsafe fn hold_read(&self, flag: bool) -> bool {
         self.read.held.replace(flag)
@@ -124,23 +130,43 @@ impl<S: Storage> RingBuffer for LocalRb<S> {
     }
 }
 
-impl<S: Storage> Drop for LocalRb<S> {
+impl<S: Storage + ?Sized> Drop for LocalRb<S> {
     fn drop(&mut self) {
         self.clear();
     }
 }
 
 #[cfg(feature = "alloc")]
-impl<S: Storage> Split for LocalRb<S> {
+impl<S: Storage> Split for LocalRb<S>
+where
+    S::Internal: Sized,
+{
     type Prod = Prod<Rc<Self>>;
     type Cons = Cons<Rc<Self>>;
 
     fn split(self) -> (Self::Prod, Self::Cons) {
-        let rc = Rc::new(self);
-        (Prod::new(rc.clone()), Cons::new(rc))
+        Rc::new(self).split()
     }
 }
+#[cfg(feature = "alloc")]
+impl<S: Storage + ?Sized> Split for Rc<LocalRb<S>> {
+    type Prod = Prod<Self>;
+    type Cons = Cons<Self>;
+
+    fn split(self) -> (Self::Prod, Self::Cons) {
+        (Prod::new(self.clone()), Cons::new(self))
+    }
+}
+#[cfg(feature = "alloc")]
+impl<S: Storage + ?Sized> Split for Box<LocalRb<S>> {
+    type Prod = Prod<Rc<LocalRb<S>>>;
+    type Cons = Cons<Rc<LocalRb<S>>>;
+
+    fn split(self) -> (Self::Prod, Self::Cons) {
+        Rc::<LocalRb<S>>::from(self).split()
+    }
+}
-impl<S: Storage> SplitRef for LocalRb<S> {
+impl<S: Storage + ?Sized> SplitRef for LocalRb<S> {
     type RefProd<'a> = Prod<&'a Self> where Self: 'a;
     type RefCons<'a> = Cons<&'a Self> where Self: 'a;
 
@@ -154,12 +180,12 @@ rb_impl_init!(LocalRb);
 impl_producer_traits!(LocalRb<S: Storage>);
 impl_consumer_traits!(LocalRb<S: Storage>);
 
-impl<S: Storage> AsRef<Self> for LocalRb<S> {
+impl<S: Storage + ?Sized> AsRef<Self> for LocalRb<S> {
     fn as_ref(&self) -> &Self {
         self
     }
 }
-impl<S: Storage> AsMut<Self> for LocalRb<S> {
+impl<S: Storage + ?Sized> AsMut<Self> for LocalRb<S> {
     fn as_mut(&mut self) -> &mut Self {
         self
     }
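
For context, a minimal sketch of what the relaxed bounds allow on the single-threaded side; this is not part of the diff, and the `ringbuf::{storage::Storage, traits::Split, wrap::{Cons, Prod}, LocalRb}` paths and the helper name are assumptions for illustration:

```rust
// Sketch only: split a boxed single-threaded ring buffer even when its storage
// type `S` is dynamically sized. Module paths are assumed, not taken from the diff.
use std::rc::Rc;

use ringbuf::{storage::Storage, traits::Split, wrap::{Cons, Prod}, LocalRb};

// Previously `S` was implicitly `Sized`; with the new impls, `Box<LocalRb<S>>`
// can be split for any `S: Storage + ?Sized`, yielding halves that share an `Rc`.
fn split_boxed_local<S: Storage + ?Sized>(
    rb: Box<LocalRb<S>>,
) -> (Prod<Rc<LocalRb<S>>>, Cons<Rc<LocalRb<S>>>) {
    // Delegates to the new `Split` impl for `Box<LocalRb<S>>`, which converts
    // the box into an `Rc` and then splits it.
    rb.split()
}
```
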
src/rb/shared.rs (58 changes: 42 additions & 16 deletions)
@@ -11,7 +11,7 @@ use crate::{
     wrap::{CachingCons, CachingProd},
 };
 #[cfg(feature = "alloc")]
-use alloc::sync::Arc;
+use alloc::{boxed::Box, sync::Arc};
 use core::{
     mem::{ManuallyDrop, MaybeUninit},
     num::NonZeroUsize,
@@ -44,12 +44,12 @@ thread::spawn(move || {
 ```
 "##
 )]
-pub struct SharedRb<S: Storage> {
-    storage: Shared<S>,
+pub struct SharedRb<S: Storage + ?Sized> {
     read_index: CachePadded<AtomicUsize>,
     write_index: CachePadded<AtomicUsize>,
     read_held: AtomicBool,
     write_held: AtomicBool,
+    storage: Shared<S>,
 }
 
 impl<S: Storage> SharedRb<S> {
@@ -59,7 +59,10 @@ impl<S: Storage> SharedRb<S> {
     ///
     /// The items in storage inside `read..write` range must be initialized, items outside this range must be uninitialized.
     /// `read` and `write` positions must be valid (see implementation details).
-    pub unsafe fn from_raw_parts(storage: S, read: usize, write: usize) -> Self {
+    pub unsafe fn from_raw_parts(storage: S, read: usize, write: usize) -> Self
+    where
+        S::Internal: Sized,
+    {
         Self {
             storage: Shared::new(storage),
             read_index: CachePadded::new(AtomicUsize::new(read)),
@@ -73,13 +76,16 @@ impl<S: Storage> SharedRb<S> {
     /// # Safety
     ///
     /// Initialized contents of the storage must be properly dropped.
-    pub unsafe fn into_raw_parts(self) -> (S, usize, usize) {
+    pub unsafe fn into_raw_parts(self) -> (S, usize, usize)
+    where
+        S::Internal: Sized,
+    {
         let this = ManuallyDrop::new(self);
         (ptr::read(&this.storage).into_inner(), this.read_index(), this.write_index())
     }
 }
 
-impl<S: Storage> Observer for SharedRb<S> {
+impl<S: Storage + ?Sized> Observer for SharedRb<S> {
     type Item = S::Item;
 
     #[inline]
@@ -111,21 +117,21 @@ impl<S: Storage> Observer for SharedRb<S> {
     }
 }
 
-impl<S: Storage> Producer for SharedRb<S> {
+impl<S: Storage + ?Sized> Producer for SharedRb<S> {
     #[inline]
     unsafe fn set_write_index(&self, value: usize) {
         self.write_index.store(value, Ordering::Release);
     }
 }
 
-impl<S: Storage> Consumer for SharedRb<S> {
+impl<S: Storage + ?Sized> Consumer for SharedRb<S> {
     #[inline]
     unsafe fn set_read_index(&self, value: usize) {
         self.read_index.store(value, Ordering::Release);
     }
 }
 
-impl<S: Storage> RingBuffer for SharedRb<S> {
+impl<S: Storage + ?Sized> RingBuffer for SharedRb<S> {
     #[inline]
     unsafe fn hold_read(&self, flag: bool) -> bool {
         self.read_held.swap(flag, Ordering::Relaxed)
@@ -136,23 +142,43 @@ impl<S: Storage> RingBuffer for SharedRb<S> {
     }
 }
 
-impl<S: Storage> Drop for SharedRb<S> {
+impl<S: Storage + ?Sized> Drop for SharedRb<S> {
     fn drop(&mut self) {
         self.clear();
     }
 }
 
 #[cfg(feature = "alloc")]
-impl<S: Storage> Split for SharedRb<S> {
+impl<S: Storage> Split for SharedRb<S>
+where
+    S::Internal: Sized,
+{
     type Prod = CachingProd<Arc<Self>>;
     type Cons = CachingCons<Arc<Self>>;
 
     fn split(self) -> (Self::Prod, Self::Cons) {
-        let rc = Arc::new(self);
-        (CachingProd::new(rc.clone()), CachingCons::new(rc))
+        Arc::new(self).split()
     }
 }
+#[cfg(feature = "alloc")]
+impl<S: Storage + ?Sized> Split for Arc<SharedRb<S>> {
+    type Prod = CachingProd<Self>;
+    type Cons = CachingCons<Self>;
+
+    fn split(self) -> (Self::Prod, Self::Cons) {
+        (CachingProd::new(self.clone()), CachingCons::new(self))
+    }
+}
+#[cfg(feature = "alloc")]
+impl<S: Storage + ?Sized> Split for Box<SharedRb<S>> {
+    type Prod = CachingProd<Arc<SharedRb<S>>>;
+    type Cons = CachingCons<Arc<SharedRb<S>>>;
+
+    fn split(self) -> (Self::Prod, Self::Cons) {
+        Arc::<SharedRb<S>>::from(self).split()
+    }
+}
-impl<S: Storage> SplitRef for SharedRb<S> {
+impl<S: Storage + ?Sized> SplitRef for SharedRb<S> {
     type RefProd<'a> = CachingProd<&'a Self> where Self: 'a;
     type RefCons<'a> = CachingCons<&'a Self> where Self: 'a;
 
@@ -166,12 +192,12 @@ rb_impl_init!(SharedRb);
 impl_producer_traits!(SharedRb<S: Storage>);
 impl_consumer_traits!(SharedRb<S: Storage>);
 
-impl<S: Storage> AsRef<Self> for SharedRb<S> {
+impl<S: Storage + ?Sized> AsRef<Self> for SharedRb<S> {
     fn as_ref(&self) -> &Self {
         self
     }
 }
-impl<S: Storage> AsMut<Self> for SharedRb<S> {
+impl<S: Storage + ?Sized> AsMut<Self> for SharedRb<S> {
     fn as_mut(&mut self) -> &mut Self {
         self
     }
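
The multi-threaded buffer follows the same pattern; a sketch under the same assumed paths (`ringbuf::{storage::Storage, traits::Split, wrap::{CachingCons, CachingProd}, SharedRb}`), not part of the diff:

```rust
// Sketch only: splitting `Arc`- or `Box`-held shared ring buffers whose storage
// type may be unsized. Module paths are assumed.
use std::sync::Arc;

use ringbuf::{storage::Storage, traits::Split, wrap::{CachingCons, CachingProd}, SharedRb};

// Uses the new `Split` impl for `Arc<SharedRb<S>>`: both halves clone the `Arc`.
fn split_shared<S: Storage + ?Sized>(
    rb: Arc<SharedRb<S>>,
) -> (CachingProd<Arc<SharedRb<S>>>, CachingCons<Arc<SharedRb<S>>>) {
    rb.split()
}

// Uses the new `Split` impl for `Box<SharedRb<S>>`: the box is converted into
// an `Arc` first and then split, so no `S: Sized` bound is needed.
fn split_boxed_shared<S: Storage + ?Sized>(
    rb: Box<SharedRb<S>>,
) -> (CachingProd<Arc<SharedRb<S>>>, CachingCons<Arc<SharedRb<S>>>) {
    rb.split()
}
```

Note that splitting `SharedRb<S>` by value still carries a `where S::Internal: Sized` bound, since `split` consumes the buffer and moves it into a fresh `Arc`, which requires a sized value; the `Arc`/`Box` impls are what make the unsized case usable.
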
src/rb/traits.rs (8 changes: 4 additions & 4 deletions)
@@ -9,21 +9,21 @@ use alloc::{rc::Rc, sync::Arc};
 /// Implementation must be fair (e.g. not replacing pointers between calls and so on).
 pub unsafe trait RbRef: Clone + AsRef<Self::Rb> {
     /// Underlying ring buffer.
-    type Rb: RingBuffer;
+    type Rb: RingBuffer + ?Sized;
     /// Get ring buffer reference.
     fn rb(&self) -> &Self::Rb {
         self.as_ref()
     }
 }
 
-unsafe impl<'a, B: RingBuffer + AsRef<B>> RbRef for &'a B {
+unsafe impl<'a, B: RingBuffer + AsRef<B> + ?Sized> RbRef for &'a B {
     type Rb = B;
 }
 #[cfg(feature = "alloc")]
-unsafe impl<B: RingBuffer> RbRef for Rc<B> {
+unsafe impl<B: RingBuffer + ?Sized> RbRef for Rc<B> {
     type Rb = B;
 }
 #[cfg(feature = "alloc")]
-unsafe impl<B: RingBuffer> RbRef for Arc<B> {
+unsafe impl<B: RingBuffer + ?Sized> RbRef for Arc<B> {
     type Rb = B;
 }
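
Relaxing `RbRef::Rb` to `RingBuffer + ?Sized` is what lets the producer/consumer wrappers above hold `Rc`/`Arc`/`&` references to ring buffers with unsized storage. A sketch, assuming `RbRef` and `Observer` are reachable via `ringbuf::traits` and that `read_index`/`write_index` are `Observer` methods (as suggested by the `into_raw_parts` bodies above):

```rust
use ringbuf::traits::{Observer, RbRef};

// Sketch only: generic over any `RbRef`, including ones whose referenced ring
// buffer type is dynamically sized (e.g. `Rc<LocalRb<S>>` with unsized `S`).
fn indices<R: RbRef>(rb_ref: &R) -> (usize, usize) {
    // `rb()` returns `&R::Rb`; with `type Rb: RingBuffer + ?Sized` this may be
    // a reference to an unsized ring buffer.
    let rb = rb_ref.rb();
    (rb.read_index(), rb.write_index())
}
```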