Skip to content

Commit

Permalink
feat(stream): one small step for watermark (#6063)
Browse files Browse the repository at this point in the history
* reafactor(proto): add datum in proto

* commit remove result

* refactor(error): refine the stream error decoding protobuf

* introduce watermark

* add todo

* add comments

* fix

* fix

* fix comments

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
st1page and mergify[bot] authored Oct 27, 2022
1 parent be8d0e7 commit bd8fb46
Show file tree
Hide file tree
Showing 22 changed files with 170 additions and 3 deletions.
7 changes: 7 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ message Barrier {
repeated uint32 passed_actors = 255;
}

message Watermark {
// the watermark column's index in the stream's schema
uint32 col_idx = 1;
// the watermark value, there will be no record having a greater value in the watermark column
data.Datum val = 2;
}

message StreamMessage {
oneof stream_message {
data.StreamChunk stream_chunk = 1;
Expand Down
6 changes: 6 additions & 0 deletions src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@ async fn test_table_materialize() -> StreamResult<()> {
let message = materialize.next().await.unwrap()?;
let mut col_row_ids = vec![];
match message {
Message::Watermark(_) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}
Message::Chunk(c) => {
let col_row_id = c.columns()[0].array_ref().as_int64();
col_row_ids.push(col_row_id.value_at(0).unwrap());
Expand Down Expand Up @@ -310,6 +313,9 @@ async fn test_table_materialize() -> StreamResult<()> {
// Poll `Materialize`, should output the same deletion stream chunk
let message = materialize.next().await.unwrap()?;
match message {
Message::Watermark(_) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}
Message::Chunk(c) => {
let col_row_id = c.columns()[0].array_ref().as_int64();
assert_eq!(col_row_id.value_at(0).unwrap(), col_row_ids[0]);
Expand Down
18 changes: 18 additions & 0 deletions src/stream/src/executor/barrier_align.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ pub async fn barrier_align(
// left stream end, passthrough right chunks
while let Some(msg) = right.next().await {
match msg? {
Message::Watermark(_) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}
Message::Chunk(chunk) => yield AlignedMessage::Right(chunk),
Message::Barrier(_) => {
bail!("right barrier received while left stream end");
Expand All @@ -72,6 +75,9 @@ pub async fn barrier_align(
// right stream end, passthrough left chunks
while let Some(msg) = left.next().await {
match msg? {
Message::Watermark(_) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}
Message::Chunk(chunk) => yield AlignedMessage::Left(chunk),
Message::Barrier(_) => {
bail!("left barrier received while right stream end");
Expand All @@ -81,6 +87,9 @@ pub async fn barrier_align(
break;
}
Either::Left((Some(msg), _)) => match msg? {
Message::Watermark(_) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}
Message::Chunk(chunk) => yield AlignedMessage::Left(chunk),
Message::Barrier(_) => loop {
let start_time = Instant::now();
Expand All @@ -90,6 +99,9 @@ pub async fn barrier_align(
.await
.context("failed to poll right message, stream closed unexpectedly")??
{
Message::Watermark(_) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}
Message::Chunk(chunk) => yield AlignedMessage::Right(chunk),
Message::Barrier(barrier) => {
yield AlignedMessage::Barrier(barrier);
Expand All @@ -103,6 +115,9 @@ pub async fn barrier_align(
},
},
Either::Right((Some(msg), _)) => match msg? {
Message::Watermark(_) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}
Message::Chunk(chunk) => yield AlignedMessage::Right(chunk),
Message::Barrier(_) => loop {
let start_time = Instant::now();
Expand All @@ -112,6 +127,9 @@ pub async fn barrier_align(
.await
.context("failed to poll left message, stream closed unexpectedly")??
{
Message::Watermark(_) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}
Message::Chunk(chunk) => yield AlignedMessage::Left(chunk),
Message::Barrier(barrier) => {
yield AlignedMessage::Barrier(barrier);
Expand Down
3 changes: 3 additions & 0 deletions src/stream/src/executor/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ impl ChainExecutor {
#[for_await]
for msg in upstream {
match msg? {
Message::Watermark(_) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}
Message::Chunk(chunk) => {
yield Message::Chunk(mapping(&self.upstream_indices, chunk));
}
Expand Down
4 changes: 4 additions & 0 deletions src/stream/src/executor/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ impl DispatchExecutorInner {

async fn dispatch(&mut self, msg: Message) -> StreamResult<()> {
match msg {
Message::Watermark(_) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}

Message::Chunk(chunk) => {
self.metrics
.actor_out_record_cnt
Expand Down
4 changes: 4 additions & 0 deletions src/stream/src/executor/global_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ impl<S: StateStore> GlobalSimpleAggExecutor<S> {
for msg in input {
let msg = msg?;
match msg {
Message::Watermark(_) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}

Message::Chunk(chunk) => {
Self::apply_chunk(
&ctx,
Expand Down
4 changes: 4 additions & 0 deletions src/stream/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,10 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
for msg in input {
let msg = msg?;
match msg {
Message::Watermark(_) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}

Message::Chunk(chunk) => {
Self::apply_chunk(&mut extra, &mut agg_states, chunk).await?;
}
Expand Down
6 changes: 6 additions & 0 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,9 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
self.chunk_size,
) {
yield chunk.map(|v| match v {
Message::Watermark(_) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}
Message::Chunk(chunk) => Message::Chunk(chunk),
barrier @ Message::Barrier(_) => barrier,
})?;
Expand All @@ -678,6 +681,9 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
self.chunk_size,
) {
yield chunk.map(|v| match v {
Message::Watermark(_) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}
Message::Chunk(chunk) => Message::Chunk(chunk),
barrier @ Message::Barrier(_) => barrier,
})?;
Expand Down
3 changes: 3 additions & 0 deletions src/stream/src/executor/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,9 @@ impl StreamConsumer for MockConsumer {
async move {
while let Some(item) = input.next().await {
match item? {
Message::Watermark(_) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}
Message::Chunk(chunk) => data.lock().unwrap().push(chunk),
Message::Barrier(barrier) => yield barrier,
}
Expand Down
4 changes: 4 additions & 0 deletions src/stream/src/executor/local_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ impl LocalSimpleAggExecutor {
for msg in input {
let msg = msg?;
match msg {
Message::Watermark(_) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}

Message::Chunk(chunk) => {
Self::apply_chunk(&ctx, &info.identity, &agg_calls, &mut aggregators, chunk)?;
is_dirty = true;
Expand Down
33 changes: 33 additions & 0 deletions src/stream/src/executor/lookup/sides.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ pub async fn poll_until_barrier(stream: impl MessageStream, expected_barrier: Ba
#[for_await]
for item in stream {
match item? {
Message::Watermark(_) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}
c @ Message::Chunk(_) => yield c,
Message::Barrier(b) => {
if b.epoch != expected_barrier.epoch {
Expand Down Expand Up @@ -149,6 +152,12 @@ pub async fn align_barrier(left: impl MessageStream, right: impl MessageStream)
yield Either::Right(Message::Barrier(b.clone()));
break 'inner (SideStatus::RightBarrier, b);
}
Some(Either::Right(Ok(Message::Watermark(_)))) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}
Some(Either::Left(Ok(Message::Watermark(_)))) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}
Some(Either::Left(Err(e))) | Some(Either::Right(Err(e))) => return Err(e),
None => {
break 'outer;
Expand Down Expand Up @@ -225,6 +234,12 @@ pub async fn stream_lookup_arrange_prev_epoch(
break;
}
}
Either::Left(Message::Watermark(_)) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}
Either::Right(Message::Watermark(_)) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}
}
}

Expand All @@ -234,6 +249,9 @@ pub async fn stream_lookup_arrange_prev_epoch(
.await
.expect("unexpected close of barrier aligner")?
{
Either::Left(Message::Watermark(_)) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}
Either::Left(Message::Chunk(msg)) => yield ArrangeMessage::Stream(msg),
Either::Left(Message::Barrier(b)) => {
yield ArrangeMessage::Barrier(b);
Expand Down Expand Up @@ -300,6 +318,12 @@ pub async fn stream_lookup_arrange_this_epoch(
}
break 'inner Status::ArrangeReady;
}
Either::Left(Message::Watermark(_)) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}
Either::Right(Message::Watermark(_)) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}
}
};
match status {
Expand All @@ -316,6 +340,12 @@ pub async fn stream_lookup_arrange_this_epoch(
yield ArrangeMessage::Barrier(b);
break;
}
Either::Left(Message::Watermark(_)) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}
Either::Right(Message::Watermark(_)) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}
Either::Right(_) => unreachable!(),
}
},
Expand All @@ -342,6 +372,9 @@ pub async fn stream_lookup_arrange_this_epoch(
yield ArrangeMessage::Barrier(stream_barrier);
break;
}
Either::Right(Message::Watermark(_)) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}
}
},
}
Expand Down
4 changes: 4 additions & 0 deletions src/stream/src/executor/lookup_union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ impl LookupUnionExecutor {
};
end = false;
match msg {
Message::Watermark(_) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}

msg @ Message::Chunk(_) => yield msg,
Message::Barrier(barrier) => {
if let Some(this_barrier) = &this_barrier {
Expand Down
6 changes: 6 additions & 0 deletions src/stream/src/executor/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ impl MergeExecutor {
let mut msg: Message = msg?;

match &mut msg {
Message::Watermark(_) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}
Message::Chunk(chunk) => {
self.metrics
.actor_in_record_cnt
Expand Down Expand Up @@ -273,6 +276,9 @@ impl Stream for SelectReceivers {
self.last_base = (idx + 1) % self.upstreams.len();
return Poll::Ready(Some(Ok(message)));
}
Some(Ok(Message::Watermark(_))) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}
},
}
}
Expand Down
37 changes: 34 additions & 3 deletions src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,19 @@ use risingwave_common::array::column::Column;
use risingwave_common::array::StreamChunk;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::Schema;
use risingwave_common::types::DataType;
use risingwave_common::types::{DataType, Datum};
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::value_encoding::{deserialize_datum, serialize_datum_to_bytes};
use risingwave_connector::source::SplitImpl;
use risingwave_pb::data::Epoch as ProstEpoch;
use risingwave_pb::data::{Datum as ProstDatum, Epoch as ProstEpoch};
use risingwave_pb::stream_plan::add_mutation::Dispatchers;
use risingwave_pb::stream_plan::barrier::Mutation as ProstMutation;
use risingwave_pb::stream_plan::stream_message::StreamMessage;
use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
use risingwave_pb::stream_plan::{
AddMutation, Barrier as ProstBarrier, Dispatcher as ProstDispatcher, PauseMutation,
ResumeMutation, SourceChangeSplitMutation, StopMutation, StreamMessage as ProstStreamMessage,
UpdateMutation,
UpdateMutation, Watermark as ProstWatermark,
};
use smallvec::SmallVec;

Expand Down Expand Up @@ -515,10 +516,38 @@ impl Barrier {
}
}

#[derive(Debug, PartialEq, Clone)]
pub struct Watermark {
col_idx: usize,
val: Datum,
}

impl Watermark {
pub fn to_protobuf(&self) -> ProstWatermark {
ProstWatermark {
col_idx: self.col_idx as _,
val: Some(ProstDatum {
body: serialize_datum_to_bytes(self.val.as_ref()),
}),
}
}

pub fn from_protobuf(
prost: &ProstWatermark,
data_type: &DataType,
) -> StreamExecutorResult<Self> {
Ok(Watermark {
col_idx: prost.col_idx as _,
val: deserialize_datum(&*prost.get_val()?.body, data_type)?,
})
}
}

#[derive(Debug, EnumAsInner, PartialEq)]
pub enum Message {
Chunk(StreamChunk),
Barrier(Barrier),
Watermark(Watermark),
}

impl<'a> TryFrom<&'a Message> for &'a Barrier {
Expand All @@ -528,6 +557,7 @@ impl<'a> TryFrom<&'a Message> for &'a Barrier {
match m {
Message::Chunk(_) => Err(()),
Message::Barrier(b) => Ok(b),
Message::Watermark(_) => Err(()),
}
}
}
Expand Down Expand Up @@ -555,6 +585,7 @@ impl Message {
StreamMessage::StreamChunk(prost_stream_chunk)
}
Self::Barrier(barrier) => StreamMessage::Barrier(barrier.clone().to_protobuf()),
Self::Watermark(_) => todo!("https://github.com/risingwavelabs/risingwave/issues/6042"),
};
ProstStreamMessage {
stream_message: Some(prost),
Expand Down
3 changes: 3 additions & 0 deletions src/stream/src/executor/mview/materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ impl<S: StateStore> MaterializeExecutor<S> {
for msg in input {
let msg = msg?;
yield match msg {
Message::Watermark(_) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}
Message::Chunk(chunk) => {
self.state_table.write_chunk(chunk.clone());
Message::Chunk(chunk)
Expand Down
4 changes: 4 additions & 0 deletions src/stream/src/executor/project_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ impl ProjectSetExecutor {
for msg in input {
let msg = msg?;
match msg {
Message::Watermark(_) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
}

Message::Chunk(chunk) => {
let chunk = chunk.compact();

Expand Down
Loading

0 comments on commit bd8fb46

Please sign in to comment.