From f89d2ddc15fb4e410c3729319cddfc12e8ef4528 Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Wed, 22 Feb 2023 03:54:35 +0000 Subject: [PATCH 1/4] enable dml executor to pause and resume on scaling --- src/stream/src/executor/dml.rs | 49 ++++++---- src/stream/src/executor/mod.rs | 1 + .../src/executor/source/fs_source_executor.rs | 88 +++++++++-------- src/stream/src/executor/source/mod.rs | 1 - .../src/executor/source/source_executor.rs | 94 ++++++++++-------- .../{source/reader.rs => stream_reader.rs} | 97 ++++++++++--------- 6 files changed, 185 insertions(+), 145 deletions(-) rename src/stream/src/executor/{source/reader.rs => stream_reader.rs} (65%) diff --git a/src/stream/src/executor/dml.rs b/src/stream/src/executor/dml.rs index 9a16b75fbc1d3..27255e75d4868 100644 --- a/src/stream/src/executor/dml.rs +++ b/src/stream/src/executor/dml.rs @@ -12,18 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -use futures::future::Either; -use futures::stream::select; +use either::Either; use futures::StreamExt; use futures_async_stream::try_stream; use risingwave_common::catalog::{ColumnDesc, Schema, TableId, TableVersionId}; -use risingwave_connector::source::StreamChunkWithState; use risingwave_source::dml_manager::DmlManagerRef; use super::error::StreamExecutorError; +use super::stream_reader::ReaderStreamWithPause; use super::{ - expect_first_barrier, BoxedExecutor, BoxedMessageStream, Executor, Message, PkIndices, - PkIndicesRef, + expect_first_barrier, BoxedExecutor, BoxedMessageStream, Executor, Message, Mutation, + PkIndices, PkIndicesRef, }; /// [`DmlExecutor`] accepts both stream data and batch data for data manipulation on a specific @@ -92,31 +91,39 @@ impl DmlExecutor { .dml_manager .register_reader(self.table_id, self.table_version_id, &self.column_descs) .map_err(StreamExecutorError::connector_error)?; - let batch_reader = batch_reader - .stream_reader() - .into_stream() - .map(Either::Right); + let batch_reader = batch_reader.stream_reader().into_stream(); - yield Message::Barrier(barrier); + // Merge the two streams using `ReaderStreamWithPause` because when we receive a pause + // barrier, we should stop receiving the data from DML. + let mut stream = ReaderStreamWithPause::new_with_message_stream(upstream, batch_reader); - // Stream data from the upstream executor. - let upstream = upstream.map(Either::Left); + // If the first barrier is configuration change, then the DML executor must be newly + // created, and we should start with the paused state. + if barrier.is_update() { + stream.pause_data_stream(); + } - // Merge the two streams. - let stream = select(upstream, batch_reader); + yield Message::Barrier(barrier); - #[for_await] - for input_msg in stream { - match input_msg { + while let Some(input_msg) = stream.next().await { + match input_msg? { Either::Left(msg) => { - // Stream data. - let msg: Message = msg?; + // Stream messages. + if let Message::Barrier(barrier) = &msg { + // We should deal with barrier messages here to pause or resume the data + // from DML. + if let Some(mutation) = barrier.mutation.as_deref() { + match mutation { + Mutation::Pause => stream.pause_data_stream(), + Mutation::Resume => stream.resume_data_stream(), + _ => {} + } + } + } yield msg; } Either::Right(chunk) => { // Batch data. - let chunk: StreamChunkWithState = - chunk.map_err(StreamExecutorError::connector_error)?; yield Message::Chunk(chunk.chunk); } } diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index ac7f897a391dc..6331b4a1b3825 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -85,6 +85,7 @@ mod sink; mod sort; mod sort_buffer; pub mod source; +mod stream_reader; pub mod subtask; mod top_n; mod union; diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index 7120e9d5c349b..326e066b11030 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -33,7 +33,7 @@ use super::executor_core::StreamSourceCore; use crate::error::StreamResult; use crate::executor::error::StreamExecutorError; use crate::executor::monitor::StreamingMetrics; -use crate::executor::source::reader::SourceReaderStream; +use crate::executor::stream_reader::ReaderStreamWithPause; use crate::executor::*; /// [`FsSourceExecutor`] is a streaming source, fir external file systems /// such as s3. @@ -109,7 +109,7 @@ impl FsSourceExecutor { async fn apply_split_change( &mut self, source_desc: &FsSourceDesc, - stream: &mut SourceReaderStream, + stream: &mut ReaderStreamWithPause, mapping: &HashMap>, ) -> StreamExecutorResult<()> { if let Some(target_splits) = mapping.get(&self.ctx.id).cloned() { @@ -175,7 +175,7 @@ impl FsSourceExecutor { async fn replace_stream_reader_with_target_state( &mut self, source_desc: &FsSourceDesc, - stream: &mut SourceReaderStream, + stream: &mut ReaderStreamWithPause, target_state: Vec, ) -> StreamExecutorResult<()> { tracing::info!( @@ -329,9 +329,10 @@ impl FsSourceExecutor { .await?; // Merge the chunks from source and the barriers into a single stream. - let mut stream = SourceReaderStream::new(barrier_receiver, source_chunk_reader); + let mut stream = + ReaderStreamWithPause::new_with_barrier_receiver(barrier_receiver, source_chunk_reader); if start_with_paused { - stream.pause_source(); + stream.pause_data_stream(); } yield Message::Barrier(barrier); @@ -345,42 +346,53 @@ impl FsSourceExecutor { while let Some(msg) = stream.next().await { match msg? { // This branch will be preferred. - Either::Left(barrier) => { - last_barrier_time = Instant::now(); - if self_paused { - stream.resume_source(); - self_paused = false; - } - let epoch = barrier.epoch; - - if let Some(ref mutation) = barrier.mutation.as_deref() { - match mutation { - Mutation::SourceChangeSplit(actor_splits) => { - self.apply_split_change(&source_desc, &mut stream, actor_splits) - .await? - } - Mutation::Pause => stream.pause_source(), - Mutation::Resume => stream.resume_source(), - Mutation::Update { actor_splits, .. } => { - self.apply_split_change(&source_desc, &mut stream, actor_splits) + Either::Left(msg) => match &msg { + Message::Barrier(barrier) => { + last_barrier_time = Instant::now(); + if self_paused { + stream.resume_data_stream(); + self_paused = false; + } + let epoch = barrier.epoch; + + if let Some(ref mutation) = barrier.mutation.as_deref() { + match mutation { + Mutation::SourceChangeSplit(actor_splits) => { + self.apply_split_change(&source_desc, &mut stream, actor_splits) + .await? + } + Mutation::Pause => stream.pause_data_stream(), + Mutation::Resume => stream.resume_data_stream(), + Mutation::Update { actor_splits, .. } => { + self.apply_split_change( + &source_desc, + &mut stream, + actor_splits, + ) .await?; + } + _ => {} } - _ => {} } + self.take_snapshot_and_clear_cache(epoch).await?; + + self.metrics + .source_row_per_barrier + .with_label_values(&[ + self.ctx.id.to_string().as_str(), + self.stream_source_core.source_identify.as_ref(), + ]) + .inc_by(metric_row_per_barrier); + metric_row_per_barrier = 0; + + yield msg; } - self.take_snapshot_and_clear_cache(epoch).await?; - - self.metrics - .source_row_per_barrier - .with_label_values(&[ - self.ctx.id.to_string().as_str(), - self.stream_source_core.source_identify.as_ref(), - ]) - .inc_by(metric_row_per_barrier); - metric_row_per_barrier = 0; - - yield Message::Barrier(barrier); - } + _ => { + // For the source executor, the message we receive from this arm should + // always be barrier message. + unreachable!(); + } + }, Either::Right(StreamChunkWithState { chunk, @@ -391,7 +403,7 @@ impl FsSourceExecutor { // we can guarantee the source is not paused since it received stream // chunks. self_paused = true; - stream.pause_source(); + stream.pause_data_stream(); } // update split offset if let Some(mapping) = split_offset_mapping { diff --git a/src/stream/src/executor/source/mod.rs b/src/stream/src/executor/source/mod.rs index 1ca31c79696da..9fe9611991a2b 100644 --- a/src/stream/src/executor/source/mod.rs +++ b/src/stream/src/executor/source/mod.rs @@ -19,7 +19,6 @@ pub use fs_source_executor::*; pub mod source_executor; -mod reader; pub mod state_table_handler; pub use state_table_handler::*; diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index d54372bec1b20..49c68b439cbe9 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -28,7 +28,7 @@ use tokio::time::Instant; use super::executor_core::StreamSourceCore; use crate::executor::monitor::StreamingMetrics; -use crate::executor::source::reader::SourceReaderStream; +use crate::executor::stream_reader::ReaderStreamWithPause; use crate::executor::*; /// A constant to multiply when calculating the maximum time to wait for a barrier. This is due to @@ -109,7 +109,7 @@ impl SourceExecutor { async fn apply_split_change( &mut self, source_desc: &SourceDesc, - stream: &mut SourceReaderStream, + stream: &mut ReaderStreamWithPause, mapping: &HashMap>, ) -> StreamExecutorResult<()> { if let Some(target_splits) = mapping.get(&self.ctx.id).cloned() { @@ -166,7 +166,7 @@ impl SourceExecutor { async fn replace_stream_reader_with_target_state( &mut self, source_desc: &SourceDesc, - stream: &mut SourceReaderStream, + stream: &mut ReaderStreamWithPause, target_state: Vec, ) -> StreamExecutorResult<()> { tracing::info!( @@ -290,12 +290,13 @@ impl SourceExecutor { .await?; // Merge the chunks from source and the barriers into a single stream. - let mut stream = SourceReaderStream::new(barrier_receiver, source_chunk_reader); + let mut stream = + ReaderStreamWithPause::new_with_barrier_receiver(barrier_receiver, source_chunk_reader); // If the first barrier is configuration change, then the source executor must be newly // created, and we should start with the paused state. if barrier.is_update() { - stream.pause_source(); + stream.pause_data_stream(); } yield Message::Barrier(barrier); @@ -310,47 +311,58 @@ impl SourceExecutor { while let Some(msg) = stream.next().await { match msg? { // This branch will be preferred. - Either::Left(barrier) => { - last_barrier_time = Instant::now(); - if self_paused { - stream.resume_source(); - self_paused = false; - } - let epoch = barrier.epoch; - - if let Some(ref mutation) = barrier.mutation.as_deref() { - match mutation { - Mutation::SourceChangeSplit(actor_splits) => { - self.apply_split_change(&source_desc, &mut stream, actor_splits) - .await? - } - Mutation::Pause => stream.pause_source(), - Mutation::Resume => stream.resume_source(), - Mutation::Update { actor_splits, .. } => { - self.apply_split_change(&source_desc, &mut stream, actor_splits) + Either::Left(msg) => match &msg { + Message::Barrier(barrier) => { + last_barrier_time = Instant::now(); + if self_paused { + stream.resume_data_stream(); + self_paused = false; + } + let epoch = barrier.epoch; + + if let Some(ref mutation) = barrier.mutation.as_deref() { + match mutation { + Mutation::SourceChangeSplit(actor_splits) => { + self.apply_split_change(&source_desc, &mut stream, actor_splits) + .await? + } + Mutation::Pause => stream.pause_data_stream(), + Mutation::Resume => stream.resume_data_stream(), + Mutation::Update { actor_splits, .. } => { + self.apply_split_change( + &source_desc, + &mut stream, + actor_splits, + ) .await?; + } + _ => {} } - _ => {} } - } - self.take_snapshot_and_clear_cache(epoch).await?; + self.take_snapshot_and_clear_cache(epoch).await?; - self.metrics - .source_row_per_barrier - .with_label_values(&[ - self.ctx.id.to_string().as_str(), - self.stream_source_core - .as_ref() - .unwrap() - .source_identify - .as_ref(), - ]) - .inc_by(metric_row_per_barrier); - metric_row_per_barrier = 0; + self.metrics + .source_row_per_barrier + .with_label_values(&[ + self.ctx.id.to_string().as_str(), + self.stream_source_core + .as_ref() + .unwrap() + .source_identify + .as_ref(), + ]) + .inc_by(metric_row_per_barrier); + metric_row_per_barrier = 0; - yield Message::Barrier(barrier); - } + yield msg; + } + _ => { + // For the source executor, the message we receive from this arm should + // always be barrier message. + unreachable!(); + } + }, Either::Right(StreamChunkWithState { chunk, @@ -361,7 +373,7 @@ impl SourceExecutor { // we can guarantee the source is not paused since it received stream // chunks. self_paused = true; - stream.pause_source(); + stream.pause_data_stream(); } if let Some(mapping) = split_offset_mapping { let state: HashMap<_, _> = mapping diff --git a/src/stream/src/executor/source/reader.rs b/src/stream/src/executor/stream_reader.rs similarity index 65% rename from src/stream/src/executor/source/reader.rs rename to src/stream/src/executor/stream_reader.rs index b801b853eb0eb..50805e528a757 100644 --- a/src/stream/src/executor/source/reader.rs +++ b/src/stream/src/executor/stream_reader.rs @@ -25,32 +25,33 @@ use risingwave_connector::source::{BoxSourceWithStateStream, StreamChunkWithStat use tokio::sync::mpsc::UnboundedReceiver; use crate::executor::error::{StreamExecutorError, StreamExecutorResult}; -use crate::executor::Barrier; +use crate::executor::{Barrier, Message}; -type SourceReaderMessage = StreamExecutorResult>; -type SourceReaderArm = BoxStream<'static, SourceReaderMessage>; -type SourceReaderStreamInner = - SelectWithStrategy PollNext, ()>; +type ExecutorMessageStream = BoxStream<'static, StreamExecutorResult>; +type ReaderStreamData = StreamExecutorResult>; +type ReaderArm = BoxStream<'static, ReaderStreamData>; +type ReaderStreamWithPauseInner = + SelectWithStrategy PollNext, ()>; -pub(super) struct SourceReaderStream { - inner: SourceReaderStreamInner, +pub(super) struct ReaderStreamWithPause { + inner: ReaderStreamWithPauseInner, /// Whether the source stream is paused. paused: bool, } -impl SourceReaderStream { +impl ReaderStreamWithPause { /// Receive barriers from barrier manager with the channel, error on channel close. - #[try_stream(ok = Barrier, error = StreamExecutorError)] + #[try_stream(ok = Message, error = StreamExecutorError)] async fn barrier_receiver(mut rx: UnboundedReceiver) { while let Some(barrier) = rx.recv().stack_trace("source_recv_barrier").await { - yield barrier; + yield Message::Barrier(barrier); } bail!("barrier reader closed unexpectedly"); } - /// Receive chunks and states from the source reader, hang up on error. + /// Receive chunks and states from the reader. Hang up on error. #[try_stream(ok = StreamChunkWithState, error = StreamExecutorError)] - async fn source_stream(stream: BoxSourceWithStateStream) { + async fn data_stream(stream: BoxSourceWithStateStream) { // TODO: support stack trace for Stream #[for_await] for chunk in stream { @@ -64,38 +65,48 @@ impl SourceReaderStream { } } - /// Convert this reader to a stream. - pub fn new( + // TODO(Yuanxin): Add comments. + pub fn new_with_message_stream( + message_stream: ExecutorMessageStream, + data_stream: BoxSourceWithStateStream, + ) -> Self { + let message_stream_arm = message_stream.map_ok(Either::Left).boxed(); + let data_stream_arm = Self::data_stream(data_stream).map_ok(Either::Right).boxed(); + let inner = Self::new_inner(message_stream_arm, data_stream_arm); + Self { + inner, + paused: false, + } + } + + // TODO(Yuanxin): Add comments. + pub fn new_with_barrier_receiver( barrier_receiver: UnboundedReceiver, - source_stream: BoxSourceWithStateStream, + data_stream: BoxSourceWithStateStream, ) -> Self { + let barrier_receiver_arm = Self::barrier_receiver(barrier_receiver) + .map_ok(Either::Left) + .boxed(); + let data_stream_arm = Self::data_stream(data_stream).map_ok(Either::Right).boxed(); + let inner = Self::new_inner(barrier_receiver_arm, data_stream_arm); Self { - inner: Self::new_inner( - Self::barrier_receiver(barrier_receiver) - .map_ok(Either::Left) - .boxed(), - Self::source_stream(source_stream) - .map_ok(Either::Right) - .boxed(), - ), + inner, paused: false, } } - fn new_inner( - barrier_receiver_arm: SourceReaderArm, - source_stream_arm: SourceReaderArm, - ) -> SourceReaderStreamInner { + fn new_inner(message_stream: ReaderArm, data_stream: ReaderArm) -> ReaderStreamWithPauseInner { select_with_strategy( - barrier_receiver_arm, - source_stream_arm, - // We prefer barrier on the left hand side over source chunks. + message_stream, + data_stream, + // We prefer the left stream (which contains barriers of `Mutation::Pause` and + // `Mutation::Resume`) over the stream of data chunks. |_: &mut ()| PollNext::Left, ) } - /// Replace the source stream with a new one for given `stream`. Used for split change. - pub fn replace_source_stream(&mut self, source_stream: BoxSourceWithStateStream) { + /// Replace the data stream with a new one for given `stream`. Used for split change. + pub fn replace_source_stream(&mut self, data_stream: BoxSourceWithStateStream) { // Take the barrier receiver arm. let barrier_receiver_arm = std::mem::replace( self.inner.get_mut().0, @@ -106,27 +117,25 @@ impl SourceReaderStream { // to ensure the internal state of the `SelectWithStrategy` is reset. (#6300) self.inner = Self::new_inner( barrier_receiver_arm, - Self::source_stream(source_stream) - .map_ok(Either::Right) - .boxed(), + Self::data_stream(data_stream).map_ok(Either::Right).boxed(), ); } - /// Pause the source stream. - pub fn pause_source(&mut self) { + /// Pause the data stream. + pub fn pause_data_stream(&mut self) { assert!(!self.paused, "already paused"); self.paused = true; } - /// Resume the source stream, panic if the source is not paused before. - pub fn resume_source(&mut self) { + /// Resume the data stream. Panic if the data stream is not paused. + pub fn resume_data_stream(&mut self) { assert!(self.paused, "not paused"); self.paused = false; } } -impl Stream for SourceReaderStream { - type Item = SourceReaderMessage; +impl Stream for ReaderStreamWithPause { + type Item = ReaderStreamData; fn poll_next( mut self: Pin<&mut Self>, @@ -157,7 +166,7 @@ mod tests { let table_dml_handle = TableDmlHandle::new(vec![]); let source_stream = table_dml_handle.stream_reader().into_stream(); - let stream = SourceReaderStream::new(barrier_rx, source_stream); + let stream = ReaderStreamWithPause::new_with_barrier_receiver(barrier_rx, source_stream); pin_mut!(stream); macro_rules! next { @@ -181,7 +190,7 @@ mod tests { assert_matches!(next!().unwrap(), Either::Left(_)); // Pause the stream. - stream.pause_source(); + stream.pause_data_stream(); // Write a barrier. barrier_tx.send(Barrier::new_test_barrier(2)).unwrap(); @@ -197,7 +206,7 @@ mod tests { assert!(next!().is_none()); // Resume the stream. - stream.resume_source(); + stream.resume_data_stream(); // Then we can receive the chunk sent when the stream is paused. assert_matches!(next!().unwrap(), Either::Right(_)); } From 7e90734f4aa7a00ae9f5078b5a96d7615c708235 Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Wed, 22 Feb 2023 06:41:21 +0000 Subject: [PATCH 2/4] introduce non-biased ReaderStreamWithPause --- src/stream/src/executor/dml.rs | 6 ++- .../src/executor/source/fs_source_executor.rs | 17 +++++---- .../src/executor/source/source_executor.rs | 17 +++++---- src/stream/src/executor/stream_reader.rs | 38 ++++++++++++------- 4 files changed, 48 insertions(+), 30 deletions(-) diff --git a/src/stream/src/executor/dml.rs b/src/stream/src/executor/dml.rs index 27255e75d4868..2a84632b3ddde 100644 --- a/src/stream/src/executor/dml.rs +++ b/src/stream/src/executor/dml.rs @@ -94,8 +94,10 @@ impl DmlExecutor { let batch_reader = batch_reader.stream_reader().into_stream(); // Merge the two streams using `ReaderStreamWithPause` because when we receive a pause - // barrier, we should stop receiving the data from DML. - let mut stream = ReaderStreamWithPause::new_with_message_stream(upstream, batch_reader); + // barrier, we should stop receiving the data from DML. We poll data from the two streams in + // a round robin way. + let mut stream = + ReaderStreamWithPause::::new_with_message_stream(upstream, batch_reader); // If the first barrier is configuration change, then the DML executor must be newly // created, and we should start with the paused state. diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index 326e066b11030..043d252a8ecef 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -106,10 +106,10 @@ impl FsSourceExecutor { Ok(steam_reader.into_stream()) } - async fn apply_split_change( + async fn apply_split_change( &mut self, source_desc: &FsSourceDesc, - stream: &mut ReaderStreamWithPause, + stream: &mut ReaderStreamWithPause, mapping: &HashMap>, ) -> StreamExecutorResult<()> { if let Some(target_splits) = mapping.get(&self.ctx.id).cloned() { @@ -172,10 +172,10 @@ impl FsSourceExecutor { Ok((!no_change_flag).then_some(target_state)) } - async fn replace_stream_reader_with_target_state( + async fn replace_stream_reader_with_target_state( &mut self, source_desc: &FsSourceDesc, - stream: &mut ReaderStreamWithPause, + stream: &mut ReaderStreamWithPause, target_state: Vec, ) -> StreamExecutorResult<()> { tracing::info!( @@ -328,9 +328,12 @@ impl FsSourceExecutor { .stack_trace("fs_source_start_reader") .await?; - // Merge the chunks from source and the barriers into a single stream. - let mut stream = - ReaderStreamWithPause::new_with_barrier_receiver(barrier_receiver, source_chunk_reader); + // Merge the chunks from source and the barriers into a single stream. We prioritize + // barriers over source data chunks here. + let mut stream = ReaderStreamWithPause::::new_with_barrier_receiver( + barrier_receiver, + source_chunk_reader, + ); if start_with_paused { stream.pause_data_stream(); } diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 49c68b439cbe9..46257dc18e4ff 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -106,10 +106,10 @@ impl SourceExecutor { .map_err(StreamExecutorError::connector_error) } - async fn apply_split_change( + async fn apply_split_change( &mut self, source_desc: &SourceDesc, - stream: &mut ReaderStreamWithPause, + stream: &mut ReaderStreamWithPause, mapping: &HashMap>, ) -> StreamExecutorResult<()> { if let Some(target_splits) = mapping.get(&self.ctx.id).cloned() { @@ -163,10 +163,10 @@ impl SourceExecutor { Ok((!no_change_flag).then_some(target_state)) } - async fn replace_stream_reader_with_target_state( + async fn replace_stream_reader_with_target_state( &mut self, source_desc: &SourceDesc, - stream: &mut ReaderStreamWithPause, + stream: &mut ReaderStreamWithPause, target_state: Vec, ) -> StreamExecutorResult<()> { tracing::info!( @@ -289,9 +289,12 @@ impl SourceExecutor { .stack_trace("source_build_reader") .await?; - // Merge the chunks from source and the barriers into a single stream. - let mut stream = - ReaderStreamWithPause::new_with_barrier_receiver(barrier_receiver, source_chunk_reader); + // Merge the chunks from source and the barriers into a single stream. We prioritize + // barriers over source data chunks here. + let mut stream = ReaderStreamWithPause::::new_with_barrier_receiver( + barrier_receiver, + source_chunk_reader, + ); // If the first barrier is configuration change, then the source executor must be newly // created, and we should start with the paused state. diff --git a/src/stream/src/executor/stream_reader.rs b/src/stream/src/executor/stream_reader.rs index 50805e528a757..e48c040566448 100644 --- a/src/stream/src/executor/stream_reader.rs +++ b/src/stream/src/executor/stream_reader.rs @@ -31,15 +31,17 @@ type ExecutorMessageStream = BoxStream<'static, StreamExecutorResult>; type ReaderStreamData = StreamExecutorResult>; type ReaderArm = BoxStream<'static, ReaderStreamData>; type ReaderStreamWithPauseInner = - SelectWithStrategy PollNext, ()>; + SelectWithStrategy PollNext, PollNext>; -pub(super) struct ReaderStreamWithPause { +/// `ReaderStreamWithPause` merges two streams, with one of them receiving barriers. When the +/// barrier requires the data stream +pub(super) struct ReaderStreamWithPause { inner: ReaderStreamWithPauseInner, /// Whether the source stream is paused. paused: bool, } -impl ReaderStreamWithPause { +impl ReaderStreamWithPause { /// Receive barriers from barrier manager with the channel, error on channel close. #[try_stream(ok = Message, error = StreamExecutorError)] async fn barrier_receiver(mut rx: UnboundedReceiver) { @@ -65,7 +67,8 @@ impl ReaderStreamWithPause { } } - // TODO(Yuanxin): Add comments. + /// Construct a `ReaderStreamWithPause` with one stream receiving streaming messages and the + /// other receiving data only. pub fn new_with_message_stream( message_stream: ExecutorMessageStream, data_stream: BoxSourceWithStateStream, @@ -79,7 +82,8 @@ impl ReaderStreamWithPause { } } - // TODO(Yuanxin): Add comments. + /// Construct a `ReaderStreamWithPause` with one stream receiving barriers only and the other + /// receiving data only. pub fn new_with_barrier_receiver( barrier_receiver: UnboundedReceiver, data_stream: BoxSourceWithStateStream, @@ -96,13 +100,13 @@ impl ReaderStreamWithPause { } fn new_inner(message_stream: ReaderArm, data_stream: ReaderArm) -> ReaderStreamWithPauseInner { - select_with_strategy( - message_stream, - data_stream, - // We prefer the left stream (which contains barriers of `Mutation::Pause` and - // `Mutation::Resume`) over the stream of data chunks. - |_: &mut ()| PollNext::Left, - ) + let strategy = if BIASED { + |_: &mut PollNext| PollNext::Left + } else { + // The poll strategy is not biased: we poll the two streams in a round robin way. + |last: &mut PollNext| last.toggle() + }; + select_with_strategy(message_stream, data_stream, strategy) } /// Replace the data stream with a new one for given `stream`. Used for split change. @@ -134,7 +138,7 @@ impl ReaderStreamWithPause { } } -impl Stream for ReaderStreamWithPause { +impl Stream for ReaderStreamWithPause { type Item = ReaderStreamData; fn poll_next( @@ -142,8 +146,13 @@ impl Stream for ReaderStreamWithPause { ctx: &mut std::task::Context<'_>, ) -> Poll> { if self.paused { + // Note: It is safe here to poll the left arm even if it contains streaming messages + // other than barriers: after the upstream executor sends a `Mutation::Pause`, there + // should be no more message until a `Mutation::Update` and a 'Mutation::Resume`. self.inner.get_mut().0.poll_next_unpin(ctx) } else { + // TODO: We may need to prioritize the data stream (right hand stream) after resuming + // from the paused state. self.inner.poll_next_unpin(ctx) } } @@ -166,7 +175,8 @@ mod tests { let table_dml_handle = TableDmlHandle::new(vec![]); let source_stream = table_dml_handle.stream_reader().into_stream(); - let stream = ReaderStreamWithPause::new_with_barrier_receiver(barrier_rx, source_stream); + let stream = + ReaderStreamWithPause::::new_with_barrier_receiver(barrier_rx, source_stream); pin_mut!(stream); macro_rules! next { From 4077564e8bb7c1601d678340214353cb0b9eaebc Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Wed, 22 Feb 2023 08:29:13 +0000 Subject: [PATCH 3/4] apply pr suggestions & rename --- src/stream/src/executor/dml.rs | 13 ++- .../src/executor/source/fs_source_executor.rs | 22 +++-- src/stream/src/executor/source/mod.rs | 18 +++- .../src/executor/source/source_executor.rs | 22 +++-- src/stream/src/executor/stream_reader.rs | 82 ++++++++----------- 5 files changed, 75 insertions(+), 82 deletions(-) diff --git a/src/stream/src/executor/dml.rs b/src/stream/src/executor/dml.rs index 2a84632b3ddde..ed5d6f46585ef 100644 --- a/src/stream/src/executor/dml.rs +++ b/src/stream/src/executor/dml.rs @@ -19,7 +19,7 @@ use risingwave_common::catalog::{ColumnDesc, Schema, TableId, TableVersionId}; use risingwave_source::dml_manager::DmlManagerRef; use super::error::StreamExecutorError; -use super::stream_reader::ReaderStreamWithPause; +use super::stream_reader::StreamReaderWithPause; use super::{ expect_first_barrier, BoxedExecutor, BoxedMessageStream, Executor, Message, Mutation, PkIndices, PkIndicesRef, @@ -93,16 +93,15 @@ impl DmlExecutor { .map_err(StreamExecutorError::connector_error)?; let batch_reader = batch_reader.stream_reader().into_stream(); - // Merge the two streams using `ReaderStreamWithPause` because when we receive a pause + // Merge the two streams using `StreamReaderWithPause` because when we receive a pause // barrier, we should stop receiving the data from DML. We poll data from the two streams in // a round robin way. - let mut stream = - ReaderStreamWithPause::::new_with_message_stream(upstream, batch_reader); + let mut stream = StreamReaderWithPause::::new(upstream, batch_reader); // If the first barrier is configuration change, then the DML executor must be newly // created, and we should start with the paused state. if barrier.is_update() { - stream.pause_data_stream(); + stream.pause_stream(); } yield Message::Barrier(barrier); @@ -116,8 +115,8 @@ impl DmlExecutor { // from DML. if let Some(mutation) = barrier.mutation.as_deref() { match mutation { - Mutation::Pause => stream.pause_data_stream(), - Mutation::Resume => stream.resume_data_stream(), + Mutation::Pause => stream.pause_stream(), + Mutation::Resume => stream.resume_stream(), _ => {} } } diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index 043d252a8ecef..9c6b53a740718 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -33,7 +33,7 @@ use super::executor_core::StreamSourceCore; use crate::error::StreamResult; use crate::executor::error::StreamExecutorError; use crate::executor::monitor::StreamingMetrics; -use crate::executor::stream_reader::ReaderStreamWithPause; +use crate::executor::stream_reader::StreamReaderWithPause; use crate::executor::*; /// [`FsSourceExecutor`] is a streaming source, fir external file systems /// such as s3. @@ -109,7 +109,7 @@ impl FsSourceExecutor { async fn apply_split_change( &mut self, source_desc: &FsSourceDesc, - stream: &mut ReaderStreamWithPause, + stream: &mut StreamReaderWithPause, mapping: &HashMap>, ) -> StreamExecutorResult<()> { if let Some(target_splits) = mapping.get(&self.ctx.id).cloned() { @@ -175,7 +175,7 @@ impl FsSourceExecutor { async fn replace_stream_reader_with_target_state( &mut self, source_desc: &FsSourceDesc, - stream: &mut ReaderStreamWithPause, + stream: &mut StreamReaderWithPause, target_state: Vec, ) -> StreamExecutorResult<()> { tracing::info!( @@ -330,12 +330,10 @@ impl FsSourceExecutor { // Merge the chunks from source and the barriers into a single stream. We prioritize // barriers over source data chunks here. - let mut stream = ReaderStreamWithPause::::new_with_barrier_receiver( - barrier_receiver, - source_chunk_reader, - ); + let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed(); + let mut stream = StreamReaderWithPause::::new(barrier_stream, source_chunk_reader); if start_with_paused { - stream.pause_data_stream(); + stream.pause_stream(); } yield Message::Barrier(barrier); @@ -353,7 +351,7 @@ impl FsSourceExecutor { Message::Barrier(barrier) => { last_barrier_time = Instant::now(); if self_paused { - stream.resume_data_stream(); + stream.resume_stream(); self_paused = false; } let epoch = barrier.epoch; @@ -364,8 +362,8 @@ impl FsSourceExecutor { self.apply_split_change(&source_desc, &mut stream, actor_splits) .await? } - Mutation::Pause => stream.pause_data_stream(), - Mutation::Resume => stream.resume_data_stream(), + Mutation::Pause => stream.pause_stream(), + Mutation::Resume => stream.resume_stream(), Mutation::Update { actor_splits, .. } => { self.apply_split_change( &source_desc, @@ -406,7 +404,7 @@ impl FsSourceExecutor { // we can guarantee the source is not paused since it received stream // chunks. self_paused = true; - stream.pause_data_stream(); + stream.pause_stream(); } // update split offset if let Some(mapping) = split_offset_mapping { diff --git a/src/stream/src/executor/source/mod.rs b/src/stream/src/executor/source/mod.rs index 9fe9611991a2b..7c13505e58545 100644 --- a/src/stream/src/executor/source/mod.rs +++ b/src/stream/src/executor/source/mod.rs @@ -13,12 +13,28 @@ // limitations under the License. pub mod executor_core; +use async_stack_trace::StackTrace; pub use executor_core::StreamSourceCore; mod fs_source_executor; pub use fs_source_executor::*; +use risingwave_common::bail; +pub use state_table_handler::*; pub mod source_executor; pub mod state_table_handler; -pub use state_table_handler::*; +use futures_async_stream::try_stream; +use tokio::sync::mpsc::UnboundedReceiver; + +use crate::executor::error::StreamExecutorError; +use crate::executor::{Barrier, Message}; + +/// Receive barriers from barrier manager with the channel, error on channel close. +#[try_stream(ok = Message, error = StreamExecutorError)] +pub async fn barrier_to_message_stream(mut rx: UnboundedReceiver) { + while let Some(barrier) = rx.recv().stack_trace("receive_barrier").await { + yield Message::Barrier(barrier); + } + bail!("barrier reader closed unexpectedly"); +} diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 46257dc18e4ff..9bcbbd57b9a40 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -28,7 +28,7 @@ use tokio::time::Instant; use super::executor_core::StreamSourceCore; use crate::executor::monitor::StreamingMetrics; -use crate::executor::stream_reader::ReaderStreamWithPause; +use crate::executor::stream_reader::StreamReaderWithPause; use crate::executor::*; /// A constant to multiply when calculating the maximum time to wait for a barrier. This is due to @@ -109,7 +109,7 @@ impl SourceExecutor { async fn apply_split_change( &mut self, source_desc: &SourceDesc, - stream: &mut ReaderStreamWithPause, + stream: &mut StreamReaderWithPause, mapping: &HashMap>, ) -> StreamExecutorResult<()> { if let Some(target_splits) = mapping.get(&self.ctx.id).cloned() { @@ -166,7 +166,7 @@ impl SourceExecutor { async fn replace_stream_reader_with_target_state( &mut self, source_desc: &SourceDesc, - stream: &mut ReaderStreamWithPause, + stream: &mut StreamReaderWithPause, target_state: Vec, ) -> StreamExecutorResult<()> { tracing::info!( @@ -291,15 +291,13 @@ impl SourceExecutor { // Merge the chunks from source and the barriers into a single stream. We prioritize // barriers over source data chunks here. - let mut stream = ReaderStreamWithPause::::new_with_barrier_receiver( - barrier_receiver, - source_chunk_reader, - ); + let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed(); + let mut stream = StreamReaderWithPause::::new(barrier_stream, source_chunk_reader); // If the first barrier is configuration change, then the source executor must be newly // created, and we should start with the paused state. if barrier.is_update() { - stream.pause_data_stream(); + stream.pause_stream(); } yield Message::Barrier(barrier); @@ -318,7 +316,7 @@ impl SourceExecutor { Message::Barrier(barrier) => { last_barrier_time = Instant::now(); if self_paused { - stream.resume_data_stream(); + stream.resume_stream(); self_paused = false; } let epoch = barrier.epoch; @@ -329,8 +327,8 @@ impl SourceExecutor { self.apply_split_change(&source_desc, &mut stream, actor_splits) .await? } - Mutation::Pause => stream.pause_data_stream(), - Mutation::Resume => stream.resume_data_stream(), + Mutation::Pause => stream.pause_stream(), + Mutation::Resume => stream.resume_stream(), Mutation::Update { actor_splits, .. } => { self.apply_split_change( &source_desc, @@ -376,7 +374,7 @@ impl SourceExecutor { // we can guarantee the source is not paused since it received stream // chunks. self_paused = true; - stream.pause_data_stream(); + stream.pause_stream(); } if let Some(mapping) = split_offset_mapping { let state: HashMap<_, _> = mapping diff --git a/src/stream/src/executor/stream_reader.rs b/src/stream/src/executor/stream_reader.rs index e48c040566448..c7af4ca8a752b 100644 --- a/src/stream/src/executor/stream_reader.rs +++ b/src/stream/src/executor/stream_reader.rs @@ -20,37 +20,35 @@ use either::Either; use futures::stream::{select_with_strategy, BoxStream, PollNext, SelectWithStrategy}; use futures::{Stream, StreamExt, TryStreamExt}; use futures_async_stream::try_stream; -use risingwave_common::bail; use risingwave_connector::source::{BoxSourceWithStateStream, StreamChunkWithState}; -use tokio::sync::mpsc::UnboundedReceiver; use crate::executor::error::{StreamExecutorError, StreamExecutorResult}; -use crate::executor::{Barrier, Message}; +use crate::executor::Message; type ExecutorMessageStream = BoxStream<'static, StreamExecutorResult>; -type ReaderStreamData = StreamExecutorResult>; -type ReaderArm = BoxStream<'static, ReaderStreamData>; -type ReaderStreamWithPauseInner = +type StreamReaderData = StreamExecutorResult>; +type ReaderArm = BoxStream<'static, StreamReaderData>; +type StreamReaderWithPauseInner = SelectWithStrategy PollNext, PollNext>; -/// `ReaderStreamWithPause` merges two streams, with one of them receiving barriers. When the -/// barrier requires the data stream -pub(super) struct ReaderStreamWithPause { - inner: ReaderStreamWithPauseInner, +/// [`StreamReaderWithPause`] merges two streams, with one receiving barriers (and maybe other types +/// of messages) and the other receiving data only (no barrier). The merged stream can be paused +/// (`StreamReaderWithPause::pause_stream`) and resumed (`StreamReaderWithPause::resume_stream`). +/// A paused stream will not receive any data from either original stream until a barrier arrives +/// and the stream is resumed. +/// +/// ## Priority +/// +/// If `BIASED` is `true`, the left-hand stream (the one receiving barriers) will get a higher +/// priority over the right-hand one. Otherwise, the two streams will be polled in a round robin +/// fashion. +pub(super) struct StreamReaderWithPause { + inner: StreamReaderWithPauseInner, /// Whether the source stream is paused. paused: bool, } -impl ReaderStreamWithPause { - /// Receive barriers from barrier manager with the channel, error on channel close. - #[try_stream(ok = Message, error = StreamExecutorError)] - async fn barrier_receiver(mut rx: UnboundedReceiver) { - while let Some(barrier) = rx.recv().stack_trace("source_recv_barrier").await { - yield Message::Barrier(barrier); - } - bail!("barrier reader closed unexpectedly"); - } - +impl StreamReaderWithPause { /// Receive chunks and states from the reader. Hang up on error. #[try_stream(ok = StreamChunkWithState, error = StreamExecutorError)] async fn data_stream(stream: BoxSourceWithStateStream) { @@ -67,9 +65,9 @@ impl ReaderStreamWithPause { } } - /// Construct a `ReaderStreamWithPause` with one stream receiving streaming messages and the - /// other receiving data only. - pub fn new_with_message_stream( + /// Construct a `StreamReaderWithPause` with one stream receiving barrier messages (and maybe + /// other types of messages) and the other receiving data only (no barrier). + pub fn new( message_stream: ExecutorMessageStream, data_stream: BoxSourceWithStateStream, ) -> Self { @@ -82,24 +80,7 @@ impl ReaderStreamWithPause { } } - /// Construct a `ReaderStreamWithPause` with one stream receiving barriers only and the other - /// receiving data only. - pub fn new_with_barrier_receiver( - barrier_receiver: UnboundedReceiver, - data_stream: BoxSourceWithStateStream, - ) -> Self { - let barrier_receiver_arm = Self::barrier_receiver(barrier_receiver) - .map_ok(Either::Left) - .boxed(); - let data_stream_arm = Self::data_stream(data_stream).map_ok(Either::Right).boxed(); - let inner = Self::new_inner(barrier_receiver_arm, data_stream_arm); - Self { - inner, - paused: false, - } - } - - fn new_inner(message_stream: ReaderArm, data_stream: ReaderArm) -> ReaderStreamWithPauseInner { + fn new_inner(message_stream: ReaderArm, data_stream: ReaderArm) -> StreamReaderWithPauseInner { let strategy = if BIASED { |_: &mut PollNext| PollNext::Left } else { @@ -126,20 +107,20 @@ impl ReaderStreamWithPause { } /// Pause the data stream. - pub fn pause_data_stream(&mut self) { + pub fn pause_stream(&mut self) { assert!(!self.paused, "already paused"); self.paused = true; } /// Resume the data stream. Panic if the data stream is not paused. - pub fn resume_data_stream(&mut self) { + pub fn resume_stream(&mut self) { assert!(self.paused, "not paused"); self.paused = false; } } -impl Stream for ReaderStreamWithPause { - type Item = ReaderStreamData; +impl Stream for StreamReaderWithPause { + type Item = StreamReaderData; fn poll_next( mut self: Pin<&mut Self>, @@ -151,7 +132,7 @@ impl Stream for ReaderStreamWithPause { // should be no more message until a `Mutation::Update` and a 'Mutation::Resume`. self.inner.get_mut().0.poll_next_unpin(ctx) } else { - // TODO: We may need to prioritize the data stream (right hand stream) after resuming + // TODO: We may need to prioritize the data stream (right-hand stream) after resuming // from the paused state. self.inner.poll_next_unpin(ctx) } @@ -167,6 +148,7 @@ mod tests { use tokio::sync::mpsc; use super::*; + use crate::executor::{barrier_to_message_stream, Barrier}; #[tokio::test] async fn test_pause_and_resume() { @@ -175,8 +157,8 @@ mod tests { let table_dml_handle = TableDmlHandle::new(vec![]); let source_stream = table_dml_handle.stream_reader().into_stream(); - let stream = - ReaderStreamWithPause::::new_with_barrier_receiver(barrier_rx, source_stream); + let barrier_stream = barrier_to_message_stream(barrier_rx).boxed(); + let stream = StreamReaderWithPause::::new(barrier_stream, source_stream); pin_mut!(stream); macro_rules! next { @@ -200,7 +182,7 @@ mod tests { assert_matches!(next!().unwrap(), Either::Left(_)); // Pause the stream. - stream.pause_data_stream(); + stream.pause_stream(); // Write a barrier. barrier_tx.send(Barrier::new_test_barrier(2)).unwrap(); @@ -216,7 +198,7 @@ mod tests { assert!(next!().is_none()); // Resume the stream. - stream.resume_data_stream(); + stream.resume_stream(); // Then we can receive the chunk sent when the stream is paused. assert_matches!(next!().unwrap(), Either::Right(_)); } From 198737b3d97306519ec05d2ad6d95d2959807e06 Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Thu, 23 Feb 2023 00:53:57 +0000 Subject: [PATCH 4/4] rename & refine comments --- src/stream/src/executor/dml.rs | 4 ++-- src/stream/src/executor/source/fs_source_executor.rs | 2 +- src/stream/src/executor/source/source_executor.rs | 2 +- src/stream/src/executor/stream_reader.rs | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/stream/src/executor/dml.rs b/src/stream/src/executor/dml.rs index ed5d6f46585ef..181dc247eef57 100644 --- a/src/stream/src/executor/dml.rs +++ b/src/stream/src/executor/dml.rs @@ -111,8 +111,8 @@ impl DmlExecutor { Either::Left(msg) => { // Stream messages. if let Message::Barrier(barrier) = &msg { - // We should deal with barrier messages here to pause or resume the data - // from DML. + // We should handle barrier messages here to pause or resume the data from + // DML. if let Some(mutation) = barrier.mutation.as_deref() { match mutation { Mutation::Pause => stream.pause_stream(), diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index 9c6b53a740718..06cef7ee6d4ce 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -188,7 +188,7 @@ impl FsSourceExecutor { let reader = self .build_stream_source_reader(source_desc, Some(target_state.clone())) .await?; - stream.replace_source_stream(reader); + stream.replace_data_stream(reader); self.stream_source_core.stream_source_splits = target_state .into_iter() diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 9bcbbd57b9a40..f9eb095345462 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -179,7 +179,7 @@ impl SourceExecutor { let reader = self .build_stream_source_reader(source_desc, Some(target_state.clone())) .await?; - stream.replace_source_stream(reader); + stream.replace_data_stream(reader); self.stream_source_core .as_mut() diff --git a/src/stream/src/executor/stream_reader.rs b/src/stream/src/executor/stream_reader.rs index c7af4ca8a752b..aef2870a67e12 100644 --- a/src/stream/src/executor/stream_reader.rs +++ b/src/stream/src/executor/stream_reader.rs @@ -91,7 +91,7 @@ impl StreamReaderWithPause { } /// Replace the data stream with a new one for given `stream`. Used for split change. - pub fn replace_source_stream(&mut self, data_stream: BoxSourceWithStateStream) { + pub fn replace_data_stream(&mut self, data_stream: BoxSourceWithStateStream) { // Take the barrier receiver arm. let barrier_receiver_arm = std::mem::replace( self.inner.get_mut().0,