Skip to content

Commit

Permalink
BlockingRb: Timeout is now state rather than argument
Browse files Browse the repository at this point in the history
  • Loading branch information
agerasev committed Aug 18, 2023
1 parent ef41065 commit aa8cf5d
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 61 deletions.
32 changes: 13 additions & 19 deletions blocking/src/rb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
use crate::sync::StdSemaphore;
use crate::{
sync::Semaphore,
traits::{BlockingConsumer, BlockingProducer},
wrap::{BlockingCons, BlockingProd},
};
#[cfg(feature = "alloc")]
use alloc::sync::Arc;
use core::{mem::MaybeUninit, num::NonZeroUsize, time::Duration};
use core::{mem::MaybeUninit, num::NonZeroUsize};
#[cfg(feature = "alloc")]
use ringbuf::traits::Split;
use ringbuf::{
rb::traits::RbRef,
storage::Storage,
traits::{Consumer, Observer, Producer, RingBuffer, SplitRef},
SharedRb,
Expand All @@ -25,8 +25,8 @@ pub struct BlockingRb<S: Storage, X: Semaphore> {
#[cfg(feature = "std")]
pub struct BlockingRb<S: Storage, X: Semaphore = StdSemaphore> {
base: SharedRb<S>,
read: X,
write: X,
pub(crate) read: X,
pub(crate) write: X,
}

impl<S: Storage, X: Semaphore> BlockingRb<S, X> {
Expand Down Expand Up @@ -88,21 +88,6 @@ impl<S: Storage, X: Semaphore> RingBuffer for BlockingRb<S, X> {
}
}

impl<S: Storage, X: Semaphore> BlockingProducer for BlockingRb<S, X> {
type Instant = X::Instant;
fn wait_vacant(&self, count: usize, timeout: Option<Duration>) -> bool {
debug_assert!(count <= self.capacity().get());
self.read.wait(|| self.vacant_len() >= count, timeout)
}
}
impl<S: Storage, X: Semaphore> BlockingConsumer for BlockingRb<S, X> {
type Instant = X::Instant;
fn wait_occupied(&self, count: usize, timeout: Option<Duration>) -> bool {
debug_assert!(count <= self.capacity().get());
self.write.wait(|| self.occupied_len() >= count, timeout)
}
}

impl<S: Storage, X: Semaphore> SplitRef for BlockingRb<S, X> {
type RefProd<'a> = BlockingProd<&'a Self> where Self: 'a;
type RefCons<'a> = BlockingCons<&'a Self> where Self: 'a;
Expand All @@ -121,3 +106,12 @@ impl<S: Storage, X: Semaphore> Split for BlockingRb<S, X> {
(BlockingProd::new(arc.clone()), BlockingCons::new(arc))
}
}

pub trait BlockingRbRef: RbRef<Target = BlockingRb<Self::Storage, Self::Semaphore>> {
type Storage: Storage;
type Semaphore: Semaphore;
}
impl<S: Storage, X: Semaphore, R: RbRef<Target = BlockingRb<S, X>>> BlockingRbRef for R {
type Storage = S;
type Semaphore = X;
}
16 changes: 10 additions & 6 deletions blocking/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,16 @@ fn slice_all() {

let pjh = thread::spawn(move || {
let bytes = smsg.as_bytes();
assert_eq!(prod.push_slice_all(bytes, TIMEOUT), bytes.len());
prod.push(0, TIMEOUT).unwrap();
prod.set_timeout(TIMEOUT);
assert_eq!(prod.push_slice_all(bytes), bytes.len());
prod.push(0).unwrap();
});

let cjh = thread::spawn(move || {
let mut bytes = vec![0u8; smsg.as_bytes().len()];
assert_eq!(cons.pop_slice_all(&mut bytes, TIMEOUT), bytes.len());
assert_eq!(cons.pop_wait(TIMEOUT).unwrap(), 0);
cons.set_timeout(TIMEOUT);
assert_eq!(cons.pop_slice_all(&mut bytes), bytes.len());
assert_eq!(cons.pop_wait().unwrap(), 0);
String::from_utf8(bytes).unwrap()
});

Expand All @@ -92,12 +94,14 @@ fn iter_all() {
let smsg = THE_BOOK_FOREWORD;

let pjh = thread::spawn(move || {
prod.set_timeout(TIMEOUT);
let bytes = smsg.as_bytes();
assert_eq!(prod.push_iter_all(bytes.iter().copied().chain(once(0)), TIMEOUT), bytes.len() + 1);
assert_eq!(prod.push_iter_all(bytes.iter().copied().chain(once(0))), bytes.len() + 1);
});

let cjh = thread::spawn(move || {
let bytes = cons.pop_iter_all(TIMEOUT).take_while(|x| *x != 0).collect::<Vec<_>>();
cons.set_timeout(TIMEOUT);
let bytes = cons.pop_iter_all().take_while(|x| *x != 0).collect::<Vec<_>>();
String::from_utf8(bytes).unwrap()
});

Expand Down
30 changes: 18 additions & 12 deletions blocking/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,22 @@ pub trait BlockingProducer: Producer {

fn wait_vacant(&self, count: usize, timeout: Option<Duration>) -> bool;

fn push(&mut self, item: Self::Item, timeout: Option<Duration>) -> Result<(), Self::Item> {
if self.wait_vacant(1, timeout) {
fn set_timeout(&mut self, timeout: Option<Duration>);
fn timeout(&self) -> Option<Duration>;

fn push(&mut self, item: Self::Item) -> Result<(), Self::Item> {
if self.wait_vacant(1, self.timeout()) {
assert!(self.try_push(item).is_ok());
Ok(())
} else {
Err(item)
}
}

fn push_iter_all<I: Iterator<Item = Self::Item>>(&mut self, iter: I, timeout: Option<Duration>) -> usize {
fn push_iter_all<I: Iterator<Item = Self::Item>>(&mut self, iter: I) -> usize {
let mut count = 0;
let mut iter = iter.peekable();
for timeout in TimeoutIterator::<Self::Instant>::new(timeout) {
for timeout in TimeoutIterator::<Self::Instant>::new(self.timeout()) {
if iter.peek().is_none() {
break;
}
Expand All @@ -31,12 +34,12 @@ pub trait BlockingProducer: Producer {
count
}

fn push_slice_all(&mut self, mut slice: &[Self::Item], timeout: Option<Duration>) -> usize
fn push_slice_all(&mut self, mut slice: &[Self::Item]) -> usize
where
Self::Item: Copy,
{
let mut count = 0;
for timeout in TimeoutIterator::<Self::Instant>::new(timeout) {
for timeout in TimeoutIterator::<Self::Instant>::new(self.timeout()) {
if slice.is_empty() {
break;
}
Expand All @@ -55,24 +58,27 @@ pub trait BlockingConsumer: Consumer {

fn wait_occupied(&self, count: usize, timeout: Option<Duration>) -> bool;

fn pop_wait(&mut self, timeout: Option<Duration>) -> Option<Self::Item> {
if self.wait_occupied(1, timeout) {
fn set_timeout(&mut self, timeout: Option<Duration>);
fn timeout(&self) -> Option<Duration>;

fn pop_wait(&mut self) -> Option<Self::Item> {
if self.wait_occupied(1, self.timeout()) {
Some(self.try_pop().unwrap())
} else {
None
}
}

fn pop_iter_all(&mut self, timeout: Option<Duration>) -> PopAllIter<'_, Self> {
PopAllIter::new(self, timeout)
fn pop_iter_all(&mut self) -> PopAllIter<'_, Self> {
PopAllIter::new(self, self.timeout())
}

fn pop_slice_all(&mut self, mut slice: &mut [Self::Item], timeout: Option<Duration>) -> usize
fn pop_slice_all(&mut self, mut slice: &mut [Self::Item]) -> usize
where
Self::Item: Copy,
{
let mut count = 0;
for timeout in TimeoutIterator::<Self::Instant>::new(timeout) {
for timeout in TimeoutIterator::<Self::Instant>::new(self.timeout()) {
if slice.is_empty() {
break;
}
Expand Down
70 changes: 46 additions & 24 deletions blocking/src/wrap.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,41 @@
use crate::traits::{BlockingConsumer, BlockingProducer};
use crate::{
rb::BlockingRbRef,
sync::Semaphore,
traits::{BlockingConsumer, BlockingProducer},
};
use core::time::Duration;
use ringbuf::{
rb::traits::{RbRef, ToRbRef},
traits::{consumer::DelegateConsumer, observer::DelegateObserver, producer::DelegateProducer, Based},
rb::traits::ToRbRef,
traits::{
consumer::DelegateConsumer,
observer::{DelegateObserver, Observer},
producer::DelegateProducer,
Based,
},
wrap::caching::Caching,
Obs,
};

pub struct BlockingWrap<R: RbRef, const P: bool, const C: bool> {
pub struct BlockingWrap<R: BlockingRbRef, const P: bool, const C: bool> {
base: Option<Caching<R, P, C>>,
timeout: Option<Duration>,
}
pub type BlockingProd<R> = BlockingWrap<R, true, false>;
pub type BlockingCons<R> = BlockingWrap<R, false, true>;

impl<R: RbRef, const P: bool, const C: bool> BlockingWrap<R, P, C> {
impl<R: BlockingRbRef, const P: bool, const C: bool> BlockingWrap<R, P, C> {
pub fn new(rb: R) -> Self {
Self {
base: Some(Caching::new(rb)),
timeout: None,
}
}

pub fn observe(&self) -> Obs<R> {
self.base().observe()
}
}
impl<R: RbRef, const P: bool, const C: bool> Based for BlockingWrap<R, P, C> {
impl<R: BlockingRbRef, const P: bool, const C: bool> Based for BlockingWrap<R, P, C> {
type Base = Caching<R, P, C>;
fn base(&self) -> &Self::Base {
self.base.as_ref().unwrap()
Expand All @@ -33,7 +44,7 @@ impl<R: RbRef, const P: bool, const C: bool> Based for BlockingWrap<R, P, C> {
self.base.as_mut().unwrap()
}
}
impl<R: RbRef, const P: bool, const C: bool> ToRbRef for BlockingWrap<R, P, C> {
impl<R: BlockingRbRef, const P: bool, const C: bool> ToRbRef for BlockingWrap<R, P, C> {
type RbRef = R;
fn rb_ref(&self) -> &Self::RbRef {
self.base().rb_ref()
Expand All @@ -43,30 +54,41 @@ impl<R: RbRef, const P: bool, const C: bool> ToRbRef for BlockingWrap<R, P, C> {
}
}

impl<R: RbRef> DelegateObserver for BlockingProd<R> {}
impl<R: RbRef> DelegateProducer for BlockingProd<R> {}
impl<R: BlockingRbRef> DelegateObserver for BlockingProd<R> {}
impl<R: BlockingRbRef> DelegateProducer for BlockingProd<R> {}

impl<R: RbRef> BlockingProducer for BlockingProd<R>
where
R::Target: BlockingProducer,
{
type Instant = <R::Target as BlockingProducer>::Instant;
impl<R: BlockingRbRef> BlockingProducer for BlockingProd<R> {
type Instant = <R::Semaphore as Semaphore>::Instant;

fn wait_vacant(&self, count: usize, timeout: Option<Duration>) -> bool {
self.base().rb().wait_vacant(count, timeout)
let rb = self.rb();
debug_assert!(count <= rb.capacity().get());
rb.read.wait(|| rb.vacant_len() >= count, timeout)
}
}

impl<R: RbRef> DelegateObserver for BlockingCons<R> {}
impl<R: RbRef> DelegateConsumer for BlockingCons<R> {}
fn set_timeout(&mut self, timeout: Option<Duration>) {
self.timeout = timeout;
}
fn timeout(&self) -> Option<Duration> {
self.timeout
}
}

impl<R: RbRef> BlockingConsumer for BlockingCons<R>
where
R::Target: BlockingConsumer,
{
type Instant = <R::Target as BlockingConsumer>::Instant;
impl<R: BlockingRbRef> DelegateObserver for BlockingCons<R> {}
impl<R: BlockingRbRef> DelegateConsumer for BlockingCons<R> {}

impl<R: BlockingRbRef> BlockingConsumer for BlockingCons<R> {
type Instant = <R::Semaphore as Semaphore>::Instant;
fn wait_occupied(&self, count: usize, timeout: Option<Duration>) -> bool {
self.base().rb().wait_occupied(count, timeout)
let rb = self.rb();
debug_assert!(count <= rb.capacity().get());
rb.write.wait(|| rb.occupied_len() >= count, timeout)
}

fn set_timeout(&mut self, timeout: Option<Duration>) {
self.timeout = timeout;
}
fn timeout(&self) -> Option<Duration> {
self.timeout
}
}

0 comments on commit aa8cf5d

Please sign in to comment.