Skip to content

Commit

Permalink
Remove some async traits, use hold info from base rb
Browse files Browse the repository at this point in the history
  • Loading branch information
agerasev committed Aug 17, 2023
1 parent 169b2ab commit 81eb75d
Show file tree
Hide file tree
Showing 10 changed files with 369 additions and 371 deletions.
69 changes: 31 additions & 38 deletions async/src/rb.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,28 @@
use crate::{
traits::{AsyncConsumer, AsyncObserver, AsyncProducer, AsyncRingBuffer},
wrap::{AsyncCons, AsyncProd},
};
use crate::wrap::{AsyncCons, AsyncProd};
#[cfg(feature = "alloc")]
use alloc::sync::Arc;
use core::{
mem::MaybeUninit,
num::NonZeroUsize,
sync::atomic::{AtomicBool, Ordering},
task::Waker,
};
use core::{mem::MaybeUninit, num::NonZeroUsize};
use futures::task::AtomicWaker;
#[cfg(feature = "alloc")]
use ringbuf::traits::Split;
use ringbuf::{
rb::traits::RbRef,
storage::Storage,
traits::{Consumer, Observer, Producer, RingBuffer, SplitRef},
SharedRb,
};

pub trait AsyncRbRef: RbRef<Target = AsyncRb<Self::Storage>> {
type Storage: Storage;
}
impl<S: Storage, R: RbRef<Target = AsyncRb<S>>> AsyncRbRef for R {
type Storage = S;
}

pub struct AsyncRb<S: Storage> {
base: SharedRb<S>,
read: AtomicWaker,
write: AtomicWaker,
closed: AtomicBool,
pub(crate) read: AtomicWaker,
pub(crate) write: AtomicWaker,
}

impl<S: Storage> AsyncRb<S> {
Expand All @@ -32,7 +31,6 @@ impl<S: Storage> AsyncRb<S> {
base,
read: AtomicWaker::default(),
write: AtomicWaker::default(),
closed: AtomicBool::new(false),
}
}
}
Expand All @@ -59,47 +57,42 @@ impl<S: Storage> Observer for AsyncRb<S> {
unsafe fn unsafe_slices(&self, start: usize, end: usize) -> (&mut [MaybeUninit<S::Item>], &mut [MaybeUninit<S::Item>]) {
self.base.unsafe_slices(start, end)
}

#[inline]
fn read_is_held(&self) -> bool {
self.base.read_is_held()
}
#[inline]
fn write_is_held(&self) -> bool {
self.base.write_is_held()
}
}

impl<S: Storage> Producer for AsyncRb<S> {
unsafe fn set_write_index(&self, value: usize) {
self.base.set_write_index(value);
self.write.wake();
}
fn close(&mut self) {}
}
impl<S: Storage> Consumer for AsyncRb<S> {
unsafe fn set_read_index(&self, value: usize) {
self.base.set_read_index(value);
self.read.wake();
}
fn close(&mut self) {}
}
impl<S: Storage> RingBuffer for AsyncRb<S> {}

