Skip to content

Commit

Permalink
feat(derive): holocene channel bank checks
Browse files Browse the repository at this point in the history
  • Loading branch information
refcell committed Sep 25, 2024
1 parent 830a828 commit 69d1462
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 5 deletions.
8 changes: 8 additions & 0 deletions crates/derive/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ lazy_static! {
&["error"]
).expect("Batch Reader Errors failed to register");

/// Tracks the number of times the channel queue was detected
/// non-empty during a frame ingestion, and new channel creation
/// was attempted post-holocene.
pub static ref CHANNEL_QUEUE_NON_EMPTY: IntGauge = register_int_gauge!(
"kona_derive_channel_queue_non_empty",
"Number of times a channel was attempted to be created in the channel bank, but the queue is non-empty post-holocene."
).expect("Channel Queue Non Empty failed to register");

/// Tracks the compression ratio of batches.
pub static ref BATCH_COMPRESSION_RATIO: IntGauge = register_int_gauge!(
"kona_derive_batch_compression_ratio",
Expand Down
24 changes: 19 additions & 5 deletions crates/derive/src/stages/channel_bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,25 @@ where
let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?;

// Get the channel for the frame, or create a new one if it doesn't exist.
let current_channel = self.channels.entry(frame.id).or_insert_with(|| {
let channel = Channel::new(frame.id, origin);
self.channel_queue.push_back(frame.id);
channel
});
let current_channel = match self.channels.get_mut(&frame.id) {
Some(c) => c,
None => {
if self.cfg.is_holocene_active(origin.timestamp) && !self.channel_queue.is_empty() {
// In holocene, channels are strictly ordered.
// If the previous frame is not the last in the channel
// and a starting frame for the next channel arrives,
// the previous channel/frames are removed and a new channel is created.
self.channel_queue.clear();

trace!(target: "channel-bank", "[holocene active] clearing non-empty channel queue");
crate::inc!(CHANNEL_QUEUE_NON_EMPTY);

Check warning on line 97 in crates/derive/src/stages/channel_bank.rs

View check run for this annotation

Codecov / codecov/patch

crates/derive/src/stages/channel_bank.rs#L94-L97

Added lines #L94 - L97 were not covered by tests
}
let channel = Channel::new(frame.id, origin);
self.channel_queue.push_back(frame.id);
self.channels.insert(frame.id, channel);
self.channels.get_mut(&frame.id).expect("Channel must be in queue")
}
};

// Check if the channel is not timed out. If it has, ignore the frame.
if current_channel.open_block_number() + self.cfg.channel_timeout(origin.timestamp) <
Expand Down

0 comments on commit 69d1462

Please sign in to comment.