diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs index 82c699d2c..50a2fdee9 100644 --- a/crates/derive/src/stages/channel_bank.rs +++ b/crates/derive/src/stages/channel_bank.rs @@ -41,13 +41,13 @@ where P: ChannelBankProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug, { /// The rollup configuration. - cfg: Arc, + pub cfg: Arc, /// Map of channels by ID. - channels: HashMap, + pub channels: HashMap, /// Channels in FIFO order. - channel_queue: VecDeque, + pub channel_queue: VecDeque, /// The previous stage of the derivation pipeline. - prev: P, + pub prev: P, } impl

ChannelBank

@@ -323,11 +323,35 @@ mod tests { assert_eq!(trace_store.lock().iter().filter(|(l, _)| matches!(l, &Level::WARN)).count(), 1); } + #[test] + fn test_holocene_ingest_new_channel_unclosed() { + let frames = [ + // -- First Channel -- + Frame { id: [0xEE; 16], number: 0, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0xEE; 16], number: 1, data: vec![0xDD; 50], is_last: false }, + Frame { id: [0xEE; 16], number: 2, data: vec![0xDD; 50], is_last: false }, + // -- Second Channel -- + Frame { id: [0xFF; 16], number: 0, data: vec![0xDD; 50], is_last: false }, + ]; + let mock = MockChannelBankProvider::new(vec![]); + let rollup_config = RollupConfig { holocene_time: Some(0), ..Default::default() }; + let mut channel_bank = ChannelBank::new(Arc::new(rollup_config), mock); + for frame in frames.iter().take(3) { + channel_bank.ingest_frame(frame.clone()).unwrap(); + } + assert_eq!(channel_bank.channel_queue.len(), 1); + assert_eq!(channel_bank.channel_queue[0], [0xEE; 16]); + // When we ingest the next frame, channel queue will be cleared since the previous + // channel is not closed. This is invalid by Holocene rules. + channel_bank.ingest_frame(frames[3].clone()).unwrap(); + assert_eq!(channel_bank.channel_queue.len(), 1); + assert_eq!(channel_bank.channel_queue[0], [0xFF; 16]); + } + #[test] fn test_ingest_and_prune_channel_bank() { use alloc::vec::Vec; let mut frames: Vec = new_test_frames(100000); - // let data = frames.iter().map(|f| Ok(f)).collect::>>(); let mock = MockChannelBankProvider::new(vec![]); let cfg = Arc::new(RollupConfig::default()); let mut channel_bank = ChannelBank::new(cfg, mock);