Skip to content

Commit

Permalink
Move uninit slice, add pop to vec methods
Browse files Browse the repository at this point in the history
  • Loading branch information
agerasev committed Feb 3, 2024
1 parent 5630e46 commit 195bda6
Show file tree
Hide file tree
Showing 8 changed files with 226 additions and 60 deletions.
23 changes: 21 additions & 2 deletions async/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,37 @@ fn push_pop_slice() {
async move {
let mut prod = prod;
let data = (0..COUNT).collect::<Vec<_>>();
prod.push_slice_all(&data).await.unwrap();
prod.push_exact(&data).await.unwrap();
},
async move {
let mut cons = cons;
let mut data = [0; COUNT + 1];
let count = cons.pop_slice_all(&mut data).await.unwrap_err();
let count = cons.pop_exact(&mut data).await.unwrap_err();
assert_eq!(count, COUNT);
assert!(data.into_iter().take(COUNT).eq(0..COUNT));
},
);
}

#[test]
fn push_pop_vec() {
let (prod, cons) = AsyncHeapRb::<usize>::new(3).split();
execute!(
async move {
let mut prod = prod;
let data = (0..COUNT).collect::<Vec<_>>();
prod.push_exact(&data).await.unwrap();
},
async move {
let mut cons = cons;
let mut data = Vec::new();
cons.pop_until_end(&mut data).await;
assert_eq!(data.len(), COUNT);
assert!(data.into_iter().eq(0..COUNT));
},
);
}

#[test]
fn sink_stream() {
use futures::{
Expand Down
57 changes: 56 additions & 1 deletion async/src/traits/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub trait AsyncConsumer: Consumer {
/// Future returns:
/// + `Ok` - the whole slice is filled with the items from the buffer.
/// + `Err(count)` - the buffer is empty and the corresponding producer was dropped, number of items copied to slice is returned.
fn pop_slice_all<'a: 'b, 'b>(&'a mut self, slice: &'b mut [Self::Item]) -> PopSliceFuture<'a, 'b, Self>
fn pop_exact<'a: 'b, 'b>(&'a mut self, slice: &'b mut [Self::Item]) -> PopSliceFuture<'a, 'b, Self>
where
Self::Item: Copy,
{
Expand All @@ -54,6 +54,14 @@ pub trait AsyncConsumer: Consumer {
}
}

#[cfg(feature = "alloc")]
fn pop_until_end<'a: 'b, 'b>(&'a mut self, vec: &'b mut alloc::vec::Vec<Self::Item>) -> PopVecFuture<'a, 'b, Self> {
PopVecFuture {
owner: self,
vec: Some(vec),
}
}

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
where
Self: Unpin,
Expand Down Expand Up @@ -177,6 +185,53 @@ where
}
}

#[cfg(feature = "alloc")]
pub struct PopVecFuture<'a, 'b, A: AsyncConsumer + ?Sized> {
owner: &'a mut A,
vec: Option<&'b mut alloc::vec::Vec<A::Item>>,
}
#[cfg(feature = "alloc")]
impl<'a, 'b, A: AsyncConsumer> Unpin for PopVecFuture<'a, 'b, A> {}
#[cfg(feature = "alloc")]
impl<'a, 'b, A: AsyncConsumer> FusedFuture for PopVecFuture<'a, 'b, A> {
fn is_terminated(&self) -> bool {
self.vec.is_none()
}
}
#[cfg(feature = "alloc")]
impl<'a, 'b, A: AsyncConsumer> Future for PopVecFuture<'a, 'b, A> {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut waker_registered = false;
loop {
let closed = self.owner.is_closed();
let vec = self.vec.take().unwrap();

loop {
if vec.len() == vec.capacity() {
vec.reserve(vec.capacity().max(16));
}
let n = self.owner.pop_slice_uninit(vec.spare_capacity_mut());
if n == 0 {
break;
}
unsafe { vec.set_len(vec.len() + n) };
}

if closed {
break Poll::Ready(());
}
self.vec.replace(vec);
if waker_registered {
break Poll::Pending;
}
self.owner.register_waker(cx.waker());
waker_registered = true;
}
}
}

