Skip to content

Commit

Permalink
TryFlattenUnordered: immediately propagate base stream errors
Browse files Browse the repository at this point in the history
  • Loading branch information
olegnn committed Jun 6, 2022
1 parent 3b29ad7 commit b19ca3c
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 55 deletions.
80 changes: 65 additions & 15 deletions futures-util/src/stream/stream/flatten_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use core::{
cell::UnsafeCell,
convert::identity,
fmt,
marker::PhantomData,
num::NonZeroUsize,
pin::Pin,
sync::atomic::{AtomicU8, Ordering},
Expand All @@ -22,6 +23,10 @@ use futures_task::{waker, ArcWake};

use crate::stream::FuturesUnordered;

/// Stream for the [`flatten_unordered`](super::StreamExt::flatten_unordered)
/// method.
pub type FlattenUnordered<St> = FlattenUnorderedWithFlowController<St, ()>;

/// There is nothing to poll and stream isn't being polled/waking/woken at the moment.
const NONE: u8 = 0;

Expand Down Expand Up @@ -154,7 +159,7 @@ impl SharedPollState {

/// Resets current state allowing to poll the stream and wake up wakers.
fn reset(&self) -> u8 {
self.state.swap(NEED_TO_POLL_ALL, Ordering::AcqRel)
self.state.swap(NEED_TO_POLL_ALL, Ordering::SeqCst)
}
}

Expand Down Expand Up @@ -276,10 +281,10 @@ impl<St: Stream + Unpin> Future for PollStreamFut<St> {

pin_project! {
/// Stream for the [`flatten_unordered`](super::StreamExt::flatten_unordered)
/// method.
#[project = FlattenUnorderedProj]
/// method with ability to specify flow controller.
#[project = FlattenUnorderedWithFlowControllerProj]
#[must_use = "streams do nothing unless polled"]
pub struct FlattenUnordered<St> where St: Stream {
pub struct FlattenUnorderedWithFlowController<St, Fc> where St: Stream {
#[pin]
inner_streams: FuturesUnordered<PollStreamFut<St::Item>>,
#[pin]
Expand All @@ -289,34 +294,40 @@ pin_project! {
is_stream_done: bool,
inner_streams_waker: Arc<WrappedWaker>,
stream_waker: Arc<WrappedWaker>,
flow_controller: PhantomData<Fc>
}
}

impl<St> fmt::Debug for FlattenUnordered<St>
impl<St, Fc> fmt::Debug for FlattenUnorderedWithFlowController<St, Fc>
where
St: Stream + fmt::Debug,
St::Item: Stream + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FlattenUnordered")
f.debug_struct("FlattenUnorderedWithFlowController")
.field("poll_state", &self.poll_state)
.field("inner_streams", &self.inner_streams)
.field("limit", &self.limit)
.field("stream", &self.stream)
.field("is_stream_done", &self.is_stream_done)
.field("flow_controller", &self.flow_controller)
.finish()
}
}

impl<St> FlattenUnordered<St>
impl<St, Fc> FlattenUnorderedWithFlowController<St, Fc>
where
St: Stream,
Fc: FlowController<St::Item, <St::Item as Stream>::Item>,
St::Item: Stream + Unpin,
{
pub(super) fn new(stream: St, limit: Option<usize>) -> FlattenUnordered<St> {
pub(crate) fn new(
stream: St,
limit: Option<usize>,
) -> FlattenUnorderedWithFlowController<St, Fc> {
let poll_state = SharedPollState::new(NEED_TO_POLL_STREAM);

FlattenUnordered {
FlattenUnorderedWithFlowController {
inner_streams: FuturesUnordered::new(),
stream,
is_stream_done: false,
Expand All @@ -332,13 +343,35 @@ where
need_to_poll: NEED_TO_POLL_STREAM,
}),
poll_state,
flow_controller: PhantomData,
}
}

delegate_access_inner!(stream, St, ());
}

impl<St> FlattenUnorderedProj<'_, St>
/// Returns the next workflow step based on the received item.
pub trait FlowController<I, O> {
/// Handles an item producing `FlowStep` describing the next workflow step.
fn handle_item(item: I) -> FlowStep<I, O>;
}

impl<I, O> FlowController<I, O> for () {
fn handle_item(item: I) -> FlowStep<I, O> {
FlowStep::Continue(item)
}
}

/// Describes the next workflow step.
#[derive(Debug, Clone)]
pub enum FlowStep<C, R> {
/// Just yields an item and continues standard workflow.
Continue(C),
/// Immediately returns an underlying item from the function.
Return(R),
}

impl<St, Fc> FlattenUnorderedWithFlowControllerProj<'_, St, Fc>
where
St: Stream,
{
Expand All @@ -348,19 +381,21 @@ where
}
}

impl<St> FusedStream for FlattenUnordered<St>
impl<St, Fc> FusedStream for FlattenUnorderedWithFlowController<St, Fc>
where
St: FusedStream,
Fc: FlowController<St::Item, <St::Item as Stream>::Item>,
St::Item: Stream + Unpin,
{
fn is_terminated(&self) -> bool {
self.stream.is_terminated() && self.inner_streams.is_empty()
}
}

impl<St> Stream for FlattenUnordered<St>
impl<St, Fc> Stream for FlattenUnorderedWithFlowController<St, Fc>
where
St: Stream,
Fc: FlowController<St::Item, <St::Item as Stream>::Item>,
St::Item: Stream + Unpin,
{
type Item = <St::Item as Stream>::Item;
Expand Down Expand Up @@ -405,8 +440,23 @@ where
let mut cx = Context::from_waker(stream_waker.as_ref().unwrap());

match this.stream.as_mut().poll_next(&mut cx) {
Poll::Ready(Some(inner_stream)) => {
let next_item_fut = PollStreamFut::new(inner_stream);
Poll::Ready(Some(item)) => {
let next_item_fut = match Fc::handle_item(item) {
// Propagates an item immediately (the main use-case is for errors)
FlowStep::Return(item) => {
need_to_poll_next |= NEED_TO_POLL_STREAM
| (poll_state_value & NEED_TO_POLL_INNER_STREAMS);
poll_state_value &= !NEED_TO_POLL_INNER_STREAMS;

next_item = Some(item);

break;
}
// Yields an item and continues processing (normal case)
FlowStep::Continue(inner_stream) => {
PollStreamFut::new(inner_stream)
}
};
// Add new stream to the inner streams bucket
this.inner_streams.as_mut().push(next_item_fut);
// Inner streams must be polled afterward
Expand Down Expand Up @@ -478,7 +528,7 @@ where

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<St, Item> Sink<Item> for FlattenUnordered<St>
impl<St, Item, Fc> Sink<Item> for FlattenUnorderedWithFlowController<St, Fc>
where
St: Stream + Sink<Item>,
{
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/stream/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ pub use self::buffered::Buffered;

#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
mod flatten_unordered;
pub(crate) mod flatten_unordered;

#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
Expand Down
112 changes: 78 additions & 34 deletions futures-util/src/stream/try_stream/try_flatten_unordered.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use core::marker::PhantomData;
use core::pin::Pin;

use futures_core::ready;
Expand All @@ -9,19 +10,23 @@ use futures_sink::Sink;
use pin_project_lite::pin_project;

use crate::future::Either;
use crate::stream::stream::FlattenUnordered;
use crate::StreamExt;

use super::IntoStream;
use crate::stream::stream::flatten_unordered::{
FlattenUnorderedWithFlowController, FlowController, FlowStep,
};
use crate::stream::IntoStream;
use crate::TryStreamExt;

delegate_all!(
/// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method.
TryFlattenUnordered<St>(
FlattenUnordered<TryStreamOfTryStreamsIntoHomogeneousStreamOfTryStreams<St>>
FlattenUnorderedWithFlowController<NestedTryStreamIntoEitherTryStream<St>, PropagateBaseStreamError<St>>
): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)]
+ New[
|stream: St, limit: impl Into<Option<usize>>|
TryStreamOfTryStreamsIntoHomogeneousStreamOfTryStreams::new(stream).flatten_unordered(limit)
FlattenUnorderedWithFlowController::new(
NestedTryStreamIntoEitherTryStream::new(stream),
limit.into()
)
]
where
St: TryStream,
Expand All @@ -35,19 +40,19 @@ pin_project! {
/// This's a wrapper for `FlattenUnordered` to reuse its logic over `TryStream`.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct TryStreamOfTryStreamsIntoHomogeneousStreamOfTryStreams<St>
pub struct NestedTryStreamIntoEitherTryStream<St>
where
St: TryStream,
St::Ok: TryStream,
St::Ok: Unpin,
<St::Ok as TryStream>::Error: From<St::Error>
{
#[pin]
stream: St,
stream: St
}
}

impl<St> TryStreamOfTryStreamsIntoHomogeneousStreamOfTryStreams<St>
impl<St> NestedTryStreamIntoEitherTryStream<St>
where
St: TryStream,
St::Ok: TryStream + Unpin,
Expand All @@ -60,21 +65,22 @@ where
delegate_access_inner!(stream, St, ());
}

impl<St> FusedStream for TryStreamOfTryStreamsIntoHomogeneousStreamOfTryStreams<St>
where
St: TryStream + FusedStream,
St::Ok: TryStream + Unpin,
<St::Ok as TryStream>::Error: From<St::Error>,
{
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
}

/// Emits single item immediately, then stream will be terminated.
/// Emits a single item immediately, then stream will be terminated.
#[derive(Debug, Clone)]
pub struct Single<T>(Option<T>);

impl<T> Single<T> {
/// Constructs new `Single` with the given value.
fn new(val: T) -> Self {
Self(Some(val))
}

/// Attempts to take inner item immediately. Will always succeed if the stream isn't terminated.
fn next_immediate(&mut self) -> Option<T> {
self.0.take()
}
}

impl<T> Unpin for Single<T> {}

impl<T> Stream for Single<T> {
Expand All @@ -89,9 +95,33 @@ impl<T> Stream for Single<T> {
}
}

/// Immediately propagates errors occurred in the base stream.
#[derive(Debug, Clone, Copy)]
pub struct PropagateBaseStreamError<St>(PhantomData<St>);

type BaseStreamItem<St> = <NestedTryStreamIntoEitherTryStream<St> as Stream>::Item;
type InnerStreamItem<St> =
<<NestedTryStreamIntoEitherTryStream<St> as Stream>::Item as Stream>::Item;

impl<St> FlowController<BaseStreamItem<St>, InnerStreamItem<St>> for PropagateBaseStreamError<St>
where
St: TryStream,
St::Ok: TryStream + Unpin,
<St::Ok as TryStream>::Error: From<St::Error>,
{
fn handle_item(item: BaseStreamItem<St>) -> FlowStep<BaseStreamItem<St>, InnerStreamItem<St>> {
match item {
// A new successful inner stream received
st @ Either::Left(_) => FlowStep::Continue(st),
// An error encountered
Either::Right(mut err) => FlowStep::Return(err.next_immediate().unwrap()),
}
}
}

type SingleStreamResult<St> = Single<Result<<St as TryStream>::Ok, <St as TryStream>::Error>>;

impl<St> Stream for TryStreamOfTryStreamsIntoHomogeneousStreamOfTryStreams<St>
impl<St> Stream for NestedTryStreamIntoEitherTryStream<St>
where
St: TryStream,
St::Ok: TryStream + Unpin,
Expand All @@ -104,24 +134,38 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let item = ready!(self.project().stream.try_poll_next(cx));

let out = item.map(|res| match res {
// Emit successful inner stream as is
Ok(stream) => Either::Left(IntoStream::new(stream)),
// Wrap an error into a stream containing a single item
err @ Err(_) => {
let res = err.map(|_: St::Ok| unreachable!()).map_err(Into::into);

Either::Right(Single(Some(res)))
}
});
let out = match item {
Some(res) => match res {
// Emit successful inner stream as is
Ok(stream) => Either::Left(stream.into_stream()),
// Wrap an error into a stream containing a single item
err @ Err(_) => {
let res = err.map(|_: St::Ok| unreachable!()).map_err(Into::into);

Either::Right(Single::new(res))
}
},
None => return Poll::Ready(None),
};

Poll::Ready(Some(out))
}
}

Poll::Ready(out)
impl<St> FusedStream for NestedTryStreamIntoEitherTryStream<St>
where
St: TryStream + FusedStream,
St::Ok: TryStream + Unpin,
<St::Ok as TryStream>::Error: From<St::Error>,
{
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
}

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<St, Item> Sink<Item> for TryStreamOfTryStreamsIntoHomogeneousStreamOfTryStreams<St>
impl<St, Item> Sink<Item> for NestedTryStreamIntoEitherTryStream<St>
where
St: TryStream + Sink<Item>,
St::Ok: TryStream + Unpin,
Expand Down
Loading

0 comments on commit b19ca3c

Please sign in to comment.