impl<S: Storage> AsyncObserver for AsyncRb<S> {
fn is_closed(&self) -> bool {
self.closed.load(Ordering::Relaxed)
}
fn close(&self) {
self.closed.store(true, Ordering::Relaxed);
}
}
impl<S: Storage> AsyncProducer for AsyncRb<S> {
fn register_read_waker(&self, waker: &Waker) {
self.read.register(waker);
}
}
impl<S: Storage> AsyncConsumer for AsyncRb<S> {
fn register_write_waker(&self, waker: &Waker) {
self.write.register(waker);
impl<S: Storage> RingBuffer for AsyncRb<S> {
#[inline]
fn hold_read(&self, flag: bool) {
self.base.hold_read(flag);
self.read.wake()
}
}
impl<S: Storage> AsyncRingBuffer for AsyncRb<S> {
fn wake_consumer(&self) {
#[inline]
fn hold_write(&self, flag: bool) {
self.base.hold_write(flag);
self.write.wake()
}
fn wake_producer(&self) {
self.read.wake()
}
}

impl<S: Storage> SplitRef for AsyncRb<S> {
Expand Down
111 changes: 49 additions & 62 deletions async/src/traits/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,15 @@
use super::{AsyncObserver, AsyncRingBuffer};
use crate::wrap::AsyncCons;
use core::{
future::Future,
pin::Pin,
task::{Context, Poll, Waker},
};
#[cfg(feature = "std")]
use futures::io::AsyncRead;
use futures::{future::FusedFuture, Stream};
use ringbuf::{
rb::traits::RbRef,
traits::{Consumer, Observer},
};
use futures::future::FusedFuture;
use ringbuf::traits::Consumer;
#[cfg(feature = "std")]
use std::io;

pub trait AsyncConsumer: AsyncObserver + Consumer {
fn register_write_waker(&self, waker: &Waker);
pub trait AsyncConsumer: Consumer {
fn register_waker(&self, waker: &Waker);

/// Pop item from the ring buffer waiting asynchronously if the buffer is empty.
///
Expand All @@ -29,7 +22,7 @@ pub trait AsyncConsumer: AsyncObserver + Consumer {

/// Wait for the buffer to contain at least `count` items or to close.
///
/// Panics if `count` is greater than buffer capacity.
/// In debug mode panics if `count` is greater than buffer capacity.
fn wait_occupied(&mut self, count: usize) -> WaitOccupiedFuture<'_, Self> {
debug_assert!(count <= self.capacity().get());
WaitOccupiedFuture {
Expand All @@ -54,6 +47,47 @@ pub trait AsyncConsumer: AsyncObserver + Consumer {
count: 0,
}
}

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
where
Self: Unpin,
{
let mut waker_registered = false;
loop {
let closed = self.is_closed();
if let Some(item) = self.try_pop() {
break Poll::Ready(Some(item));
}
if closed {
break Poll::Ready(None);
}
if waker_registered {
break Poll::Pending;
}
self.register_waker(cx.waker());
waker_registered = true;
}
}

#[cfg(feature = "std")]
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>>
where
Self: AsyncConsumer<Item = u8> + Unpin,
{
let mut waker_registered = false;
loop {
let closed = self.is_closed();
let len = self.pop_slice(buf);
if len != 0 || closed {
break Poll::Ready(Ok(len));
}
if waker_registered {
break Poll::Pending;
}
self.register_waker(cx.waker());
waker_registered = true;
}
}
}

pub struct PopFuture<'a, A: AsyncConsumer> {
Expand Down Expand Up @@ -84,7 +118,7 @@ impl<'a, A: AsyncConsumer> Future for PopFuture<'a, A> {
if waker_registered {
break Poll::Pending;
}
self.owner.register_write_waker(cx.waker());
self.owner.register_waker(cx.waker());
waker_registered = true;
}
}
Expand Down Expand Up @@ -131,7 +165,7 @@ where
if waker_registered {
break Poll::Pending;
}
self.owner.register_write_waker(cx.waker());
self.owner.register_waker(cx.waker());
waker_registered = true;
}
}
Expand Down Expand Up @@ -162,54 +196,7 @@ impl<'a, A: AsyncConsumer> Future for WaitOccupiedFuture<'a, A> {
if waker_registered {
break Poll::Pending;
}
self.owner.register_write_waker(cx.waker());
waker_registered = true;
}
}
}

impl<R: RbRef> Stream for AsyncCons<R>
where
R::Target: AsyncRingBuffer,
{
type Item = <R::Target as Observer>::Item;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut waker_registered = false;
loop {
let closed = self.is_closed();
if let Some(item) = self.try_pop() {
break Poll::Ready(Some(item));
}
if closed {
break Poll::Ready(None);
}
if waker_registered {
break Poll::Pending;
}
self.register_write_waker(cx.waker());
waker_registered = true;
}
}
}

#[cfg(feature = "std")]
impl<R: RbRef> AsyncRead for AsyncCons<R>
where
R::Target: AsyncRingBuffer<Item = u8>,
{
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
let mut waker_registered = false;
loop {
let closed = self.is_closed();
let len = self.pop_slice(buf);
if len != 0 || closed {
break Poll::Ready(Ok(len));
}
if waker_registered {
break Poll::Pending;
}
self.register_write_waker(cx.waker());
self.owner.register_waker(cx.waker());
waker_registered = true;
}
}
Expand Down
4 changes: 0 additions & 4 deletions async/src/traits/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
pub mod consumer;
pub mod observer;
pub mod producer;
pub mod ring_buffer;

pub use consumer::AsyncConsumer;
pub use observer::AsyncObserver;
pub use producer::AsyncProducer;
pub use ring_buffer::AsyncRingBuffer;

pub use ringbuf::traits::*;
6 changes: 0 additions & 6 deletions async/src/traits/observer.rs

This file was deleted.

Loading

0 comments on commit 81eb75d

Please sign in to comment.