Skip to content

Commit

Permalink
chore(streaming): check epoch continuous (risingwavelabs#3839)
Browse files Browse the repository at this point in the history
* chore(streaming): check epoch continuous

Signed-off-by: Alex Chi <iskyzh@gmail.com>

* fix unit test

Signed-off-by: Alex Chi <iskyzh@gmail.com>

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
skyzh and mergify[bot] authored Jul 13, 2022
1 parent 4fadf48 commit 6e45426
Showing 1 changed file with 13 additions and 8 deletions.
21 changes: 13 additions & 8 deletions src/stream/src/executor/debug/epoch_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ pub async fn epoch_check(info: Arc<ExecutorInfo>, input: impl MessageStream) {
b
);
}

if let Some(last_epoch) = last_epoch {
assert!(b.epoch.prev == last_epoch, "missing barrier: last barrier's epoch = {}, while current barrier prev={} curr={}", last_epoch, b.epoch.prev, b.epoch.curr);
}

last_epoch = Some(new_epoch);
} else if last_epoch.is_none() && !info.identity.contains("BatchQuery") {
panic!(
Expand All @@ -70,20 +75,20 @@ mod tests {
#[tokio::test]
async fn test_epoch_ok() {
let (mut tx, source) = MockSource::channel(Default::default(), vec![]);
tx.push_barrier(100, false);
tx.push_barrier(1, false);
tx.push_chunk(StreamChunk::default());
tx.push_barrier(114, false);
tx.push_barrier(114, false);
tx.push_barrier(514, false);
tx.push_barrier(2, false);
tx.push_barrier(3, false);
tx.push_barrier(4, false);

let checked = epoch_check(source.info().into(), source.boxed().execute());
pin_mut!(checked);

assert_matches!(checked.next().await.unwrap().unwrap(), Message::Barrier(b) if b.epoch.curr == 100);
assert_matches!(checked.next().await.unwrap().unwrap(), Message::Barrier(b) if b.epoch.curr == 1);
assert_matches!(checked.next().await.unwrap().unwrap(), Message::Chunk(_));
assert_matches!(checked.next().await.unwrap().unwrap(), Message::Barrier(b) if b.epoch.curr == 114);
assert_matches!(checked.next().await.unwrap().unwrap(), Message::Barrier(b) if b.epoch.curr == 114);
assert_matches!(checked.next().await.unwrap().unwrap(), Message::Barrier(b) if b.epoch.curr == 514);
assert_matches!(checked.next().await.unwrap().unwrap(), Message::Barrier(b) if b.epoch.curr == 2);
assert_matches!(checked.next().await.unwrap().unwrap(), Message::Barrier(b) if b.epoch.curr == 3);
assert_matches!(checked.next().await.unwrap().unwrap(), Message::Barrier(b) if b.epoch.curr == 4);
}

#[should_panic]
Expand Down

0 comments on commit 6e45426

Please sign in to comment.