Skip to content

Commit

Permalink
feat(derive): BatchOutdated validity variant
Browse files Browse the repository at this point in the history
  • Loading branch information
clabby committed Oct 10, 2024
1 parent 61f77ad commit 5225b43
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 3 deletions.
44 changes: 43 additions & 1 deletion crates/derive/src/batch/single_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ impl SingleBatch {
}
if self.timestamp < next_timestamp {
warn!("dropping batch with old timestamp, min_timestamp: {next_timestamp}");
return BatchValidity::Drop;
return if cfg.is_holocene_active(self.timestamp) {
BatchValidity::BatchOutdated
} else {
BatchValidity::Drop
};
}

// Dependent on the above timestamp check.
Expand Down Expand Up @@ -233,6 +237,44 @@ mod tests {
assert_eq!(validity, BatchValidity::Drop);
}

#[test]
fn test_drop_old_batch() {
let single_batch = SingleBatch {
parent_hash: B256::ZERO,
epoch_num: 0xFF,
epoch_hash: B256::ZERO,
timestamp: 0x00,
transactions: vec![Bytes::from(hex!("00"))],
};

let cfg = RollupConfig { block_time: 2, ..Default::default() };
let l1_blocks = vec![BlockInfo::default()];
let l2_safe_head = L2BlockInfo::default();
let inclusion_block = BlockInfo { number: 10, ..Default::default() };

let validity = single_batch.check_batch(&cfg, &l1_blocks, l2_safe_head, &inclusion_block);
assert_eq!(validity, BatchValidity::Drop);
}

#[test]
fn test_drop_old_batch_holocene() {
let single_batch = SingleBatch {
parent_hash: B256::ZERO,
epoch_num: 0xFF,
epoch_hash: B256::ZERO,
timestamp: 0x00,
transactions: vec![Bytes::from(hex!("00"))],
};

let cfg = RollupConfig { holocene_time: Some(0), block_time: 2, ..Default::default() };
let l1_blocks = vec![BlockInfo::default()];
let l2_safe_head = L2BlockInfo::default();
let inclusion_block = BlockInfo { number: 10, ..Default::default() };

let validity = single_batch.check_batch(&cfg, &l1_blocks, l2_safe_head, &inclusion_block);
assert_eq!(validity, BatchValidity::BatchOutdated);
}

