Skip to content

Commit

Permalink
chore(derive): test channel reader resets (#660)
Browse files Browse the repository at this point in the history
  • Loading branch information
refcell authored Oct 8, 2024
1 parent 08ab506 commit d7d2c10
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 1 deletion.
11 changes: 11 additions & 0 deletions crates/derive/src/stages/channel_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,17 @@ mod test {
data.into()
}

#[tokio::test]
async fn test_reset_channel_reader() {
let mock = MockChannelReaderProvider::new(vec![Ok(None)]);
let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default()));
reader.next_batch = Some(BatchReader::from(vec![0x00, 0x01, 0x02]));
assert!(!reader.prev.reset);
reader.reset(BlockInfo::default(), &SystemConfig::default()).await.unwrap();
assert!(reader.next_batch.is_none());
assert!(reader.prev.reset);
}

#[tokio::test]
async fn test_next_batch_batch_reader_set_fails() {
let mock = MockChannelReaderProvider::new(vec![Err(PipelineError::Eof.temp())]);
Expand Down
5 changes: 4 additions & 1 deletion crates/derive/src/stages/test_utils/channel_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ pub struct MockChannelReaderProvider {
pub data: Vec<PipelineResult<Option<Bytes>>>,
/// The origin block info
pub block_info: Option<BlockInfo>,
/// Tracks if the channel reader provider has been reset.
pub reset: bool,
}

impl MockChannelReaderProvider {
/// Creates a new [MockChannelReaderProvider] with the given data.
pub fn new(data: Vec<PipelineResult<Option<Bytes>>>) -> Self {
Self { data, block_info: Some(BlockInfo::default()) }
Self { data, block_info: Some(BlockInfo::default()), reset: false }
}
}

Expand All @@ -50,6 +52,7 @@ impl ChannelReaderProvider for MockChannelReaderProvider {
#[async_trait]
impl ResettableStage for MockChannelReaderProvider {
async fn reset(&mut self, _base: BlockInfo, _cfg: &SystemConfig) -> PipelineResult<()> {
self.reset = true;
Ok(())
}
}

0 comments on commit d7d2c10

Please sign in to comment.