Skip to content

Commit

Permalink
Several tweaks to FlattenUnordered and TryFlattenUnordered (#2590)
Browse files Browse the repository at this point in the history
  • Loading branch information
olegnn authored May 28, 2022
1 parent a0c9fd5 commit 90094e1
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 21 deletions.
32 changes: 15 additions & 17 deletions futures-util/src/stream/stream/flatten_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ impl SharedPollState {
}

/// Attempts to start polling, returning stored state in case of success.
/// Returns `None` if some waker is waking at the moment.
/// Returns `None` if either waker is waking at the moment or state is empty.
fn start_polling(
&self,
) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {
let value = self
.state
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
if value & WAKING == NONE {
if value & WAKING == NONE && value & NEED_TO_POLL_ALL != NONE {
Some(POLLING)
} else {
None
Expand Down Expand Up @@ -99,8 +99,10 @@ impl SharedPollState {
})
.ok()?;

debug_assert!(value & WAKING == NONE);

// Only start the waking process if we're not in the polling phase and the stream isn't woken already
if value & (WOKEN | POLLING | WAKING) == NONE {
if value & (WOKEN | POLLING) == NONE {
let bomb = PollStateBomb::new(self, SharedPollState::stop_waking);

Some((value, bomb))
Expand Down Expand Up @@ -135,17 +137,21 @@ impl SharedPollState {

/// Toggles state to non-waking, allowing to start polling.
fn stop_waking(&self) -> u8 {
self.state
let value = self
.state
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
let next_value = value & !WAKING;
let next_value = value & !WAKING | WOKEN;

if next_value != value {
Some(next_value)
} else {
None
}
})
.unwrap_or_else(identity)
.unwrap_or_else(identity);

debug_assert!(value & (WOKEN | POLLING | WAKING) == WAKING);
value
}

/// Resets current state allowing to poll the stream and wake up wakers.
Expand All @@ -170,11 +176,6 @@ impl<'a, F: FnOnce(&SharedPollState) -> u8> PollStateBomb<'a, F> {
fn deactivate(mut self) {
self.drop.take();
}

/// Manually fires the bomb, returning supplied state.
fn fire(mut self) -> Option<u8> {
self.drop.take().map(|drop| (drop)(self.state))
}
}

impl<F: FnOnce(&SharedPollState) -> u8> Drop for PollStateBomb<'_, F> {
Expand Down Expand Up @@ -225,13 +226,10 @@ impl ArcWake for WrappedWaker {

if let Some(inner_waker) = waker_opt.clone() {
// Stop waking to allow polling stream
let poll_state_value = state_bomb.fire().unwrap();
drop(state_bomb);

// We want to call waker only if the stream isn't woken yet
if poll_state_value & (WOKEN | WAKING) == WAKING {
// Wake up inner waker
inner_waker.wake();
}
// Wake up inner waker
inner_waker.wake();
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions futures-util/src/stream/try_stream/try_flatten_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ delegate_all!(
St: TryStream,
St::Ok: TryStream,
St::Ok: Unpin,
<St::Ok as TryStream>::Error: From<St::Error>
<St::Ok as TryStream>::Error: From<St::Error>
);

pin_project! {
Expand All @@ -40,7 +40,7 @@ pin_project! {
St: TryStream,
St::Ok: TryStream,
St::Ok: Unpin,
<St::Ok as TryStream>::Error: From<St::Error>
<St::Ok as TryStream>::Error: From<St::Error>
{
#[pin]
stream: St,
Expand Down Expand Up @@ -121,10 +121,10 @@ where

// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<St, I, E, Item> Sink<Item> for TryStreamOfTryStreamsIntoHomogeneousStreamOfTryStreams<St>
impl<St, Item> Sink<Item> for TryStreamOfTryStreamsIntoHomogeneousStreamOfTryStreams<St>
where
St: TryStream + Sink<Item>,
St::Ok: Stream<Item = Result<I, E>> + Unpin,
St::Ok: TryStream + Unpin,
<St::Ok as TryStream>::Error: From<<St as TryStream>::Error>,
{
type Error = <St as Sink<Item>>::Error;
Expand Down

0 comments on commit 90094e1

Please sign in to comment.