feat(streaming): enable dml executor to pause and resume on scaling #8110

Merged: 6 commits, Feb 23, 2023
Changes from 5 commits
src/stream/src/executor/dml.rs (29 additions, 21 deletions)
@@ -12,18 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use futures::future::Either;
use futures::stream::select;
use either::Either;
use futures::StreamExt;
use futures_async_stream::try_stream;
use risingwave_common::catalog::{ColumnDesc, Schema, TableId, TableVersionId};
use risingwave_connector::source::StreamChunkWithState;
use risingwave_source::dml_manager::DmlManagerRef;

use super::error::StreamExecutorError;
use super::stream_reader::StreamReaderWithPause;
use super::{
expect_first_barrier, BoxedExecutor, BoxedMessageStream, Executor, Message, PkIndices,
PkIndicesRef,
expect_first_barrier, BoxedExecutor, BoxedMessageStream, Executor, Message, Mutation,
PkIndices, PkIndicesRef,
};

/// [`DmlExecutor`] accepts both stream data and batch data for data manipulation on a specific
@@ -92,31 +91,40 @@ impl DmlExecutor {
.dml_manager
.register_reader(self.table_id, self.table_version_id, &self.column_descs)
.map_err(StreamExecutorError::connector_error)?;
let batch_reader = batch_reader
.stream_reader()
.into_stream()
.map(Either::Right);
let batch_reader = batch_reader.stream_reader().into_stream();

yield Message::Barrier(barrier);
// Merge the two streams using `StreamReaderWithPause` because when we receive a pause
// barrier, we should stop receiving the data from DML. We poll data from the two streams in
// a round robin way.
let mut stream = StreamReaderWithPause::<false>::new(upstream, batch_reader);

// Stream data from the upstream executor.
let upstream = upstream.map(Either::Left);
// If the first barrier is configuration change, then the DML executor must be newly
// created, and we should start with the paused state.
if barrier.is_update() {
stream.pause_stream();
}

// Merge the two streams.
let stream = select(upstream, batch_reader);
yield Message::Barrier(barrier);

#[for_await]
for input_msg in stream {
match input_msg {
while let Some(input_msg) = stream.next().await {
match input_msg? {
Either::Left(msg) => {
// Stream data.
let msg: Message = msg?;
// Stream messages.
if let Message::Barrier(barrier) = &msg {
// We should handle barrier messages here to pause or resume the data from
// DML.
if let Some(mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::Pause => stream.pause_stream(),
Mutation::Resume => stream.resume_stream(),
_ => {}
}
}
}
yield msg;
}
Either::Right(chunk) => {
// Batch data.
let chunk: StreamChunkWithState =
chunk.map_err(StreamExecutorError::connector_error)?;
yield Message::Chunk(chunk.chunk);
}
}
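The new `stream_reader` module that provides `StreamReaderWithPause` is added by this PR, but its source is not part of the hunks shown here. As a rough sketch of the pause/resume idea only (my own reconstruction under assumptions, not the actual RisingWave implementation; `PausableReader`, `Item`, and the sample values are invented for illustration), a pausable merge can park the data stream behind a never-yielding placeholder while paused, so barriers keep flowing but no DML data is polled:

```rust
use futures::stream::{self, BoxStream, Stream, StreamExt};

/// Stand-in item type; the real executor carries barriers and stream chunks.
type Item = &'static str;

struct PausableReader {
    control: BoxStream<'static, Item>,
    data: BoxStream<'static, Item>,
    /// The real data stream is parked here while paused.
    parked: Option<BoxStream<'static, Item>>,
}

impl PausableReader {
    fn new(
        control: impl Stream<Item = Item> + Send + 'static,
        data: impl Stream<Item = Item> + Send + 'static,
    ) -> Self {
        Self {
            control: control.boxed(),
            data: data.boxed(),
            parked: None,
        }
    }

    /// Swap the data side for a stream that never yields anything.
    fn pause(&mut self) {
        if self.parked.is_none() {
            let real = std::mem::replace(&mut self.data, stream::pending().boxed());
            self.parked = Some(real);
        }
    }

    /// Put the real data stream back in place.
    fn resume(&mut self) {
        if let Some(real) = self.parked.take() {
            self.data = real;
        }
    }

    /// Poll the control side first; assumes it does not terminate while paused.
    async fn next(&mut self) -> Option<Item> {
        tokio::select! {
            biased;
            Some(item) = self.control.next() => Some(item),
            Some(item) = self.data.next() => Some(item),
            else => None,
        }
    }
}

#[tokio::main]
async fn main() {
    let control = stream::iter(["barrier 1", "barrier 2"]);
    let data = stream::iter(["chunk a", "chunk b"]);
    let mut reader = PausableReader::new(control, data);

    reader.pause();
    println!("{:?}", reader.next().await); // barriers still flow while paused
    reader.resume();
    while let Some(item) = reader.next().await {
        println!("{item}");
    }
}
```

Whatever the real `stream_reader.rs` looks like, the property the DML executor relies on is that pausing only gates the DML/batch side, never the barrier side; otherwise the `Mutation::Resume` barrier could never be observed.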
src/stream/src/executor/mod.rs (1 addition, 0 deletions)
@@ -85,6 +85,7 @@ mod sink;
mod sort;
mod sort_buffer;
pub mod source;
mod stream_reader;
pub mod subtask;
mod top_n;
mod union;
src/stream/src/executor/source/fs_source_executor.rs (55 additions, 42 deletions)
@@ -33,7 +33,7 @@ use super::executor_core::StreamSourceCore;
use crate::error::StreamResult;
use crate::executor::error::StreamExecutorError;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::source::reader::SourceReaderStream;
use crate::executor::stream_reader::StreamReaderWithPause;
use crate::executor::*;
/// [`FsSourceExecutor`] is a streaming source for external file systems
/// such as s3.
@@ -110,10 +110,10 @@ impl<S: StateStore> FsSourceExecutor<S> {
Ok(steam_reader.into_stream())
}

async fn apply_split_change(
async fn apply_split_change<const BIASED: bool>(
&mut self,
source_desc: &FsSourceDesc,
stream: &mut SourceReaderStream,
stream: &mut StreamReaderWithPause<BIASED>,
mapping: &HashMap<ActorId, Vec<SplitImpl>>,
) -> StreamExecutorResult<()> {
if let Some(target_splits) = mapping.get(&self.ctx.id).cloned() {
@@ -176,10 +176,10 @@ impl<S: StateStore> FsSourceExecutor<S> {
Ok((!no_change_flag).then_some(target_state))
}

async fn replace_stream_reader_with_target_state(
async fn replace_stream_reader_with_target_state<const BIASED: bool>(
&mut self,
source_desc: &FsSourceDesc,
stream: &mut SourceReaderStream,
stream: &mut StreamReaderWithPause<BIASED>,
target_state: Vec<SplitImpl>,
) -> StreamExecutorResult<()> {
tracing::info!(
@@ -192,7 +192,7 @@ impl<S: StateStore> FsSourceExecutor<S> {
let reader = self
.build_stream_source_reader(source_desc, Some(target_state.clone()))
.await?;
stream.replace_source_stream(reader);
stream.replace_data_stream(reader);

self.stream_source_core.stream_source_splits = target_state
.into_iter()
@@ -332,10 +332,12 @@ impl<S: StateStore> FsSourceExecutor<S> {
.stack_trace("fs_source_start_reader")
.await?;

// Merge the chunks from source and the barriers into a single stream.
let mut stream = SourceReaderStream::new(barrier_receiver, source_chunk_reader);
// Merge the chunks from source and the barriers into a single stream. We prioritize
// barriers over source data chunks here.
let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
let mut stream = StreamReaderWithPause::<true>::new(barrier_stream, source_chunk_reader);
if start_with_paused {
stream.pause_source();
stream.pause_stream();
}

yield Message::Barrier(barrier);
@@ -349,42 +351,53 @@ impl<S: StateStore> FsSourceExecutor<S> {
while let Some(msg) = stream.next().await {
match msg? {
// This branch will be preferred.
Either::Left(barrier) => {
last_barrier_time = Instant::now();
if self_paused {
stream.resume_source();
self_paused = false;
}
let epoch = barrier.epoch;

if let Some(ref mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::SourceChangeSplit(actor_splits) => {
self.apply_split_change(&source_desc, &mut stream, actor_splits)
.await?
}
Mutation::Pause => stream.pause_source(),
Mutation::Resume => stream.resume_source(),
Mutation::Update { actor_splits, .. } => {
self.apply_split_change(&source_desc, &mut stream, actor_splits)
Either::Left(msg) => match &msg {
Message::Barrier(barrier) => {
last_barrier_time = Instant::now();
if self_paused {
stream.resume_stream();
self_paused = false;
}
let epoch = barrier.epoch;

if let Some(ref mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::SourceChangeSplit(actor_splits) => {
self.apply_split_change(&source_desc, &mut stream, actor_splits)
.await?
}
Mutation::Pause => stream.pause_stream(),
Mutation::Resume => stream.resume_stream(),
Mutation::Update { actor_splits, .. } => {
self.apply_split_change(
&source_desc,
&mut stream,
actor_splits,
)
.await?;
}
_ => {}
}
_ => {}
}
self.take_snapshot_and_clear_cache(epoch).await?;

self.metrics
.source_row_per_barrier
.with_label_values(&[
self.ctx.id.to_string().as_str(),
self.stream_source_core.source_identify.as_ref(),
])
.inc_by(metric_row_per_barrier);
metric_row_per_barrier = 0;

yield msg;
}
self.take_snapshot_and_clear_cache(epoch).await?;

self.metrics
.source_row_per_barrier
.with_label_values(&[
self.ctx.id.to_string().as_str(),
self.stream_source_core.source_identify.as_ref(),
])
.inc_by(metric_row_per_barrier);
metric_row_per_barrier = 0;

yield Message::Barrier(barrier);
}
_ => {
// For the source executor, the message we receive from this arm should
// always be a barrier message.
unreachable!();
}
},

Either::Right(StreamChunkWithState {
chunk,
Expand All @@ -395,7 +408,7 @@ impl<S: StateStore> FsSourceExecutor<S> {
// we can guarantee the source is not paused since it received stream
// chunks.
self_paused = true;
stream.pause_source();
stream.pause_stream();
}
// update split offset
if let Some(mapping) = split_offset_mapping {
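For reference, the "prioritize barriers over source data chunks" behavior that the new comment describes can be expressed with `futures`' `select_with_strategy`, which is a plausible building block for the `BIASED = true` variant. The sketch below is a standalone illustration of that polling strategy only; the `Event` enum, the sample items, and the closure are invented here, and this is not claimed to be the executor's actual code:

```rust
use futures::stream::{self, select_with_strategy, PollNext, StreamExt};

#[derive(Debug)]
enum Event {
    Barrier(u64),
    Chunk(&'static str),
}

#[tokio::main]
async fn main() {
    let barriers = stream::iter(1u64..=2).map(Event::Barrier);
    let chunks = stream::iter(["a", "b", "c"]).map(Event::Chunk);

    // Always try the left (barrier) side first; chunks are polled only when
    // no barrier is ready, which is the biased behavior described above.
    let mut merged =
        select_with_strategy(barriers, chunks, |_: &mut ()| PollNext::Left);

    while let Some(event) = merged.next().await {
        println!("{event:?}"); // all barriers drain before any chunk is printed
    }
}
```

The pause/resume half of the wrapper is the same idea as the sketch after the dml.rs diff above: gate the chunk side, keep the barrier side polled.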
src/stream/src/executor/source/mod.rs (17 additions, 2 deletions)
@@ -13,13 +13,28 @@
// limitations under the License.

pub mod executor_core;
use async_stack_trace::StackTrace;
pub use executor_core::StreamSourceCore;
mod fs_source_executor;
pub use fs_source_executor::*;
use risingwave_common::bail;
pub use state_table_handler::*;

pub mod source_executor;

mod reader;
pub mod state_table_handler;

pub use state_table_handler::*;
use futures_async_stream::try_stream;
use tokio::sync::mpsc::UnboundedReceiver;

use crate::executor::error::StreamExecutorError;
use crate::executor::{Barrier, Message};

/// Receive barriers from barrier manager with the channel, error on channel close.
#[try_stream(ok = Message, error = StreamExecutorError)]
pub async fn barrier_to_message_stream(mut rx: UnboundedReceiver<Barrier>) {
while let Some(barrier) = rx.recv().stack_trace("receive_barrier").await {
yield Message::Barrier(barrier);
}
bail!("barrier reader closed unexpectedly");
}
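`#[try_stream]` hides the generator plumbing here; the same behavior can be approximated without the macro as below. This is a hypothetical, self-contained sketch with stand-in `Barrier` / `Message` types and a plain `String` error instead of `StreamExecutorError`, not the executor's real types:

```rust
use futures::{stream, Stream, StreamExt};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tokio_stream::wrappers::UnboundedReceiverStream;

#[derive(Debug)]
struct Barrier {
    epoch: u64,
}

#[derive(Debug)]
enum Message {
    Barrier(Barrier),
}

/// Yield each received barrier as a message; once the channel closes, emit a
/// single trailing error, mirroring the `bail!` at the end of the original.
fn barrier_to_message_stream(
    rx: UnboundedReceiver<Barrier>,
) -> impl Stream<Item = Result<Message, String>> {
    UnboundedReceiverStream::new(rx)
        .map(|b| Ok(Message::Barrier(b)))
        .chain(stream::once(async {
            Err("barrier reader closed unexpectedly".to_string())
        }))
}

#[tokio::main]
async fn main() {
    let (tx, rx) = unbounded_channel();
    tx.send(Barrier { epoch: 1 }).unwrap();
    drop(tx); // closing the channel produces the trailing error

    let mut messages = barrier_to_message_stream(rx);
    while let Some(msg) = messages.next().await {
        println!("{msg:?}");
    }
}
```

Assumed crates for the sketch: `futures`, `tokio` (with the `macros` and `rt` features), and `tokio-stream`; the `stack_trace("receive_barrier")` instrumentation from the real helper is intentionally omitted.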