Skip to content

Commit

Permalink
Merge pull request #67 from ethereum-optimism/refcell/batch-queue
Browse files Browse the repository at this point in the history
feat(derive): BatchQueue Implementation
  • Loading branch information
refcell authored Apr 3, 2024
2 parents 23c90cc + c691622 commit 2ca2031
Show file tree
Hide file tree
Showing 15 changed files with 770 additions and 41 deletions.
366 changes: 366 additions & 0 deletions crates/derive/src/stages/batch_queue.rs

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions crates/derive/src/stages/channel_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
traits::{ChainProvider, DataAvailabilityProvider},
types::{Batch, BlockInfo, StageError, StageResult},
};

use alloc::vec::Vec;
use anyhow::anyhow;
use core::fmt::Debug;
Expand Down
2 changes: 2 additions & 0 deletions crates/derive/src/stages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,7 @@ mod channel_reader;
pub use channel_reader::ChannelReader;

mod batch_queue;
pub use batch_queue::BatchQueue;

mod engine_queue;
mod payload_derivation;
14 changes: 13 additions & 1 deletion crates/derive/src/traits/data_sources.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Contains traits that describe the functionality of various data sources used in the derivation
//! pipeline's stages.
use crate::types::{BlockInfo, Receipt, StageResult};
use crate::types::{BlockInfo, ExecutionPayloadEnvelope, L2BlockInfo, Receipt, StageResult};
use alloc::{boxed::Box, vec::Vec};
use alloy_primitives::{Address, Bytes, B256};
use anyhow::Result;
Expand All @@ -20,6 +20,18 @@ pub trait ChainProvider {
async fn receipts_by_hash(&self, hash: B256) -> Result<Vec<Receipt>>;
}

/// Describes the functionality of a data source that fetches safe blocks.
#[async_trait]
pub trait SafeBlockFetcher {
/// Returns the L2 block info given a block number.
/// Errors if the block does not exist.
async fn l2_block_info_by_number(&self, number: u64) -> Result<L2BlockInfo>;

/// Returns an execution payload for a given number.
/// Errors if the execution payload does not exist.
async fn payload_by_number(&self, number: u64) -> Result<ExecutionPayloadEnvelope>;
}

