Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(derive): Pipeline Parameters & Max Channel Bank Size #587

Merged
merged 1 commit into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion crates/derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ pub mod attributes;
pub mod batch;
pub mod block;
pub mod errors;
pub mod params;
pub mod pipeline;
pub mod sources;
pub mod stages;
Expand Down
16 changes: 0 additions & 16 deletions crates/derive/src/params.rs

This file was deleted.

43 changes: 40 additions & 3 deletions crates/derive/src/stages/channel_bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

use crate::{
errors::{PipelineError, PipelineErrorKind, PipelineResult},
params::MAX_CHANNEL_BANK_SIZE,
stages::ChannelReaderProvider,
traits::{OriginAdvancer, OriginProvider, ResettableStage},
};
Expand All @@ -15,6 +14,12 @@ use op_alloy_genesis::{RollupConfig, SystemConfig};
use op_alloy_protocol::{BlockInfo, Channel, ChannelId, Frame};
use tracing::{trace, warn};

/// The maximum size of a channel bank.
pub(crate) const MAX_CHANNEL_BANK_SIZE: usize = 100_000_000;

/// The maximum size of a channel bank after the Fjord Hardfork.
pub(crate) const FJORD_MAX_CHANNEL_BANK_SIZE: usize = 1_000_000_000;

/// Provides frames for the [ChannelBank] stage.
#[async_trait]
pub trait ChannelBankProvider {
Expand Down Expand Up @@ -65,11 +70,17 @@ where
self.channels.iter().fold(0, |acc, (_, c)| acc + c.size())
}

/// Prunes the Channel bank, until it is below [MAX_CHANNEL_BANK_SIZE].
/// Prunes the Channel bank, until it is below the max channel bank size.
/// Prunes from the high-priority channel since it failed to be read.
pub fn prune(&mut self) -> PipelineResult<()> {
let mut total_size = self.size();
while total_size > MAX_CHANNEL_BANK_SIZE {
let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?;
let max_channel_bank_size = if self.cfg.is_fjord_active(origin.timestamp) {
FJORD_MAX_CHANNEL_BANK_SIZE
} else {
MAX_CHANNEL_BANK_SIZE
};
while total_size > max_channel_bank_size {
let id =
self.channel_queue.pop_front().ok_or(PipelineError::ChannelBankEmpty.crit())?;
let channel = self.channels.remove(&id).ok_or(PipelineError::ChannelNotFound.crit())?;
Expand Down Expand Up @@ -374,6 +385,32 @@ mod tests {
assert_eq!(channel_bank.size(), current_size);
}

#[test]
fn test_ingest_and_prune_channel_bank_fjord() {
use alloc::vec::Vec;
let mut frames: Vec<Frame> = new_test_frames(100000);
let mock = MockChannelBankProvider::new(vec![]);
let cfg = Arc::new(RollupConfig { fjord_time: Some(0), ..Default::default() });
let mut channel_bank = ChannelBank::new(cfg, mock);
// Ingest frames until the channel bank is full and it stops increasing in size
let mut current_size = 0;
let next_frame = frames.pop().unwrap();
channel_bank.ingest_frame(next_frame).unwrap();
while channel_bank.size() > current_size {
current_size = channel_bank.size();
let next_frame = frames.pop().unwrap();
channel_bank.ingest_frame(next_frame).unwrap();
assert!(channel_bank.size() <= FJORD_MAX_CHANNEL_BANK_SIZE);
}
// There should be a bunch of frames leftover
assert!(!frames.is_empty());
// If we ingest one more frame, the channel bank should prune
// and the size should be the same
let next_frame = frames.pop().unwrap();
channel_bank.ingest_frame(next_frame).unwrap();
assert_eq!(channel_bank.size(), current_size);
}

#[tokio::test]
async fn test_read_empty_channel_bank() {
let frames = new_test_frames(1);
Expand Down
11 changes: 3 additions & 8 deletions crates/derive/src/stages/l1_traversal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,27 +146,22 @@ impl<F: ChainProvider + Send> ResettableStage for L1Traversal<F> {
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::{
errors::PipelineErrorKind,
params::{CONFIG_UPDATE_EVENT_VERSION_0, CONFIG_UPDATE_TOPIC},
traits::test_utils::TestChainProvider,
};
use crate::{errors::PipelineErrorKind, traits::test_utils::TestChainProvider};
use alloc::vec;
use alloy_consensus::Receipt;
use alloy_primitives::{address, b256, hex, Bytes, Log, LogData, B256};
use op_alloy_genesis::system::{CONFIG_UPDATE_EVENT_VERSION_0, CONFIG_UPDATE_TOPIC};
refcell marked this conversation as resolved.
Show resolved Hide resolved

const L1_SYS_CONFIG_ADDR: Address = address!("1337000000000000000000000000000000000000");

fn new_update_batcher_log() -> Log {
const UPDATE_TYPE: B256 =
b256!("0000000000000000000000000000000000000000000000000000000000000000");
Log {
address: L1_SYS_CONFIG_ADDR,
data: LogData::new_unchecked(
vec![
CONFIG_UPDATE_TOPIC,
CONFIG_UPDATE_EVENT_VERSION_0,
UPDATE_TYPE,
B256::ZERO, // Update type
],
hex!("00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000beef").into()
)
Expand Down