Skip to content

Commit

Permalink
feat(derive): holocene channel bank checks
Browse files Browse the repository at this point in the history
  • Loading branch information
refcell committed Sep 27, 2024
1 parent 7952058 commit 082fdb2
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 10 deletions.
8 changes: 8 additions & 0 deletions crates/derive/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ lazy_static! {
&["error"]
).expect("Batch Reader Errors failed to register");

/// Tracks the number of times the channel queue was detected
/// non-empty during a frame ingestion, and new channel creation
/// was attempted post-holocene.
pub static ref CHANNEL_QUEUE_NON_EMPTY: IntGauge = register_int_gauge!(
"kona_derive_channel_queue_non_empty",
"Number of times a channel was attempted to be created in the channel bank, but the queue is non-empty post-holocene."
).expect("Channel Queue Non Empty failed to register");

/// Tracks the compression ratio of batches.
pub static ref BATCH_COMPRESSION_RATIO: IntGauge = register_int_gauge!(
"kona_derive_batch_compression_ratio",
Expand Down
58 changes: 48 additions & 10 deletions crates/derive/src/stages/channel_bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ where
P: ChannelBankProvider + OriginAdvancer + OriginProvider + ResettableStage + Debug,
{
/// The rollup configuration.
cfg: Arc<RollupConfig>,
pub cfg: Arc<RollupConfig>,
/// Map of channels by ID.
channels: HashMap<ChannelId, Channel>,
pub channels: HashMap<ChannelId, Channel>,
/// Channels in FIFO order.
channel_queue: VecDeque<ChannelId>,
pub channel_queue: VecDeque<ChannelId>,
/// The previous stage of the derivation pipeline.
prev: P,
pub prev: P,
}

impl<P> ChannelBank<P>
Expand Down Expand Up @@ -83,11 +83,25 @@ where
let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?;

// Get the channel for the frame, or create a new one if it doesn't exist.
let current_channel = self.channels.entry(frame.id).or_insert_with(|| {
let channel = Channel::new(frame.id, origin);
self.channel_queue.push_back(frame.id);
channel
});
let current_channel = match self.channels.get_mut(&frame.id) {
Some(c) => c,
None => {
if self.cfg.is_holocene_active(origin.timestamp) && !self.channel_queue.is_empty() {
// In holocene, channels are strictly ordered.
// If the previous frame is not the last in the channel
// and a starting frame for the next channel arrives,
// the previous channel/frames are removed and a new channel is created.
self.channel_queue.clear();

trace!(target: "channel-bank", "[holocene active] clearing non-empty channel queue");
crate::inc!(CHANNEL_QUEUE_NON_EMPTY);
}
let channel = Channel::new(frame.id, origin);
self.channel_queue.push_back(frame.id);
self.channels.insert(frame.id, channel);
self.channels.get_mut(&frame.id).expect("Channel must be in queue")
}
};

// Check if the channel is not timed out. If it has, ignore the frame.
if current_channel.open_block_number() + self.cfg.channel_timeout(origin.timestamp) <
Expand Down Expand Up @@ -309,11 +323,35 @@ mod tests {
assert_eq!(trace_store.lock().iter().filter(|(l, _)| matches!(l, &Level::WARN)).count(), 1);
}

#[test]
fn test_holocene_ingest_new_channel_unclosed() {
let frames = [
// -- First Channel --
Frame { id: [0xEE; 16], number: 0, data: vec![0xDD; 50], is_last: false },
Frame { id: [0xEE; 16], number: 1, data: vec![0xDD; 50], is_last: false },
Frame { id: [0xEE; 16], number: 2, data: vec![0xDD; 50], is_last: false },
// -- Second Channel --
Frame { id: [0xFF; 16], number: 0, data: vec![0xDD; 50], is_last: false },
];
let mock = MockChannelBankProvider::new(vec![]);
let rollup_config = RollupConfig { holocene_time: Some(0), ..Default::default() };
let mut channel_bank = ChannelBank::new(Arc::new(rollup_config), mock);
for frame in frames.iter().take(3) {
channel_bank.ingest_frame(frame.clone()).unwrap();
}
assert_eq!(channel_bank.channel_queue.len(), 1);
assert_eq!(channel_bank.channel_queue[0], [0xEE; 16]);
// When we ingest the next frame, channel queue will be cleared since the previous
// channel is not closed. This is invalid by Holocene rules.
channel_bank.ingest_frame(frames[3].clone()).unwrap();
assert_eq!(channel_bank.channel_queue.len(), 1);
assert_eq!(channel_bank.channel_queue[0], [0xFF; 16]);
}

#[test]
fn test_ingest_and_prune_channel_bank() {
use alloc::vec::Vec;
let mut frames: Vec<Frame> = new_test_frames(100000);
// let data = frames.iter().map(|f| Ok(f)).collect::<Vec<StageResult<Frame>>>();
let mock = MockChannelBankProvider::new(vec![]);
let cfg = Arc::new(RollupConfig::default());
let mut channel_bank = ChannelBank::new(cfg, mock);
Expand Down

0 comments on commit 082fdb2

Please sign in to comment.