/// Describes the functionality of a data source that can provide data availability information.
#[async_trait]
pub trait DataAvailabilityProvider {
Expand Down
2 changes: 1 addition & 1 deletion crates/derive/src/traits/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//! pipeline.
mod data_sources;
pub use data_sources::{ChainProvider, DataAvailabilityProvider, DataIter};
pub use data_sources::{ChainProvider, DataAvailabilityProvider, DataIter, SafeBlockFetcher};

mod stages;
pub use stages::ResettableStage;
Expand Down
48 changes: 48 additions & 0 deletions crates/derive/src/types/batch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,19 @@
//! [SingleBatch].
use super::DecodeError;
use crate::{
traits::SafeBlockFetcher,
types::{BlockInfo, L2BlockInfo, RollupConfig},
};
use alloc::vec::Vec;
use alloy_rlp::{Buf, Decodable, Encodable};

mod batch_type;
pub use batch_type::BatchType;

mod validity;
pub use validity::BatchValidity;

mod span_batch;
pub use span_batch::{
RawSpanBatch, SpanBatch, SpanBatchBits, SpanBatchBuilder, SpanBatchEip1559TransactionData,
Expand All @@ -19,6 +26,39 @@ pub use span_batch::{
mod single_batch;
pub use single_batch::SingleBatch;

/// A batch with its inclusion block.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchWithInclusionBlock {
/// The inclusion block
pub inclusion_block: BlockInfo,
/// The batch
pub batch: Batch,
}

impl BatchWithInclusionBlock {
/// Validates the batch can be applied on top of the specified L2 safe head.
/// The first entry of the l1_blocks should match the origin of the l2_safe_head.
/// One or more consecutive l1_blocks should be provided.
/// In case of only a single L1 block, the decision whether a batch is valid may have to stay
/// undecided.
pub fn check_batch<BF: SafeBlockFetcher>(
&self,
cfg: &RollupConfig,
l1_blocks: &[BlockInfo],
l2_safe_head: L2BlockInfo,
fetcher: &BF,
) -> BatchValidity {
match &self.batch {
Batch::Single(single_batch) => {
single_batch.check_batch(cfg, l1_blocks, l2_safe_head, &self.inclusion_block)
}
Batch::Span(span_batch) => {
span_batch.check_batch(cfg, l1_blocks, l2_safe_head, &self.inclusion_block, fetcher)
}
}
}
}

/// A Batch.
#[derive(Debug, Clone, PartialEq, Eq)]
#[allow(clippy::large_enum_variant)]
Expand All @@ -30,6 +70,14 @@ pub enum Batch {
}

impl Batch {
/// Returns the timestamp for the batch.
pub fn timestamp(&self) -> u64 {
match self {
Self::Single(sb) => sb.timestamp,
Self::Span(sb) => sb.timestamp(),
}
}

/// Attempts to encode a batch into a writer.
pub fn encode(&self, w: &mut Vec<u8>) -> Result<(), DecodeError> {
match self {
Expand Down
142 changes: 140 additions & 2 deletions crates/derive/src/types/batch/single_batch.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! This module contains the [SingleBatch] type.
use crate::types::RawTransaction;
use super::validity::BatchValidity;
use crate::types::{BlockInfo, L2BlockInfo, RawTransaction, RollupConfig};
use alloc::vec::Vec;
use alloy_primitives::BlockHash;
use alloy_rlp::{Decodable, Encodable};
Expand All @@ -26,6 +27,143 @@ impl SingleBatch {
pub fn has_invalid_transactions(&self) -> bool {
self.transactions.iter().any(|tx| tx.0.is_empty() || tx.0[0] == 0x7E)
}

/// Checks if the batch is valid.
pub fn check_batch(
&self,
cfg: &RollupConfig,
l1_blocks: &[BlockInfo],
l2_safe_head: L2BlockInfo,
inclusion_block: &BlockInfo,
) -> BatchValidity {
// Sanity check input consistency
if l1_blocks.is_empty() {
// TODO: log a warning: "missing L1 block input, cannot proceed with batch checking"
return BatchValidity::Undecided;
}

let epoch = l1_blocks[0];
let next_timestamp = l2_safe_head.block_info.timestamp + cfg.block_time;
if self.timestamp > next_timestamp {
// TODO: trace log: "received out-of-order batch for future processing after next batch"
return BatchValidity::Future;
}
if self.timestamp < next_timestamp {
// TODO: warn log: "dropping batch with old timestamp", "min_timestamp", next_timestamp
return BatchValidity::Drop;
}

// Dependent on the above timestamp check.
// If the timestamp is correct, then it must build on top of the safe head.
if self.parent_hash != l2_safe_head.block_info.hash {
// TODO: warn log: "ignoring batch with mismatching parent hash", "current_safe_head",
// l2_safe_head.info.hash
return BatchValidity::Drop;
}

// Filter out batches that were included too late.
if self.epoch_num + cfg.seq_window_size < inclusion_block.number {
// TODO: warn log: "batch was included too late, sequence window expired"
return BatchValidity::Drop;
}

// Check the L1 origin of the batch
let mut batch_origin = epoch;
if self.epoch_num < epoch.number {
// TODO: warn log: "dropped batch, epoch is too old", "minimum", epoch.id()
return BatchValidity::Drop;
} else if self.epoch_num == epoch.number {
// Batch is sticking to the current epoch, continue.
} else if self.epoch_num == epoch.number + 1 {
// With only 1 l1Block we cannot look at the next L1 Origin.
// Note: This means that we are unable to determine validity of a batch
// without more information. In this case we should bail out until we have
// more information otherwise the eager algorithm may diverge from a non-eager
// algorithm.
if l1_blocks.len() < 2 {
// TODO: info log: "eager batch wants to advance epoch, but could not without more
// L1 blocks", "current_epoch", epoch.id()
return BatchValidity::Undecided;
}
batch_origin = l1_blocks[1];
} else {
// TODO: warn log: "batch is for future epoch too far ahead, while it has the next
// timestamp, so it must be invalid", "current_epoch", epoch.id()
return BatchValidity::Drop;
}

// Validate the batch epoch hash
if self.epoch_hash != batch_origin.hash {
// TODO: warn log: "batch is for different L1 chain, epoch hash does not match",
// "expected", batch_origin.id()
return BatchValidity::Drop;
}

if self.timestamp < batch_origin.timestamp {
// TODO: warn log: "batch timestamp is less than L1 origin timestamp", "l2_timestamp",
// self.timestamp, "l1_timestamp", batch_origin.timestamp, "origin", batch_origin.id()
return BatchValidity::Drop;
}

// Check if we ran out of sequencer time drift
let max = if let Some(max) = batch_origin.timestamp.checked_add(cfg.max_sequencer_drift) {
max
} else {
// TODO: log that the batch exceeds time drift.
return BatchValidity::Drop;
};

let no_txs = self.transactions.is_empty();
if self.timestamp > max && !no_txs {
// If the sequencer is ignoring the time drift rule, then drop the batch and force an
// empty batch instead, as the sequencer is not allowed to include anything
// past this point without moving to the next epoch. TODO: warn log: "batch
// exceeded sequencer time drift, sequencer must adopt new L1 origin to include
// transactions again", "max_time", max
return BatchValidity::Drop;
}
if self.timestamp > max && no_txs {
// If the sequencer is co-operating by producing an empty batch,
// allow the batch if it was the right thing to do to maintain the L2 time >= L1 time
// invariant. Only check batches that do not advance the epoch, to ensure
// epoch advancement regardless of time drift is allowed.
if epoch.number == batch_origin.number {
if l1_blocks.len() < 2 {
// TODO: info log: "without the next L1 origin we cannot determine yet if this
// empty batch that exceeds the time drift is still valid"
return BatchValidity::Undecided;
}
let next_origin = l1_blocks[1];
// Check if the next L1 Origin could have been adopted
if self.timestamp >= next_origin.timestamp {
// TODO: log that the batch exceeded the time drift without adopting the next
// origin.
return BatchValidity::Drop;
} else {
// TODO: log that we are continuing with an empty batch before the late L1 block
// to preserve the L2 time invariant. TODO: metrice empty
// batch continuation
}
}
}

// We can do this check earlier, but it's a more intensive one, so we do this last.
// TODO: metrice & allow configurability to measure the time it takes to check the batch.
for tx in self.transactions.iter() {
if tx.is_empty() {
// TODO: warn log: "transaction data must not be empty, but found empty tx",
// "tx_index", i
return BatchValidity::Drop;
}
if tx.is_deposit() {
// TODO: warn log: "sequencers may not embed any deposits into batch data, but found
// tx that has one", "tx_index", i
return BatchValidity::Drop;
}
}

BatchValidity::Accept
}
}

impl Encodable for SingleBatch {
Expand All @@ -50,7 +188,7 @@ impl Decodable for SingleBatch {
}

#[cfg(test)]
mod test {
mod tests {
use super::SingleBatch;
use crate::types::RawTransaction;
use alloc::vec;
Expand Down
21 changes: 18 additions & 3 deletions crates/derive/src/types/batch/span_batch/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
#![allow(unused)]

use super::{SpanBatchError, SpanBatchTransactions};
use crate::types::{
block::L2BlockInfo, BlockInfo, RawSpanBatch, SingleBatch, SpanBatchBits, SpanBatchElement,
SpanBatchPayload, SpanBatchPrefix,
use crate::{
traits::SafeBlockFetcher,
types::{
BatchValidity, BlockInfo, L2BlockInfo, RawSpanBatch, RollupConfig, SingleBatch,
SpanBatchBits, SpanBatchElement, SpanBatchPayload, SpanBatchPrefix,
},
};
use alloc::{vec, vec::Vec};
use alloy_primitives::FixedBytes;
Expand Down Expand Up @@ -37,6 +40,18 @@ impl SpanBatch {
self.batches[0].timestamp
}

/// Checks if the span batch is valid.
pub fn check_batch<BF: SafeBlockFetcher>(
&self,
_cfg: &RollupConfig,
_l1_blocks: &[BlockInfo],
_l2_safe_head: L2BlockInfo,
_inclusion_block: &BlockInfo,
_fetcher: &BF,
) -> BatchValidity {
unimplemented!()
}

/// Converts the span batch to a raw span batch.
pub fn to_raw_span_batch(
&self,
Expand Down
35 changes: 32 additions & 3 deletions crates/derive/src/types/batch/span_batch/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
use alloc::vec::Vec;

use crate::types::{
BatchType, RawTransaction, SpanBatchElement, SpanBatchPayload, SpanBatchPrefix,
SpanDecodingError,
use crate::{
traits::SafeBlockFetcher,
types::{
BatchType, BatchValidity, BlockInfo, L2BlockInfo, RawTransaction, RollupConfig,
SingleBatch, SpanBatchElement, SpanBatchPayload, SpanBatchPrefix, SpanDecodingError,
},
};

use super::{SpanBatch, SpanBatchError};
Expand All @@ -24,6 +27,32 @@ impl RawSpanBatch {
BatchType::Span
}

/// Returns the timestamp for the span batch.
pub fn timestamp(&self) -> u64 {
self.prefix.rel_timestamp
}

/// Checks if the span batch is valid.
pub fn check_batch<BF: SafeBlockFetcher>(
&self,
_cfg: &RollupConfig,
_l1_blocks: &[BlockInfo],
_l2_safe_head: L2BlockInfo,
_inclusion_block: &BlockInfo,
_fetcher: &BF,
) -> BatchValidity {
unimplemented!()
}

/// Derives [SingleBatch]s from the span batch.
pub fn get_singular_batches(
&self,
_l1_blocks: &[BlockInfo],
_parent: L2BlockInfo,
) -> Vec<SingleBatch> {
unimplemented!()
}

/// Encodes the [RawSpanBatch] into a writer.
pub fn encode(&self, w: &mut Vec<u8>) -> Result<(), SpanBatchError> {
self.prefix.encode_prefix(w);
Expand Down
25 changes: 25 additions & 0 deletions crates/derive/src/types/batch/validity.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
//! Contains the [BatchValidity] and its encodings.
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

/// Batch Validity
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BatchValidity {
/// The batch is invalid now and in the future, unless we reorg
Drop,
/// The batch is valid and should be processed
Accept,
/// We are lacking L1 information until we can proceed batch filtering
Undecided,
/// The batch may be valid, but cannot be processed yet and should be checked again later
Future,
}

impl BatchValidity {
/// Returns if the batch is dropped.
pub fn is_drop(&self) -> bool {
matches!(self, BatchValidity::Drop)
}
}
Loading

0 comments on commit 2ca2031

Please sign in to comment.