pub struct WaitOccupiedFuture<'a, A: AsyncConsumer + ?Sized> {
owner: &'a A,
count: usize,
Expand Down
2 changes: 1 addition & 1 deletion async/src/traits/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub trait AsyncProducer: Producer {
/// Future returns:
/// + `Ok` - all slice contents are copied.
/// + `Err(count)` - the corresponding consumer was dropped, number of copied items returned.
fn push_slice_all<'a: 'b, 'b>(&'a mut self, slice: &'b [Self::Item]) -> PushSliceFuture<'a, 'b, Self>
fn push_exact<'a: 'b, 'b>(&'a mut self, slice: &'b [Self::Item]) -> PushSliceFuture<'a, 'b, Self>
where
Self::Item: Copy,
{
Expand Down
132 changes: 93 additions & 39 deletions blocking/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{traits::*, wrap::WaitError, BlockingHeapRb};
use std::{
io::{Read, Write},
sync::Arc,
thread,
time::Duration,
vec,
Expand All @@ -17,6 +18,7 @@ This book fully embraces the potential of Rust to empower its users. It's a frie
- Nicholas Matsakis and Aaron Turon
";
const N_REP: usize = 10;

const TIMEOUT: Option<Duration> = Some(Duration::from_millis(1000));

Expand All @@ -26,16 +28,19 @@ fn wait() {
let rb = BlockingHeapRb::<u8>::new(7);
let (mut prod, mut cons) = rb.split();

let smsg = THE_BOOK_FOREWORD;

let pjh = thread::spawn(move || {
let mut bytes = smsg;
prod.set_timeout(TIMEOUT);
while !bytes.is_empty() {
assert_eq!(prod.wait_vacant(1), Ok(()));
let n = prod.push_slice(bytes);
assert!(n > 0);
bytes = &bytes[n..bytes.len()]
let smsg = Arc::new(THE_BOOK_FOREWORD.repeat(N_REP));

let pjh = thread::spawn({
let smsg = smsg.clone();
move || {
let mut bytes = smsg.as_slice();
prod.set_timeout(TIMEOUT);
while !bytes.is_empty() {
assert_eq!(prod.wait_vacant(1), Ok(()));
let n = prod.push_slice(bytes);
assert!(n > 0);
bytes = &bytes[n..bytes.len()]
}
}
});

Expand All @@ -59,7 +64,7 @@ fn wait() {
pjh.join().unwrap();
let rmsg = cjh.join().unwrap();

assert_eq!(smsg, rmsg);
assert_eq!(*smsg, rmsg);
}

#[test]
Expand All @@ -68,25 +73,65 @@ fn slice_all() {
let rb = BlockingHeapRb::<u8>::new(7);
let (mut prod, mut cons) = rb.split();

let smsg = THE_BOOK_FOREWORD;
let smsg = Arc::new(THE_BOOK_FOREWORD.repeat(N_REP));

let pjh = thread::spawn(move || {
let bytes = smsg;
prod.set_timeout(TIMEOUT);
assert_eq!(prod.push_all_slice(bytes), bytes.len());
let pjh = thread::spawn({
let smsg = smsg.clone();
move || {
let bytes = smsg;
prod.set_timeout(TIMEOUT);
assert_eq!(prod.push_exact(&bytes), bytes.len());
}
});

let cjh = thread::spawn(move || {
let mut bytes = vec![0u8; smsg.len()];
cons.set_timeout(TIMEOUT);
assert_eq!(cons.pop_all_slice(&mut bytes), bytes.len());
bytes
let cjh = thread::spawn({
let smsg = smsg.clone();
move || {
let mut bytes = vec![0u8; smsg.len()];
cons.set_timeout(TIMEOUT);
assert_eq!(cons.pop_exact(&mut bytes), bytes.len());
bytes
}
});

pjh.join().unwrap();
let rmsg = cjh.join().unwrap();

assert_eq!(smsg, rmsg);
assert_eq!(*smsg, rmsg);
}

#[test]
#[cfg_attr(miri, ignore)]
fn vec_all() {
let rb = BlockingHeapRb::<u8>::new(7);
let (mut prod, mut cons) = rb.split();

let smsg = Arc::new(THE_BOOK_FOREWORD.repeat(N_REP));

let pjh = thread::spawn({
let smsg = smsg.clone();
move || {
let bytes = smsg;
prod.set_timeout(TIMEOUT);
assert_eq!(prod.push_exact(&bytes), bytes.len());
}
});

let cjh = thread::spawn({
let smsg = smsg.clone();
move || {
let mut bytes = Vec::new();
cons.set_timeout(TIMEOUT);
cons.pop_until_end(&mut bytes);
assert_eq!(bytes.len(), smsg.len());
bytes
}
});

pjh.join().unwrap();
let rmsg = cjh.join().unwrap();

assert_eq!(*smsg, rmsg);
}

#[test]
Expand All @@ -95,12 +140,15 @@ fn iter_all() {
let rb = BlockingHeapRb::<u8>::new(7);
let (mut prod, mut cons) = rb.split();

let smsg = THE_BOOK_FOREWORD;
let smsg = Arc::new(THE_BOOK_FOREWORD.repeat(N_REP));

let pjh = thread::spawn(move || {
prod.set_timeout(TIMEOUT);
let bytes = smsg;
assert_eq!(prod.push_all_iter(bytes.iter().copied()), bytes.len());
let pjh = thread::spawn({
let smsg = smsg.clone();
move || {
prod.set_timeout(TIMEOUT);
let bytes = smsg;
assert_eq!(prod.push_all_iter(bytes.iter().copied()), bytes.len());
}
});

let cjh = thread::spawn(move || {
Expand All @@ -111,7 +159,7 @@ fn iter_all() {
pjh.join().unwrap();
let rmsg = cjh.join().unwrap();

assert_eq!(smsg, rmsg);
assert_eq!(*smsg, rmsg);
}

#[test]
Expand All @@ -120,23 +168,29 @@ fn write_read() {
let rb = BlockingHeapRb::<u8>::new(7);
let (mut prod, mut cons) = rb.split();

let smsg = THE_BOOK_FOREWORD;
let smsg = Arc::new(THE_BOOK_FOREWORD.repeat(N_REP));

let pjh = thread::spawn(move || {
prod.set_timeout(TIMEOUT);
let bytes = smsg;
prod.write_all(bytes).unwrap();
let pjh = thread::spawn({
let smsg = smsg.clone();
move || {
prod.set_timeout(TIMEOUT);
let bytes = smsg;
prod.write_all(&bytes).unwrap();
}
});

let cjh = thread::spawn(move || {
cons.set_timeout(TIMEOUT);
let mut bytes = Vec::new();
assert_eq!(cons.read_to_end(&mut bytes).unwrap(), smsg.len());
bytes
let cjh = thread::spawn({
let smsg = smsg.clone();
move || {
cons.set_timeout(TIMEOUT);
let mut bytes = Vec::new();
assert_eq!(cons.read_to_end(&mut bytes).unwrap(), smsg.len());
bytes
}
});

pjh.join().unwrap();
let rmsg = cjh.join().unwrap();

assert_eq!(smsg, rmsg);
assert_eq!(*smsg, rmsg);
}
24 changes: 23 additions & 1 deletion blocking/src/wrap/cons.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl<R: BlockingRbRef> BlockingCons<R>
where
<Self as Observer>::Item: Copy,
{
pub fn pop_all_slice(&mut self, mut slice: &mut [<Self as Observer>::Item]) -> usize {
pub fn pop_exact(&mut self, mut slice: &mut [<Self as Observer>::Item]) -> usize {
if slice.is_empty() {
return 0;
}
Expand All @@ -83,6 +83,28 @@ where
}
count
}

#[cfg(feature = "alloc")]
pub fn pop_until_end(&mut self, vec: &mut alloc::vec::Vec<<Self as Observer>::Item>) {
if self.is_closed() && self.is_empty() {
return;
}
for _ in wait_iter!(self) {
loop {
if vec.len() == vec.capacity() {
vec.reserve(vec.capacity().max(16));
}
let n = self.base.pop_slice_uninit(vec.spare_capacity_mut());
if n == 0 {
break;
}
unsafe { vec.set_len(vec.len() + n) };
}
if self.is_closed() && self.is_empty() {
break;
}
}
}
}

#[cfg(feature = "std")]
Expand Down
2 changes: 1 addition & 1 deletion blocking/src/wrap/prod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl<R: BlockingRbRef> BlockingProd<R>
where
<Self as Observer>::Item: Copy,
{
pub fn push_all_slice(&mut self, mut slice: &[<Self as Observer>::Item]) -> usize {
pub fn push_exact(&mut self, mut slice: &[<Self as Observer>::Item]) -> usize {
if slice.is_empty() {
return 0;
}
Expand Down
Loading

0 comments on commit 195bda6

Please sign in to comment.