diff --git a/futures-util/src/stream/stream/flatten_unordered.rs b/futures-util/src/stream/stream/flatten_unordered.rs index 66ba4d0d5..8293983d8 100644 --- a/futures-util/src/stream/stream/flatten_unordered.rs +++ b/futures-util/src/stream/stream/flatten_unordered.rs @@ -56,14 +56,14 @@ impl SharedPollState { } /// Attempts to start polling, returning stored state in case of success. - /// Returns `None` if some waker is waking at the moment. + /// Returns `None` if either waker is waking at the moment or state is empty. fn start_polling( &self, ) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> { let value = self .state .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { - if value & WAKING == NONE { + if value & WAKING == NONE && value & NEED_TO_POLL_ALL != NONE { Some(POLLING) } else { None @@ -99,8 +99,10 @@ impl SharedPollState { }) .ok()?; + debug_assert!(value & WAKING == NONE); + // Only start the waking process if we're not in the polling phase and the stream isn't woken already - if value & (WOKEN | POLLING | WAKING) == NONE { + if value & (WOKEN | POLLING) == NONE { let bomb = PollStateBomb::new(self, SharedPollState::stop_waking); Some((value, bomb)) @@ -135,9 +137,10 @@ impl SharedPollState { /// Toggles state to non-waking, allowing to start polling. fn stop_waking(&self) -> u8 { - self.state + let value = self + .state .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { - let next_value = value & !WAKING; + let next_value = value & !WAKING | WOKEN; if next_value != value { Some(next_value) @@ -145,7 +148,10 @@ impl SharedPollState { None } }) - .unwrap_or_else(identity) + .unwrap_or_else(identity); + + debug_assert!(value & (WOKEN | POLLING | WAKING) == WAKING); + value } /// Resets current state allowing to poll the stream and wake up wakers. @@ -170,11 +176,6 @@ impl<'a, F: FnOnce(&SharedPollState) -> u8> PollStateBomb<'a, F> { fn deactivate(mut self) { self.drop.take(); } - - /// Manually fires the bomb, returning supplied state. - fn fire(mut self) -> Option { - self.drop.take().map(|drop| (drop)(self.state)) - } } impl u8> Drop for PollStateBomb<'_, F> { @@ -225,13 +226,10 @@ impl ArcWake for WrappedWaker { if let Some(inner_waker) = waker_opt.clone() { // Stop waking to allow polling stream - let poll_state_value = state_bomb.fire().unwrap(); + drop(state_bomb); - // We want to call waker only if the stream isn't woken yet - if poll_state_value & (WOKEN | WAKING) == WAKING { - // Wake up inner waker - inner_waker.wake(); - } + // Wake up inner waker + inner_waker.wake(); } } } diff --git a/futures-util/src/stream/try_stream/try_flatten_unordered.rs b/futures-util/src/stream/try_stream/try_flatten_unordered.rs index aaad910bf..e21b51402 100644 --- a/futures-util/src/stream/try_stream/try_flatten_unordered.rs +++ b/futures-util/src/stream/try_stream/try_flatten_unordered.rs @@ -27,7 +27,7 @@ delegate_all!( St: TryStream, St::Ok: TryStream, St::Ok: Unpin, - ::Error: From + ::Error: From ); pin_project! { @@ -40,7 +40,7 @@ pin_project! { St: TryStream, St::Ok: TryStream, St::Ok: Unpin, - ::Error: From + ::Error: From { #[pin] stream: St, @@ -121,10 +121,10 @@ where // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] -impl Sink for TryStreamOfTryStreamsIntoHomogeneousStreamOfTryStreams +impl Sink for TryStreamOfTryStreamsIntoHomogeneousStreamOfTryStreams where St: TryStream + Sink, - St::Ok: Stream> + Unpin, + St::Ok: TryStream + Unpin, ::Error: From<::Error>, { type Error = >::Error;