diff --git a/src/stream/src/executor/stream_reader.rs b/src/stream/src/executor/stream_reader.rs index 30de0804b0ac..57e80a0140c9 100644 --- a/src/stream/src/executor/stream_reader.rs +++ b/src/stream/src/executor/stream_reader.rs @@ -25,8 +25,34 @@ use crate::executor::Message; type ExecutorMessageStream = BoxStream<'static, StreamExecutorResult>; type StreamReaderData = StreamExecutorResult>; type ReaderArm = BoxStream<'static, StreamReaderData>; -type StreamReaderWithPauseInner = - SelectWithStrategy, ReaderArm, impl FnMut(&mut PollNext) -> PollNext, PollNext>; + +mod stream_reader_with_pause { + use futures::stream::{select_with_strategy, PollNext, SelectWithStrategy}; + + use crate::executor::stream_reader::ReaderArm; + + pub(super) type StreamReaderWithPauseInner = SelectWithStrategy< + ReaderArm, + ReaderArm, + impl FnMut(&mut PollNext) -> PollNext, + PollNext, + >; + + pub(super) fn new_inner( + message_stream: ReaderArm, + data_stream: ReaderArm, + ) -> StreamReaderWithPauseInner { + let strategy = if BIASED { + |_: &mut PollNext| PollNext::Left + } else { + // The poll strategy is not biased: we poll the two streams in a round robin way. + |last: &mut PollNext| last.toggle() + }; + select_with_strategy(message_stream, data_stream, strategy) + } +} + +use stream_reader_with_pause::*; /// [`StreamReaderWithPause`] merges two streams, with one receiving barriers (and maybe other types /// of messages) and the other receiving data only (no barrier). The merged stream can be paused @@ -40,7 +66,7 @@ type StreamReaderWithPauseInner = /// priority over the right-hand one. Otherwise, the two streams will be polled in a round robin /// fashion. pub(super) struct StreamReaderWithPause { - inner: StreamReaderWithPauseInner, + inner: StreamReaderWithPauseInner, /// Whether the source stream is paused. paused: bool, } @@ -54,26 +80,13 @@ impl StreamReaderWithPause { ) -> Self { let message_stream_arm = message_stream.map_ok(Either::Left).boxed(); let data_stream_arm = data_stream.map_ok(Either::Right).boxed(); - let inner = Self::new_inner(message_stream_arm, data_stream_arm); + let inner = new_inner(message_stream_arm, data_stream_arm); Self { inner, paused: false, } } - fn new_inner( - message_stream: ReaderArm, - data_stream: ReaderArm, - ) -> StreamReaderWithPauseInner { - let strategy = if BIASED { - |_: &mut PollNext| PollNext::Left - } else { - // The poll strategy is not biased: we poll the two streams in a round robin way. - |last: &mut PollNext| last.toggle() - }; - select_with_strategy(message_stream, data_stream, strategy) - } - /// Replace the data stream with a new one for given `stream`. Used for split change. pub fn replace_data_stream( &mut self, @@ -87,7 +100,7 @@ impl StreamReaderWithPause { // Note: create a new `SelectWithStrategy` instead of replacing the source stream arm here, // to ensure the internal state of the `SelectWithStrategy` is reset. (#6300) - self.inner = Self::new_inner( + self.inner = new_inner( barrier_receiver_arm, data_stream.map_ok(Either::Right).boxed(), ); diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index 43a979af2b56..5574966efdf5 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -88,8 +88,61 @@ pub(super) struct BarrierState { inner: ManagedBarrierStateInner, } -type AwaitEpochCompletedFuture = - impl Future)> + 'static; +mod await_epoch_completed_future { + use std::future::Future; + + use futures::future::BoxFuture; + use futures::FutureExt; + use risingwave_hummock_sdk::SyncResult; + use risingwave_pb::stream_service::barrier_complete_response::{ + CreateMviewProgress, PbCreateMviewProgress, + }; + + use crate::error::StreamResult; + use crate::executor::Barrier; + use crate::task::{await_tree_key, BarrierCompleteResult}; + + pub(super) type AwaitEpochCompletedFuture = + impl Future)> + 'static; + + pub(super) fn instrument_complete_barrier_future( + complete_barrier_future: Option>>, + barrier: Barrier, + barrier_await_tree_reg: Option<&await_tree::Registry>, + create_mview_progress: Vec, + ) -> AwaitEpochCompletedFuture { + let prev_epoch = barrier.epoch.prev; + let future = async move { + if let Some(future) = complete_barrier_future { + let result = future.await; + result.map(Some) + } else { + Ok(None) + } + } + .map(move |result| { + ( + barrier, + result.map(|sync_result| BarrierCompleteResult { + sync_result, + create_mview_progress, + }), + ) + }); + if let Some(reg) = barrier_await_tree_reg { + reg.register( + await_tree_key::BarrierAwait { prev_epoch }, + format!("SyncEpoch({})", prev_epoch), + ) + .instrument(future) + .left_future() + } else { + future.right_future() + } + } +} + +use await_epoch_completed_future::*; fn sync_epoch( state_store: &S, @@ -787,33 +840,12 @@ impl PartialGraphManagedBarrierState { let barrier = barrier_state.barrier.clone(); self.await_epoch_completed_futures.push_back({ - let future = async move { - if let Some(future) = complete_barrier_future { - let result = future.await; - result.map(Some) - } else { - Ok(None) - } - } - .map(move |result| { - ( - barrier, - result.map(|sync_result| BarrierCompleteResult { - sync_result, - create_mview_progress, - }), - ) - }); - if let Some(reg) = &self.barrier_await_tree_reg { - reg.register( - await_tree_key::BarrierAwait { prev_epoch }, - format!("SyncEpoch({})", prev_epoch), - ) - .instrument(future) - .left_future() - } else { - future.right_future() - } + instrument_complete_barrier_future( + complete_barrier_future, + barrier, + self.barrier_await_tree_reg.as_ref(), + create_mview_progress, + ) }); } }