#[test]
fn test_drop_prior_epoch_num() {
let single_batch = SingleBatch {
Expand Down
6 changes: 5 additions & 1 deletion crates/derive/src/batch/span_batch/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,11 @@ impl SpanBatch {
// Drop the batch if it has no new blocks after the safe head.
if self.final_timestamp() < next_timestamp {
warn!("span batch has no new blocks after safe head");
return BatchValidity::Drop;
return if cfg.is_holocene_active(self.final_timestamp()) {
BatchValidity::BatchOutdated

Check warning on line 446 in crates/derive/src/batch/span_batch/batch.rs

View check run for this annotation

Codecov / codecov/patch

crates/derive/src/batch/span_batch/batch.rs#L445-L446

Added lines #L445 - L446 were not covered by tests
} else {
BatchValidity::Drop

Check warning on line 448 in crates/derive/src/batch/span_batch/batch.rs

View check run for this annotation

Codecov / codecov/patch

crates/derive/src/batch/span_batch/batch.rs#L448

Added line #L448 was not covered by tests
}
}

BatchValidity::Accept
Expand Down
8 changes: 8 additions & 0 deletions crates/derive/src/batch/validity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ pub enum BatchValidity {
Undecided,
/// The batch may be valid, but cannot be processed yet and should be checked again later
Future,
/// Introduced in Holocene, a special variant of the Drop variant that signals not to flush
/// the active batch and channel.
BatchOutdated,
}

impl BatchValidity {
Expand All @@ -23,6 +26,11 @@ impl BatchValidity {
matches!(self, Self::Drop)
}

/// Returns if the batch is outdated.
pub const fn is_outdated(&self) -> bool {
matches!(self, Self::BatchOutdated)
}

/// Returns if the batch is future.
pub const fn is_future(&self) -> bool {
matches!(self, Self::Future)
Expand Down
3 changes: 3 additions & 0 deletions crates/derive/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ pub enum PipelineError {
/// [L1Retrieval]: crate::stages::L1Retrieval
#[error("L1 Retrieval missing data")]
MissingL1Data,
/// Invalid batch validity variant.
#[error("Invalid batch validity")]
InvalidBatchValidity,
/// [SystemConfig] update error.
///
/// [SystemConfig]: op_alloy_genesis::SystemConfig
Expand Down
47 changes: 47 additions & 0 deletions crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,15 @@ where
self.batches = remaining;
return Err(PipelineError::Eof.temp());
}
BatchValidity::BatchOutdated => {
if !self.cfg.is_holocene_active(origin.timestamp) {
error!(target: "batch-queue", "BatchOutdated is not allowed pre-holocene");
return Err(PipelineError::InvalidBatchValidity.crit());
}

warn!(target: "batch-queue", "[HOLOCENE] Dropping outdated batch with parent: {}", parent.block_info.number);
continue;

Check warning on line 193 in crates/derive/src/stages/batch_queue.rs

View check run for this annotation

Codecov / codecov/patch

crates/derive/src/stages/batch_queue.rs#L187-L193

Added lines #L187 - L193 were not covered by tests
}
}
}
self.batches = remaining;
Expand Down Expand Up @@ -261,6 +270,9 @@ where
if drop {
self.prev.flush();
return Ok(());
} else if validity.is_outdated() {
// If the batch is outdated, we drop it without flushing the previous stage.
return Ok(());
}
self.batches.push(data);
Ok(())
Expand Down Expand Up @@ -674,6 +686,41 @@ mod tests {
assert!(bq.batches.is_empty());
}

#[tokio::test]
async fn test_add_old_batch_drop_holocene() {
// Construct a single batch with BatchValidity::BatchOutdated.
let cfg =
Arc::new(RollupConfig { holocene_time: Some(0), block_time: 2, ..Default::default() });
assert!(cfg.is_holocene_active(0));
let batch = SingleBatch {
parent_hash: B256::default(),
epoch_num: 0,
epoch_hash: B256::default(),
timestamp: 100,
transactions: Vec::new(),
};
let parent = L2BlockInfo {
block_info: BlockInfo { timestamp: 101, ..Default::default() },
..Default::default()
};

// Setup batch queue deps
let batch_vec = vec![PipelineResult::Ok(Batch::Single(batch.clone()))];
let mut mock = TestBatchQueueProvider::new(batch_vec);
mock.origin = Some(BlockInfo::default());
let fetcher = TestL2ChainProvider::default();

// Configure batch queue
let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher);
bq.origin = Some(BlockInfo::default()); // Set the origin
bq.l1_blocks.push(BlockInfo::default()); // Push the origin into the l1 blocks
bq.l1_blocks.push(BlockInfo::default()); // Push the next origin into the bq

// Add the batch to the batch queue
bq.add_batch(Batch::Single(batch), parent).await.unwrap();
assert!(bq.batches.is_empty());
}

#[tokio::test]
async fn test_derive_next_batch_missing_origin() {
let data = vec![Ok(Batch::Single(SingleBatch::default()))];
Expand Down
10 changes: 9 additions & 1 deletion crates/derive/src/stages/batch_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use async_trait::async_trait;
use core::fmt::Debug;
use op_alloy_genesis::{RollupConfig, SystemConfig};
use op_alloy_protocol::{BlockInfo, L2BlockInfo};
use tracing::trace;
use tracing::{error, trace};

/// Provides [Batch]es for the [BatchStream] stage.
#[async_trait]
Expand Down Expand Up @@ -153,6 +153,14 @@ where

return Err(PipelineError::Eof.temp());
}
BatchValidity::BatchOutdated => {
if !self.is_active()? {
error!(target: "batch-queue", "BatchOutdated is not allowed pre-holocene");
return Err(PipelineError::InvalidBatchValidity.crit());
}

return Err(PipelineError::Eof.temp());

Check warning on line 162 in crates/derive/src/stages/batch_stream.rs

View check run for this annotation

Codecov / codecov/patch

crates/derive/src/stages/batch_stream.rs#L157-L162

Added lines #L157 - L162 were not covered by tests
}
BatchValidity::Undecided | BatchValidity::Future => {
return Err(PipelineError::NotEnoughData.temp())
}
Expand Down

0 comments on commit 5225b43

Please sign in to comment.