From 5458ec1e56fd87d220de6c6321ef6f5c3c05ed6f Mon Sep 17 00:00:00 2001 From: clabby Date: Mon, 8 Apr 2024 22:32:42 -0400 Subject: [PATCH] fix(derive): Derive full `SpanBatch` in channel reader --- crates/derive/src/stages/batch_queue.rs | 21 ++++++++----- crates/derive/src/stages/channel_reader.rs | 26 ++++++++-------- crates/derive/src/types/batch/mod.rs | 24 +++++---------- .../src/types/batch/span_batch/batch.rs | 4 +-- .../derive/src/types/batch/span_batch/raw.rs | 30 ++----------------- 5 files changed, 40 insertions(+), 65 deletions(-) diff --git a/crates/derive/src/stages/batch_queue.rs b/crates/derive/src/stages/batch_queue.rs index c081ddcf2..47cbb9c0f 100644 --- a/crates/derive/src/stages/batch_queue.rs +++ b/crates/derive/src/stages/batch_queue.rs @@ -8,7 +8,7 @@ use crate::{ SingleBatch, StageError, StageResult, SystemConfig, }, }; -use alloc::{boxed::Box, vec::Vec}; +use alloc::{boxed::Box, sync::Arc, vec::Vec}; use anyhow::anyhow; use async_trait::async_trait; use core::fmt::Debug; @@ -45,7 +45,7 @@ where BF: L2ChainProvider + Debug, { /// The rollup config. - cfg: RollupConfig, + cfg: Arc, /// The previous stage of the derivation pipeline. prev: P, /// The l1 block ref @@ -75,7 +75,7 @@ where BF: L2ChainProvider + Debug, { /// Creates a new [BatchQueue] stage. - pub fn new(cfg: RollupConfig, prev: P, fetcher: BF) -> Self { + pub fn new(cfg: Arc, prev: P, fetcher: BF) -> Self { Self { cfg, prev, @@ -343,7 +343,11 @@ where match batch { Batch::Single(sb) => Ok(sb), Batch::Span(sb) => { - let batches = sb.get_singular_batches(&self.l1_blocks, parent); + let batches = sb.get_singular_batches(&self.l1_blocks, parent).map_err(|e| { + StageError::Custom(anyhow!( + "Could not get singular batches from span batch: {e}" + )) + })?; self.next_spans = batches; let nb = self .pop_next_batch(parent) @@ -413,7 +417,7 @@ mod tests { #[test] fn test_derive_next_batch_missing_origin() { let data = vec![Ok(Batch::Single(SingleBatch::default()))]; - let cfg = RollupConfig::default(); + let cfg = Arc::new(RollupConfig::default()); let mock = MockBatchQueueProvider::new(data); let fetcher = MockBlockFetcher::default(); let mut bq = BatchQueue::new(cfg, mock, fetcher); @@ -425,10 +429,11 @@ mod tests { #[tokio::test] async fn test_next_batch_not_enough_data() { let mut reader = new_batch_reader(); - let batch = reader.next_batch().unwrap(); + let cfg = Arc::new(RollupConfig::default()); + let batch = reader.next_batch(cfg.as_ref()).unwrap(); let mock = MockBatchQueueProvider::new(vec![Ok(batch)]); let fetcher = MockBlockFetcher::default(); - let mut bq = BatchQueue::new(RollupConfig::default(), mock, fetcher); + let mut bq = BatchQueue::new(cfg, mock, fetcher); let res = bq.next_batch(L2BlockInfo::default()).await.unwrap_err(); assert_eq!(res, StageError::NotEnoughData); assert!(bq.is_last_in_span()); @@ -456,7 +461,7 @@ mod tests { #[tokio::test] async fn test_batch_queue_empty_bytes() { let data = vec![Ok(Batch::Single(SingleBatch::default()))]; - let cfg = RollupConfig::default(); + let cfg = Arc::new(RollupConfig::default()); let mock = MockBatchQueueProvider::new(data); let fetcher = MockBlockFetcher::default(); let mut bq = BatchQueue::new(cfg, mock, fetcher); diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index 8a8b23a3f..e6cf77611 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -3,10 +3,10 @@ use crate::{ stages::BatchQueueProvider, traits::OriginProvider, - types::{Batch, BlockInfo, StageError, StageResult}, + types::{Batch, BlockInfo, RollupConfig, StageError, StageResult}, }; -use alloc::{boxed::Box, vec::Vec}; +use alloc::{boxed::Box, sync::Arc, vec::Vec}; use alloy_primitives::Bytes; use async_trait::async_trait; use core::fmt::Debug; @@ -33,6 +33,8 @@ where prev: P, /// The batch reader. next_batch: Option, + /// The rollup coonfiguration. + cfg: Arc, } impl

ChannelReader

@@ -40,8 +42,8 @@ where P: ChannelReaderProvider + OriginProvider + Debug, { /// Create a new [ChannelReader] stage. - pub fn new(prev: P) -> Self { - Self { prev, next_batch: None } + pub fn new(prev: P, cfg: Arc) -> Self { + Self { prev, next_batch: None, cfg: cfg.clone() } } /// Creates the batch reader from available channel data. @@ -75,7 +77,7 @@ where .next_batch .as_mut() .expect("Cannot be None") - .next_batch() + .next_batch(self.cfg.as_ref()) .ok_or(StageError::NotEnoughData) { Ok(batch) => Ok(batch), @@ -112,7 +114,7 @@ pub(crate) struct BatchReader { impl BatchReader { /// Pulls out the next batch from the reader. - pub(crate) fn next_batch(&mut self) -> Option { + pub(crate) fn next_batch(&mut self, cfg: &RollupConfig) -> Option { // If the data is not already decompressed, decompress it. if let Some(data) = self.data.take() { let decompressed_data = decompress_to_vec_zlib(&data).ok()?; @@ -121,7 +123,7 @@ impl BatchReader { // Decompress and RLP decode the batch data, before finally decoding the batch itself. let mut decompressed_reader = self.decompressed.as_slice(); - let batch = Batch::decode(&mut decompressed_reader).ok()?; + let batch = Batch::decode(&mut decompressed_reader, cfg).ok()?; // Advance the cursor on the reader. self.cursor += self.decompressed.len() - decompressed_reader.len(); @@ -153,7 +155,7 @@ mod test { #[tokio::test] async fn test_next_batch_batch_reader_set_fails() { let mock = MockChannelReaderProvider::new(vec![Err(StageError::Eof)]); - let mut reader = ChannelReader::new(mock); + let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default())); assert_eq!(reader.next_batch().await, Err(StageError::Eof)); assert!(reader.next_batch.is_none()); } @@ -161,7 +163,7 @@ mod test { #[tokio::test] async fn test_next_batch_batch_reader_no_data() { let mock = MockChannelReaderProvider::new(vec![Ok(None)]); - let mut reader = ChannelReader::new(mock); + let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default())); assert_eq!(reader.next_batch().await, Err(StageError::NoChannel)); assert!(reader.next_batch.is_none()); } @@ -169,7 +171,7 @@ mod test { #[tokio::test] async fn test_next_batch_not_enough_data() { let mock = MockChannelReaderProvider::new(vec![Ok(Some(Bytes::default()))]); - let mut reader = ChannelReader::new(mock); + let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default())); assert_eq!(reader.next_batch().await, Err(StageError::NotEnoughData)); assert!(reader.next_batch.is_none()); } @@ -178,7 +180,7 @@ mod test { async fn test_next_batch_succeeds() { let raw = new_compressed_batch_data(); let mock = MockChannelReaderProvider::new(vec![Ok(Some(raw))]); - let mut reader = ChannelReader::new(mock); + let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default())); let res = reader.next_batch().await.unwrap(); matches!(res, Batch::Span(_)); assert!(reader.next_batch.is_some()); @@ -192,7 +194,7 @@ mod test { let compressed_raw_data = compress_to_vec_zlib(typed_data.as_slice(), 5); let mut reader = BatchReader::from(compressed_raw_data); - reader.next_batch().unwrap(); + reader.next_batch(&RollupConfig::default()).unwrap(); assert_eq!(reader.cursor, typed_data.len()); } diff --git a/crates/derive/src/types/batch/mod.rs b/crates/derive/src/types/batch/mod.rs index 33ed0b4b2..e01be15b7 100644 --- a/crates/derive/src/types/batch/mod.rs +++ b/crates/derive/src/types/batch/mod.rs @@ -6,8 +6,7 @@ use crate::{ traits::L2ChainProvider, types::{BlockInfo, L2BlockInfo, RollupConfig}, }; -use alloc::vec::Vec; -use alloy_rlp::{Buf, Decodable, Encodable}; +use alloy_rlp::{Buf, Decodable}; mod batch_type; pub use batch_type::BatchType; @@ -66,7 +65,7 @@ pub enum Batch { /// A single batch Single(SingleBatch), /// Span Batches - Span(RawSpanBatch), + Span(SpanBatch), } impl Batch { @@ -78,19 +77,8 @@ impl Batch { } } - /// Attempts to encode a batch into a writer. - pub fn encode(&self, w: &mut Vec) -> Result<(), DecodeError> { - match self { - Self::Single(single_batch) => { - single_batch.encode(w); - Ok(()) - } - Self::Span(span_batch) => span_batch.encode(w).map_err(DecodeError::SpanBatchError), - } - } - /// Attempts to decode a batch from a reader. - pub fn decode(r: &mut &[u8]) -> Result { + pub fn decode(r: &mut &[u8], cfg: &RollupConfig) -> Result { if r.is_empty() { return Err(DecodeError::EmptyBuffer); } @@ -105,7 +93,11 @@ impl Batch { Ok(Batch::Single(single_batch)) } BatchType::Span => { - let span_batch = RawSpanBatch::decode(r).map_err(DecodeError::SpanBatchError)?; + let mut raw_span_batch = + RawSpanBatch::decode(r).map_err(DecodeError::SpanBatchError)?; + let span_batch = raw_span_batch + .derive(cfg.block_time, cfg.genesis.timestamp, cfg.l2_chain_id) + .map_err(DecodeError::SpanBatchError)?; Ok(Batch::Span(span_batch)) } } diff --git a/crates/derive/src/types/batch/span_batch/batch.rs b/crates/derive/src/types/batch/span_batch/batch.rs index 0f9d29ba3..050ba2892 100644 --- a/crates/derive/src/types/batch/span_batch/batch.rs +++ b/crates/derive/src/types/batch/span_batch/batch.rs @@ -36,7 +36,7 @@ pub struct SpanBatch { impl SpanBatch { /// Returns the timestamp for the first batch in the span. - pub fn get_timestamp(&self) -> u64 { + pub fn timestamp(&self) -> u64 { self.batches[0].timestamp } @@ -87,7 +87,7 @@ impl SpanBatch { /// stage. pub fn get_singular_batches( &self, - l1_origins: Vec, + l1_origins: &[BlockInfo], l2_safe_head: L2BlockInfo, ) -> Result, SpanBatchError> { let mut single_batches = Vec::new(); diff --git a/crates/derive/src/types/batch/span_batch/raw.rs b/crates/derive/src/types/batch/span_batch/raw.rs index 1cb55739c..e1ca81d7f 100644 --- a/crates/derive/src/types/batch/span_batch/raw.rs +++ b/crates/derive/src/types/batch/span_batch/raw.rs @@ -2,12 +2,9 @@ use alloc::vec::Vec; -use crate::{ - traits::L2ChainProvider, - types::{ - BatchType, BatchValidity, BlockInfo, L2BlockInfo, RawTransaction, RollupConfig, - SingleBatch, SpanBatchElement, SpanBatchPayload, SpanBatchPrefix, SpanDecodingError, - }, +use crate::types::{ + BatchType, RawTransaction, SpanBatchElement, SpanBatchPayload, SpanBatchPrefix, + SpanDecodingError, }; use super::{SpanBatch, SpanBatchError}; @@ -32,27 +29,6 @@ impl RawSpanBatch { self.prefix.rel_timestamp } - /// Checks if the span batch is valid. - pub fn check_batch( - &self, - _cfg: &RollupConfig, - _l1_blocks: &[BlockInfo], - _l2_safe_head: L2BlockInfo, - _inclusion_block: &BlockInfo, - _fetcher: &BF, - ) -> BatchValidity { - unimplemented!() - } - - /// Derives [SingleBatch]s from the span batch. - pub fn get_singular_batches( - &self, - _l1_blocks: &[BlockInfo], - _parent: L2BlockInfo, - ) -> Vec { - unimplemented!() - } - /// Encodes the [RawSpanBatch] into a writer. pub fn encode(&self, w: &mut Vec) -> Result<(), SpanBatchError> { self.prefix.encode_prefix(w);