Skip to content

Commit

Permalink
fix compile
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Sep 10, 2024
1 parent e6a0095 commit d0f47d1
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 47 deletions.
49 changes: 31 additions & 18 deletions src/stream/src/executor/stream_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,34 @@ use crate::executor::Message;
type ExecutorMessageStream = BoxStream<'static, StreamExecutorResult<Message>>;
type StreamReaderData<M> = StreamExecutorResult<Either<Message, M>>;
type ReaderArm<M> = BoxStream<'static, StreamReaderData<M>>;
type StreamReaderWithPauseInner<M> =
SelectWithStrategy<ReaderArm<M>, ReaderArm<M>, impl FnMut(&mut PollNext) -> PollNext, PollNext>;

mod stream_reader_with_pause {
use futures::stream::{select_with_strategy, PollNext, SelectWithStrategy};

use crate::executor::stream_reader::ReaderArm;

pub(super) type StreamReaderWithPauseInner<M, const BIASED: bool> = SelectWithStrategy<
ReaderArm<M>,
ReaderArm<M>,
impl FnMut(&mut PollNext) -> PollNext,
PollNext,
>;

pub(super) fn new_inner<M, const BIASED: bool>(
message_stream: ReaderArm<M>,
data_stream: ReaderArm<M>,
) -> StreamReaderWithPauseInner<M, BIASED> {
let strategy = if BIASED {
|_: &mut PollNext| PollNext::Left
} else {
// The poll strategy is not biased: we poll the two streams in a round robin way.
|last: &mut PollNext| last.toggle()
};
select_with_strategy(message_stream, data_stream, strategy)
}
}

use stream_reader_with_pause::*;

/// [`StreamReaderWithPause`] merges two streams, with one receiving barriers (and maybe other types
/// of messages) and the other receiving data only (no barrier). The merged stream can be paused
Expand All @@ -40,7 +66,7 @@ type StreamReaderWithPauseInner<M> =
/// priority over the right-hand one. Otherwise, the two streams will be polled in a round robin
/// fashion.
pub(super) struct StreamReaderWithPause<const BIASED: bool, M> {
inner: StreamReaderWithPauseInner<M>,
inner: StreamReaderWithPauseInner<M, BIASED>,
/// Whether the source stream is paused.
paused: bool,
}
Expand All @@ -54,26 +80,13 @@ impl<const BIASED: bool, M: Send + 'static> StreamReaderWithPause<BIASED, M> {
) -> Self {
let message_stream_arm = message_stream.map_ok(Either::Left).boxed();
let data_stream_arm = data_stream.map_ok(Either::Right).boxed();
let inner = Self::new_inner(message_stream_arm, data_stream_arm);
let inner = new_inner(message_stream_arm, data_stream_arm);
Self {
inner,
paused: false,
}
}

fn new_inner(
message_stream: ReaderArm<M>,
data_stream: ReaderArm<M>,
) -> StreamReaderWithPauseInner<M> {
let strategy = if BIASED {
|_: &mut PollNext| PollNext::Left
} else {
// The poll strategy is not biased: we poll the two streams in a round robin way.
|last: &mut PollNext| last.toggle()
};
select_with_strategy(message_stream, data_stream, strategy)
}

/// Replace the data stream with a new one for given `stream`. Used for split change.
pub fn replace_data_stream(
&mut self,
Expand All @@ -87,7 +100,7 @@ impl<const BIASED: bool, M: Send + 'static> StreamReaderWithPause<BIASED, M> {

// Note: create a new `SelectWithStrategy` instead of replacing the source stream arm here,
// to ensure the internal state of the `SelectWithStrategy` is reset. (#6300)
self.inner = Self::new_inner(
self.inner = new_inner(
barrier_receiver_arm,
data_stream.map_ok(Either::Right).boxed(),
);
Expand Down
90 changes: 61 additions & 29 deletions src/stream/src/task/barrier_manager/managed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,61 @@ pub(super) struct BarrierState {
inner: ManagedBarrierStateInner,
}

type AwaitEpochCompletedFuture =
impl Future<Output = (Barrier, StreamResult<BarrierCompleteResult>)> + 'static;
mod await_epoch_completed_future {
use std::future::Future;

use futures::future::BoxFuture;
use futures::FutureExt;
use risingwave_hummock_sdk::SyncResult;
use risingwave_pb::stream_service::barrier_complete_response::{
CreateMviewProgress, PbCreateMviewProgress,
};

use crate::error::StreamResult;
use crate::executor::Barrier;
use crate::task::{await_tree_key, BarrierCompleteResult};

pub(super) type AwaitEpochCompletedFuture =
impl Future<Output = (Barrier, StreamResult<BarrierCompleteResult>)> + 'static;

pub(super) fn instrument_complete_barrier_future(
complete_barrier_future: Option<BoxFuture<'static, StreamResult<SyncResult>>>,
barrier: Barrier,
barrier_await_tree_reg: Option<&await_tree::Registry>,
create_mview_progress: Vec<PbCreateMviewProgress>,
) -> AwaitEpochCompletedFuture {
let prev_epoch = barrier.epoch.prev;
let future = async move {
if let Some(future) = complete_barrier_future {
let result = future.await;
result.map(Some)
} else {
Ok(None)
}
}
.map(move |result| {
(
barrier,
result.map(|sync_result| BarrierCompleteResult {
sync_result,
create_mview_progress,
}),
)
});
if let Some(reg) = barrier_await_tree_reg {
reg.register(
await_tree_key::BarrierAwait { prev_epoch },
format!("SyncEpoch({})", prev_epoch),
)
.instrument(future)
.left_future()
} else {
future.right_future()
}
}
}

use await_epoch_completed_future::*;

fn sync_epoch<S: StateStore>(
state_store: &S,
Expand Down Expand Up @@ -787,33 +840,12 @@ impl PartialGraphManagedBarrierState {
let barrier = barrier_state.barrier.clone();

self.await_epoch_completed_futures.push_back({
let future = async move {
if let Some(future) = complete_barrier_future {
let result = future.await;
result.map(Some)
} else {
Ok(None)
}
}
.map(move |result| {
(
barrier,
result.map(|sync_result| BarrierCompleteResult {
sync_result,
create_mview_progress,
}),
)
});
if let Some(reg) = &self.barrier_await_tree_reg {
reg.register(
await_tree_key::BarrierAwait { prev_epoch },
format!("SyncEpoch({})", prev_epoch),
)
.instrument(future)
.left_future()
} else {
future.right_future()
}
instrument_complete_barrier_future(
complete_barrier_future,
barrier,
self.barrier_await_tree_reg.as_ref(),
create_mview_progress,
)
});
}
}
Expand Down

0 comments on commit d0f47d1

Please sign in to comment.