From 4e07ad2940fa296dbec7db52aff37b147a0fdc9c Mon Sep 17 00:00:00 2001 From: clabby Date: Tue, 22 Oct 2024 09:40:21 -0400 Subject: [PATCH] feat(derive): `BatchProvider` multiplexed stage (#726) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(derive): `BatchProvider` multiplexed stage * tests * monolithic mux macro -> viking funeral ⛵🔥 * rebase * updates * remove `into_prev` --- bin/client/src/l1/driver.rs | 4 +- crates/derive-alloy/src/pipeline.rs | 4 +- crates/derive/src/errors.rs | 8 +- crates/derive/src/pipeline/builder.rs | 12 +- .../derive/src/stages/batch/batch_provider.rs | 300 ++++++++++++++++++ crates/derive/src/stages/batch/batch_queue.rs | 56 ++-- .../src/stages/batch/batch_validator.rs | 36 +-- crates/derive/src/stages/batch/mod.rs | 9 +- .../src/stages/channel/channel_assembler.rs | 5 - .../derive/src/stages/channel/channel_bank.rs | 17 - .../src/stages/channel/channel_provider.rs | 241 ++++++++++---- crates/derive/src/stages/mod.rs | 8 +- crates/derive/src/stages/multiplexed.rs | 254 --------------- .../{batch_queue.rs => batch_provider.rs} | 12 +- .../{channel_bank.rs => channel_provider.rs} | 0 crates/derive/src/test_utils/mod.rs | 14 +- crates/derive/src/test_utils/pipeline.rs | 13 +- 17 files changed, 572 insertions(+), 421 deletions(-) create mode 100644 crates/derive/src/stages/batch/batch_provider.rs delete mode 100644 crates/derive/src/stages/multiplexed.rs rename crates/derive/src/test_utils/{batch_queue.rs => batch_provider.rs} (87%) rename crates/derive/src/test_utils/{channel_bank.rs => channel_provider.rs} (100%) diff --git a/bin/client/src/l1/driver.rs b/bin/client/src/l1/driver.rs index e7804dbcf..73dffd3d8 100644 --- a/bin/client/src/l1/driver.rs +++ b/bin/client/src/l1/driver.rs @@ -16,7 +16,7 @@ use kona_derive::{ pipeline::{DerivationPipeline, Pipeline, PipelineBuilder, StepResult}, sources::EthereumDataSource, stages::{ - AttributesQueue, BatchQueue, BatchStream, ChannelProvider, ChannelReader, FrameQueue, + AttributesQueue, BatchProvider, BatchStream, ChannelProvider, ChannelReader, FrameQueue, L1Retrieval, L1Traversal, }, traits::{ @@ -49,7 +49,7 @@ pub type OracleAttributesBuilder = /// An oracle-backed attributes queue for the derivation pipeline. pub type OracleAttributesQueue = AttributesQueue< - BatchQueue< + BatchProvider< BatchStream< ChannelReader< ChannelProvider< diff --git a/crates/derive-alloy/src/pipeline.rs b/crates/derive-alloy/src/pipeline.rs index 664c55756..d284b0a83 100644 --- a/crates/derive-alloy/src/pipeline.rs +++ b/crates/derive-alloy/src/pipeline.rs @@ -5,7 +5,7 @@ use kona_derive::{ pipeline::{DerivationPipeline, PipelineBuilder}, sources::EthereumDataSource, stages::{ - AttributesQueue, BatchQueue, BatchStream, ChannelProvider, ChannelReader, FrameQueue, + AttributesQueue, BatchProvider, BatchStream, ChannelProvider, ChannelReader, FrameQueue, L1Retrieval, L1Traversal, }, }; @@ -34,7 +34,7 @@ pub type OnlineAttributesBuilder = /// An `online` attributes queue for the derivation pipeline. pub type OnlineAttributesQueue = AttributesQueue< - BatchQueue< + BatchProvider< BatchStream< ChannelReader< ChannelProvider>>>, diff --git a/crates/derive/src/errors.rs b/crates/derive/src/errors.rs index 006285f3a..48219025e 100644 --- a/crates/derive/src/errors.rs +++ b/crates/derive/src/errors.rs @@ -1,9 +1,6 @@ //! This module contains derivation errors thrown within the pipeline. -use crate::{ - batch::{SpanBatchError, MAX_SPAN_BATCH_ELEMENTS}, - stages::MultiplexerError, -}; +use crate::batch::{SpanBatchError, MAX_SPAN_BATCH_ELEMENTS}; use alloc::string::String; use alloy_eips::BlockNumHash; use alloy_primitives::B256; @@ -116,9 +113,6 @@ pub enum PipelineError { /// [PipelineEncodingError] variant. #[error("Decode error: {0}")] BadEncoding(#[from] PipelineEncodingError), - /// A multiplexer stage error. - #[error("Multiplexer error: {0}")] - Multiplexer(#[from] MultiplexerError), /// Provider error variant. #[error("Blob provider error: {0}")] Provider(String), diff --git a/crates/derive/src/pipeline/builder.rs b/crates/derive/src/pipeline/builder.rs index 75c75e1a1..8dc995412 100644 --- a/crates/derive/src/pipeline/builder.rs +++ b/crates/derive/src/pipeline/builder.rs @@ -3,7 +3,7 @@ use super::{AttributesBuilder, DataAvailabilityProvider, DerivationPipeline}; use crate::{ stages::{ - AttributesQueue, BatchQueue, BatchStream, ChannelProvider, ChannelReader, FrameQueue, + AttributesQueue, BatchProvider, BatchStream, ChannelProvider, ChannelReader, FrameQueue, L1Retrieval, L1Traversal, }, traits::{ChainProvider, L2ChainProvider}, @@ -19,8 +19,8 @@ type FrameQueueStage = FrameQueue>; type ChannelProviderStage = ChannelProvider>; type ChannelReaderStage = ChannelReader>; type BatchStreamStage = BatchStream, T>; -type BatchQueueStage = BatchQueue, T>; -type AttributesQueueStage = AttributesQueue, B>; +type BatchProviderStage = BatchProvider, T>; +type AttributesQueueStage = AttributesQueue, B>; /// The `PipelineBuilder` constructs a [DerivationPipeline] using a builder pattern. #[derive(Debug)] @@ -137,10 +137,10 @@ where let channel_reader = ChannelReader::new(channel_provider, Arc::clone(&rollup_config)); let batch_stream = BatchStream::new(channel_reader, rollup_config.clone(), l2_chain_provider.clone()); - let batch_queue = - BatchQueue::new(rollup_config.clone(), batch_stream, l2_chain_provider.clone()); + let batch_provider = + BatchProvider::new(rollup_config.clone(), batch_stream, l2_chain_provider.clone()); let attributes = - AttributesQueue::new(rollup_config.clone(), batch_queue, attributes_builder); + AttributesQueue::new(rollup_config.clone(), batch_provider, attributes_builder); // Create the pipeline. Self::new(attributes, rollup_config, l2_chain_provider) diff --git a/crates/derive/src/stages/batch/batch_provider.rs b/crates/derive/src/stages/batch/batch_provider.rs new file mode 100644 index 000000000..b480a9e96 --- /dev/null +++ b/crates/derive/src/stages/batch/batch_provider.rs @@ -0,0 +1,300 @@ +//! This module contains the [BatchProvider] stage. + +use super::NextBatchProvider; +use crate::{ + batch::SingleBatch, + errors::{PipelineError, PipelineResult}, + stages::{BatchQueue, BatchValidator}, + traits::{ + AttributesProvider, L2ChainProvider, OriginAdvancer, OriginProvider, Signal, SignalReceiver, + }, +}; +use alloc::{boxed::Box, sync::Arc}; +use async_trait::async_trait; +use core::fmt::Debug; +use op_alloy_genesis::RollupConfig; +use op_alloy_protocol::{BlockInfo, L2BlockInfo}; + +/// The [BatchProvider] stage is a mux between the [BatchQueue] and [BatchValidator] stages. +/// +/// Rules: +/// When Holocene is not active, the [BatchQueue] is used. +/// When Holocene is active, the [BatchValidator] is used. +/// +/// When transitioning between the two stages, the mux will reset the active stage, but +/// retain `l1_blocks`. +#[derive(Debug)] +pub struct BatchProvider +where + P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, + F: L2ChainProvider + Clone + Debug, +{ + /// The rollup configuration. + cfg: Arc, + /// The L2 chain provider. + provider: F, + /// The previous stage of the derivation pipeline. + /// + /// If this is set to [None], the multiplexer has been activated and the active stage + /// owns the previous stage. + /// + /// Must be [None] if `batch_queue` or `batch_validator` is [Some]. + prev: Option

, + /// The batch queue stage of the provider. + /// + /// Must be [None] if `prev` or `batch_validator` is [Some]. + batch_queue: Option>, + /// The batch validator stage of the provider. + /// + /// Must be [None] if `prev` or `batch_queue` is [Some]. + batch_validator: Option>, +} + +impl BatchProvider +where + P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, + F: L2ChainProvider + Clone + Debug, +{ + /// Creates a new [BatchProvider] with the given configuration and previous stage. + pub const fn new(cfg: Arc, prev: P, provider: F) -> Self { + Self { cfg, provider, prev: Some(prev), batch_queue: None, batch_validator: None } + } + + /// Attempts to update the active stage of the mux. + pub(crate) fn attempt_update(&mut self) -> PipelineResult<()> { + let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?; + if let Some(prev) = self.prev.take() { + // On the first call to `attempt_update`, we need to determine the active stage to + // initialize the mux with. + if self.cfg.is_holocene_active(origin.timestamp) { + self.batch_validator = Some(BatchValidator::new(self.cfg.clone(), prev)); + } else { + self.batch_queue = + Some(BatchQueue::new(self.cfg.clone(), prev, self.provider.clone())); + } + } else if self.batch_queue.is_some() && self.cfg.is_holocene_active(origin.timestamp) { + // If the batch queue is active and Holocene is also active, transition to the batch + // validator. + let batch_queue = self.batch_queue.take().expect("Must have batch queue"); + let mut bv = BatchValidator::new(self.cfg.clone(), batch_queue.prev); + bv.l1_blocks = batch_queue.l1_blocks; + self.batch_validator = Some(bv); + } else if self.batch_validator.is_some() && !self.cfg.is_holocene_active(origin.timestamp) { + // If the batch validator is active, and Holocene is not active, it indicates an L1 + // reorg around Holocene activation. Transition back to the batch queue + // until Holocene re-activates. + let batch_validator = self.batch_validator.take().expect("Must have batch validator"); + let mut bq = + BatchQueue::new(self.cfg.clone(), batch_validator.prev, self.provider.clone()); + bq.l1_blocks = batch_validator.l1_blocks; + self.batch_queue = Some(bq); + } + Ok(()) + } +} + +#[async_trait] +impl OriginAdvancer for BatchProvider +where + P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, + F: L2ChainProvider + Clone + Send + Debug, +{ + async fn advance_origin(&mut self) -> PipelineResult<()> { + self.attempt_update()?; + + if let Some(batch_validator) = self.batch_validator.as_mut() { + batch_validator.advance_origin().await + } else if let Some(batch_queue) = self.batch_queue.as_mut() { + batch_queue.advance_origin().await + } else { + Err(PipelineError::NotEnoughData.temp()) + } + } +} + +impl OriginProvider for BatchProvider +where + P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, + F: L2ChainProvider + Clone + Debug, +{ + fn origin(&self) -> Option { + self.batch_validator.as_ref().map_or_else( + || { + self.batch_queue.as_ref().map_or_else( + || self.prev.as_ref().and_then(|prev| prev.origin()), + |batch_queue| batch_queue.origin(), + ) + }, + |batch_validator| batch_validator.origin(), + ) + } +} + +#[async_trait] +impl SignalReceiver for BatchProvider +where + P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, + F: L2ChainProvider + Clone + Send + Debug, +{ + async fn signal(&mut self, signal: Signal) -> PipelineResult<()> { + self.attempt_update()?; + + if let Some(batch_validator) = self.batch_validator.as_mut() { + batch_validator.signal(signal).await + } else if let Some(batch_queue) = self.batch_queue.as_mut() { + batch_queue.signal(signal).await + } else { + Err(PipelineError::NotEnoughData.temp()) + } + } +} + +#[async_trait] +impl AttributesProvider for BatchProvider +where + P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug + Send, + F: L2ChainProvider + Clone + Send + Debug, +{ + fn is_last_in_span(&self) -> bool { + self.batch_validator.as_ref().map_or_else( + || self.batch_queue.as_ref().map_or(false, |batch_queue| batch_queue.is_last_in_span()), + |batch_validator| batch_validator.is_last_in_span(), + ) + } + + async fn next_batch(&mut self, parent: L2BlockInfo) -> PipelineResult { + self.attempt_update()?; + + if let Some(batch_validator) = self.batch_validator.as_mut() { + batch_validator.next_batch(parent).await + } else if let Some(batch_queue) = self.batch_queue.as_mut() { + batch_queue.next_batch(parent).await + } else { + Err(PipelineError::NotEnoughData.temp()) + } + } +} + +#[cfg(test)] +mod test { + use super::BatchProvider; + use crate::{ + test_utils::{TestL2ChainProvider, TestNextBatchProvider}, + traits::{OriginProvider, ResetSignal, SignalReceiver}, + }; + use op_alloy_genesis::RollupConfig; + use op_alloy_protocol::BlockInfo; + use std::sync::Arc; + + #[test] + fn test_batch_provider_validator_active() { + let provider = TestNextBatchProvider::new(vec![]); + let l2_provider = TestL2ChainProvider::default(); + let cfg = Arc::new(RollupConfig { holocene_time: Some(0), ..Default::default() }); + let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider); + + assert!(batch_provider.attempt_update().is_ok()); + assert!(batch_provider.prev.is_none()); + assert!(batch_provider.batch_queue.is_none()); + assert!(batch_provider.batch_validator.is_some()); + } + + #[test] + fn test_batch_provider_batch_queue_active() { + let provider = TestNextBatchProvider::new(vec![]); + let l2_provider = TestL2ChainProvider::default(); + let cfg = Arc::new(RollupConfig::default()); + let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider); + + assert!(batch_provider.attempt_update().is_ok()); + assert!(batch_provider.prev.is_none()); + assert!(batch_provider.batch_queue.is_some()); + assert!(batch_provider.batch_validator.is_none()); + } + + #[test] + fn test_batch_provider_transition_stage() { + let provider = TestNextBatchProvider::new(vec![]); + let l2_provider = TestL2ChainProvider::default(); + let cfg = Arc::new(RollupConfig { holocene_time: Some(2), ..Default::default() }); + let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider); + + batch_provider.attempt_update().unwrap(); + + // Update the L1 origin to Holocene activation. + let Some(ref mut stage) = batch_provider.batch_queue else { + panic!("Expected BatchQueue"); + }; + stage.prev.origin = Some(BlockInfo { number: 1, timestamp: 2, ..Default::default() }); + + // Transition to the BatchValidator stage. + batch_provider.attempt_update().unwrap(); + assert!(batch_provider.batch_queue.is_none()); + assert!(batch_provider.batch_validator.is_some()); + + assert_eq!(batch_provider.origin().unwrap().number, 1); + } + + #[test] + fn test_batch_provider_transition_stage_backwards() { + let provider = TestNextBatchProvider::new(vec![]); + let l2_provider = TestL2ChainProvider::default(); + let cfg = Arc::new(RollupConfig { holocene_time: Some(2), ..Default::default() }); + let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider); + + batch_provider.attempt_update().unwrap(); + + // Update the L1 origin to Holocene activation. + let Some(ref mut stage) = batch_provider.batch_queue else { + panic!("Expected BatchQueue"); + }; + stage.prev.origin = Some(BlockInfo { number: 1, timestamp: 2, ..Default::default() }); + + // Transition to the BatchValidator stage. + batch_provider.attempt_update().unwrap(); + assert!(batch_provider.batch_queue.is_none()); + assert!(batch_provider.batch_validator.is_some()); + + // Update the L1 origin to before Holocene activation, to simulate a re-org. + let Some(ref mut stage) = batch_provider.batch_validator else { + panic!("Expected BatchValidator"); + }; + stage.prev.origin = Some(BlockInfo::default()); + + batch_provider.attempt_update().unwrap(); + assert!(batch_provider.batch_queue.is_some()); + assert!(batch_provider.batch_validator.is_none()); + } + + #[tokio::test] + async fn test_batch_provider_reset_bq() { + let provider = TestNextBatchProvider::new(vec![]); + let l2_provider = TestL2ChainProvider::default(); + let cfg = Arc::new(RollupConfig::default()); + let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider); + + // Reset the batch provider. + batch_provider.signal(ResetSignal::default().signal()).await.unwrap(); + + let Some(bq) = batch_provider.batch_queue else { + panic!("Expected BatchQueue"); + }; + assert!(bq.l1_blocks.len() == 1); + } + + #[tokio::test] + async fn test_batch_provider_reset_validator() { + let provider = TestNextBatchProvider::new(vec![]); + let l2_provider = TestL2ChainProvider::default(); + let cfg = Arc::new(RollupConfig { holocene_time: Some(0), ..Default::default() }); + let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider); + + // Reset the batch provider. + batch_provider.signal(ResetSignal::default().signal()).await.unwrap(); + + let Some(bv) = batch_provider.batch_validator else { + panic!("Expected BatchValidator"); + }; + assert!(bv.l1_blocks.len() == 1); + } +} diff --git a/crates/derive/src/stages/batch/batch_queue.rs b/crates/derive/src/stages/batch/batch_queue.rs index cf32463e0..0a320f367 100644 --- a/crates/derive/src/stages/batch/batch_queue.rs +++ b/crates/derive/src/stages/batch/batch_queue.rs @@ -37,26 +37,26 @@ where BF: L2ChainProvider + Debug, { /// The rollup config. - cfg: Arc, + pub(crate) cfg: Arc, /// The previous stage of the derivation pipeline. - prev: P, + pub(crate) prev: P, /// The l1 block ref - origin: Option, + pub(crate) origin: Option, /// A consecutive, time-centric window of L1 Blocks. /// Every L1 origin of unsafe L2 Blocks must be included in this list. /// If every L2 Block corresponding to a single L1 Block becomes safe, /// the block is popped from this list. /// If new L2 Block's L1 origin is not included in this list, fetch and /// push it to the list. - l1_blocks: Vec, + pub(crate) l1_blocks: Vec, /// A set of batches in order from when we've seen them. - batches: Vec, + pub(crate) batches: Vec, /// A set of cached [SingleBatch]es derived from [SpanBatch]es. /// /// [SpanBatch]: crate::batch::SpanBatch - next_spans: Vec, + pub(crate) next_spans: Vec, /// Used to validate the batches. - fetcher: BF, + pub(crate) fetcher: BF, } impl BatchQueue @@ -475,7 +475,7 @@ mod tests { use super::*; use crate::{ stages::channel::channel_reader::BatchReader, - test_utils::{CollectingLayer, TestBatchQueueProvider, TestL2ChainProvider, TraceStorage}, + test_utils::{CollectingLayer, TestL2ChainProvider, TestNextBatchProvider, TraceStorage}, }; use alloc::vec; use alloy_consensus::Header; @@ -500,7 +500,7 @@ mod tests { #[test] fn test_pop_next_batch() { let cfg = Arc::new(RollupConfig::default()); - let mock = TestBatchQueueProvider::new(vec![]); + let mock = TestNextBatchProvider::new(vec![]); let fetcher = TestL2ChainProvider::default(); let mut bq = BatchQueue::new(cfg, mock, fetcher); let parent = L2BlockInfo::default(); @@ -514,7 +514,7 @@ mod tests { #[tokio::test] async fn test_batch_queue_reset() { let cfg = Arc::new(RollupConfig::default()); - let mock = TestBatchQueueProvider::new(vec![]); + let mock = TestNextBatchProvider::new(vec![]); let fetcher = TestL2ChainProvider::default(); let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher); bq.l1_blocks.push(BlockInfo::default()); @@ -535,7 +535,7 @@ mod tests { #[tokio::test] async fn test_batch_queue_flush() { let cfg = Arc::new(RollupConfig::default()); - let mock = TestBatchQueueProvider::new(vec![]); + let mock = TestNextBatchProvider::new(vec![]); let fetcher = TestL2ChainProvider::default(); let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher); bq.l1_blocks.push(BlockInfo::default()); @@ -574,7 +574,7 @@ mod tests { // Setup batch queue deps let batch_vec = vec![PipelineResult::Ok(Batch::Single(batch.clone()))]; - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); @@ -605,7 +605,7 @@ mod tests { // Setup batch queue deps let batch_vec = vec![PipelineResult::Ok(Batch::Single(batch.clone()))]; - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); @@ -639,7 +639,7 @@ mod tests { // Setup batch queue deps let batch_vec = vec![PipelineResult::Ok(Batch::Single(batch.clone()))]; - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); @@ -674,7 +674,7 @@ mod tests { // Setup batch queue deps let batch_vec = vec![PipelineResult::Ok(Batch::Single(batch.clone()))]; - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); @@ -693,7 +693,7 @@ mod tests { async fn test_derive_next_batch_missing_origin() { let data = vec![Ok(Batch::Single(SingleBatch::default()))]; let cfg = Arc::new(RollupConfig::default()); - let mock = TestBatchQueueProvider::new(data); + let mock = TestNextBatchProvider::new(data); let fetcher = TestL2ChainProvider::default(); let mut bq = BatchQueue::new(cfg, mock, fetcher); let parent = L2BlockInfo::default(); @@ -709,7 +709,7 @@ mod tests { while let Some(batch) = reader.next_batch(cfg.as_ref()) { batch_vec.push(Ok(batch)); } - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); let mut bq = BatchQueue::new(cfg, mock, fetcher); @@ -730,7 +730,7 @@ mod tests { while let Some(batch) = reader.next_batch(cfg.as_ref()) { batch_vec.push(Ok(batch)); } - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); let mut bq = BatchQueue::new(cfg, mock, fetcher); @@ -754,7 +754,7 @@ mod tests { while let Some(batch) = reader.next_batch(cfg.as_ref()) { batch_vec.push(Ok(batch)); } - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); let mut bq = BatchQueue::new(cfg, mock, fetcher); @@ -778,7 +778,7 @@ mod tests { while let Some(batch) = reader.next_batch(cfg.as_ref()) { batch_vec.push(Ok(batch)); } - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); let mut bq = BatchQueue::new(cfg, mock, fetcher); @@ -811,7 +811,7 @@ mod tests { // Setup batch queue deps let batch_vec = vec![PipelineResult::Ok(Batch::Single(batch.clone()))]; - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); @@ -852,7 +852,7 @@ mod tests { // Setup batch queue deps let batch_vec = vec![PipelineResult::Ok(Batch::Single(batch.clone()))]; - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); @@ -891,7 +891,7 @@ mod tests { while let Some(batch) = reader.next_batch(cfg.as_ref()) { batch_vec.push(Ok(batch)); } - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); let mut bq = BatchQueue::new(cfg, mock, fetcher); @@ -910,7 +910,7 @@ mod tests { while let Some(batch) = reader.next_batch(cfg.as_ref()) { batch_vec.push(Ok(batch)); } - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); let mut bq = BatchQueue::new(cfg, mock, fetcher); @@ -926,7 +926,7 @@ mod tests { let mut reader = new_batch_reader(); let cfg = Arc::new(RollupConfig::default()); let batch = reader.next_batch(cfg.as_ref()).unwrap(); - let mock = TestBatchQueueProvider::new(vec![Ok(batch)]); + let mock = TestNextBatchProvider::new(vec![Ok(batch)]); let fetcher = TestL2ChainProvider::default(); let mut bq = BatchQueue::new(cfg, mock, fetcher); let res = bq.next_batch(L2BlockInfo::default()).await.unwrap_err(); @@ -942,7 +942,7 @@ mod tests { while let Some(batch) = reader.next_batch(cfg.as_ref()) { batch_vec.push(Ok(batch)); } - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); let mut bq = BatchQueue::new(cfg, mock, fetcher); @@ -1012,7 +1012,7 @@ mod tests { tx.encode(&mut buf); let prefixed = [&[OpTxType::Deposit as u8], &buf[..]].concat(); second_batch_txs.insert(0, Bytes::copy_from_slice(&prefixed)); - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); let origin_check = b256!("8527cdb6f601acf9b483817abd1da92790c92b19000000000000000000000000"); mock.origin = Some(BlockInfo { @@ -1099,7 +1099,7 @@ mod tests { async fn test_batch_queue_empty_bytes() { let data = vec![Ok(Batch::Single(SingleBatch::default()))]; let cfg = Arc::new(RollupConfig::default()); - let mock = TestBatchQueueProvider::new(data); + let mock = TestNextBatchProvider::new(data); let fetcher = TestL2ChainProvider::default(); let mut bq = BatchQueue::new(cfg, mock, fetcher); let parent = L2BlockInfo::default(); diff --git a/crates/derive/src/stages/batch/batch_validator.rs b/crates/derive/src/stages/batch/batch_validator.rs index 278a80170..5b05fc376 100644 --- a/crates/derive/src/stages/batch/batch_validator.rs +++ b/crates/derive/src/stages/batch/batch_validator.rs @@ -26,18 +26,18 @@ where P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, { /// The rollup configuration. - cfg: Arc, + pub(crate) cfg: Arc, /// The previous stage of the derivation pipeline. - prev: P, + pub(crate) prev: P, /// The L1 origin of the batch sequencer. - origin: Option, + pub(crate) origin: Option, /// A consecutive, time-centric window of L1 Blocks. /// Every L1 origin of unsafe L2 Blocks must be included in this list. /// If every L2 Block corresponding to a single L1 Block becomes safe, /// the block is popped from this list. /// If new L2 Block's L1 origin is not included in this list, fetch and /// push it to the list. - l1_blocks: Vec, + pub(crate) l1_blocks: Vec, } impl

BatchValidator

@@ -314,12 +314,10 @@ mod test { use super::BatchValidator; use crate::{ batch::{Batch, SingleBatch, SpanBatch}, - errors::{PipelineErrorKind, ResetError}, - pipeline::{PipelineResult, SignalReceiver}, - prelude::PipelineError, + errors::{PipelineError, PipelineErrorKind, PipelineResult, ResetError}, stages::NextBatchProvider, - test_utils::{CollectingLayer, TestBatchQueueProvider, TraceStorage}, - traits::{AttributesProvider, OriginAdvancer, ResetSignal, Signal}, + test_utils::{CollectingLayer, TestNextBatchProvider, TraceStorage}, + traits::{AttributesProvider, OriginAdvancer, ResetSignal, Signal, SignalReceiver}, }; use alloc::sync::Arc; use alloy_eips::{BlockNumHash, NumHash}; @@ -332,7 +330,7 @@ mod test { #[tokio::test] async fn test_batch_validator_origin_behind_eof() { let cfg = Arc::new(RollupConfig::default()); - let mut mock = TestBatchQueueProvider::new(vec![]); + let mut mock = TestNextBatchProvider::new(vec![]); mock.origin = Some(BlockInfo::default()); let mut bv = BatchValidator::new(cfg, mock); bv.origin = Some(BlockInfo { number: 1, ..Default::default() }); @@ -347,7 +345,7 @@ mod test { #[tokio::test] async fn test_batch_validator_origin_behind_startup() { let cfg = Arc::new(RollupConfig::default()); - let mut mock = TestBatchQueueProvider::new(vec![]); + let mut mock = TestNextBatchProvider::new(vec![]); mock.origin = Some(BlockInfo::default()); let mut bv = BatchValidator::new(cfg, mock); @@ -376,7 +374,7 @@ mod test { #[tokio::test] async fn test_batch_validator_origin_behind_advance() { let cfg = Arc::new(RollupConfig::default()); - let mut mock = TestBatchQueueProvider::new(vec![]); + let mut mock = TestNextBatchProvider::new(vec![]); mock.origin = Some(BlockInfo { number: 2, ..Default::default() }); let mut bv = BatchValidator::new(cfg, mock); @@ -405,7 +403,7 @@ mod test { #[tokio::test] async fn test_batch_validator_advance_epoch() { let cfg = Arc::new(RollupConfig::default()); - let mut mock = TestBatchQueueProvider::new(vec![]); + let mut mock = TestNextBatchProvider::new(vec![]); mock.origin = Some(BlockInfo { number: 2, ..Default::default() }); let mut bv = BatchValidator::new(cfg, mock); @@ -436,7 +434,7 @@ mod test { #[tokio::test] async fn test_batch_validator_origin_behind_drain_prev() { let cfg = Arc::new(RollupConfig::default()); - let mut mock = TestBatchQueueProvider::new( + let mut mock = TestNextBatchProvider::new( (0..5).map(|_| Ok(Batch::Single(SingleBatch::default()))).collect(), ); mock.origin = Some(BlockInfo::default()); @@ -461,7 +459,7 @@ mod test { #[tokio::test] async fn test_batch_validator_l1_origin_mismatch() { let cfg = Arc::new(RollupConfig::default()); - let mut mock = TestBatchQueueProvider::new(vec![Ok(Batch::Single(SingleBatch::default()))]); + let mut mock = TestNextBatchProvider::new(vec![Ok(Batch::Single(SingleBatch::default()))]); mock.origin = Some(BlockInfo { number: 1, ..Default::default() }); let mut bv = BatchValidator::new(cfg, mock); bv.origin = Some(BlockInfo::default()); @@ -481,7 +479,7 @@ mod test { #[tokio::test] async fn test_batch_validator_received_span_batch() { let cfg = Arc::new(RollupConfig::default()); - let mut mock = TestBatchQueueProvider::new(vec![Ok(Batch::Span(SpanBatch::default()))]); + let mut mock = TestNextBatchProvider::new(vec![Ok(Batch::Span(SpanBatch::default()))]); mock.origin = Some(BlockInfo { number: 1, ..Default::default() }); let mut bv = BatchValidator::new(cfg, mock); bv.origin = Some(BlockInfo::default()); @@ -523,7 +521,7 @@ mod test { // Setup batch validator deps let batch_vec = vec![PipelineResult::Ok(Batch::Single(batch.clone()))]; - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo { number: 1, ..Default::default() }); // Configure batch validator @@ -550,7 +548,7 @@ mod test { tracing_subscriber::Registry::default().with(layer).init(); let cfg = Arc::new(RollupConfig { seq_window_size: 5, ..Default::default() }); - let mut mock = TestBatchQueueProvider::new(vec![]); + let mut mock = TestNextBatchProvider::new(vec![]); mock.origin = Some(BlockInfo { number: 1, ..Default::default() }); let mut bv = BatchValidator::new(cfg, mock); @@ -588,7 +586,7 @@ mod test { tracing_subscriber::Registry::default().with(layer).init(); let cfg = Arc::new(RollupConfig { seq_window_size: 5, ..Default::default() }); - let mut mock = TestBatchQueueProvider::new(vec![]); + let mut mock = TestNextBatchProvider::new(vec![]); mock.origin = Some(BlockInfo { number: 1, ..Default::default() }); let mut bv = BatchValidator::new(cfg, mock); diff --git a/crates/derive/src/stages/batch/mod.rs b/crates/derive/src/stages/batch/mod.rs index f6e7a01ba..6bf52b4d8 100644 --- a/crates/derive/src/stages/batch/mod.rs +++ b/crates/derive/src/stages/batch/mod.rs @@ -1,9 +1,9 @@ //! Contains stages pertaining to the processing of [Batch]es. //! -//! Sitting after the [ChannelReader] stage, the [BatchStream] and [BatchQueue] stages are +//! Sitting after the [ChannelReader] stage, the [BatchStream] and [BatchProvider] stages are //! responsible for validating and ordering the [Batch]es. The [BatchStream] stage is responsible -//! for streaming [SingleBatch]es from [SpanBatch]es, while the [BatchQueue] stage is responsible -//! for ordering the [Batch]es for the [AttributesQueue] stage. +//! for streaming [SingleBatch]es from [SpanBatch]es, while the [BatchProvider] stage is responsible +//! for ordering and validating the [Batch]es for the [AttributesQueue] stage. //! //! [Batch]: crate::batch::Batch //! [SingleBatch]: crate::batch::SingleBatch @@ -25,6 +25,9 @@ pub use batch_queue::BatchQueue; mod batch_validator; pub use batch_validator::BatchValidator; +mod batch_provider; +pub use batch_provider::BatchProvider; + /// Provides [Batch]es for the [BatchQueue] and [BatchValidator] stages. #[async_trait] pub trait NextBatchProvider { diff --git a/crates/derive/src/stages/channel/channel_assembler.rs b/crates/derive/src/stages/channel/channel_assembler.rs index 2e48de481..3ee827f72 100644 --- a/crates/derive/src/stages/channel/channel_assembler.rs +++ b/crates/derive/src/stages/channel/channel_assembler.rs @@ -44,11 +44,6 @@ where Self { cfg, prev, channel: None } } - /// Consumes [Self] and returns the previous stage. - pub fn into_prev(self) -> P { - self.prev - } - /// Returns whether or not the channel currently being assembled has timed out. pub fn is_timed_out(&self) -> PipelineResult { let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?; diff --git a/crates/derive/src/stages/channel/channel_bank.rs b/crates/derive/src/stages/channel/channel_bank.rs index 7f2b05c6b..ded3a6b9d 100644 --- a/crates/derive/src/stages/channel/channel_bank.rs +++ b/crates/derive/src/stages/channel/channel_bank.rs @@ -56,11 +56,6 @@ where Self { cfg, channels: HashMap::new(), channel_queue: VecDeque::new(), prev } } - /// Consumes [Self] and returns the previous stage. - pub fn into_prev(self) -> P { - self.prev - } - /// Returns the size of the channel bank by accumulating over all channels. pub fn size(&self) -> usize { self.channels.iter().fold(0, |acc, (_, c)| acc + c.size()) @@ -284,18 +279,6 @@ mod tests { use tracing::Level; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; - #[test] - fn test_channel_bank_into_prev() { - let frames = [crate::frame!(0xFF, 0, vec![0xDD; 50], true)]; - let mock = TestNextFrameProvider::new(frames.into_iter().map(Ok).collect()); - let cfg = Arc::new(RollupConfig::default()); - let channel_bank = ChannelBank::new(cfg, mock); - - let prev = channel_bank.into_prev(); - assert_eq!(prev.origin(), Some(BlockInfo::default())); - assert_eq!(prev.data.len(), 1) - } - #[test] fn test_try_read_channel_at_index_missing_channel() { let mock = TestNextFrameProvider::new(vec![]); diff --git a/crates/derive/src/stages/channel/channel_provider.rs b/crates/derive/src/stages/channel/channel_provider.rs index 6d6ad94b9..75bbcf399 100644 --- a/crates/derive/src/stages/channel/channel_provider.rs +++ b/crates/derive/src/stages/channel/channel_provider.rs @@ -1,17 +1,137 @@ //! This module contains the [ChannelProvider] stage. use super::{ChannelAssembler, ChannelBank, ChannelReaderProvider, NextFrameProvider}; -use crate::stages::multiplexed::multiplexed_stage; +use crate::{ + errors::{PipelineError, PipelineResult}, + traits::{OriginAdvancer, OriginProvider, Signal, SignalReceiver}, +}; +use alloc::{boxed::Box, sync::Arc}; use alloy_primitives::Bytes; +use async_trait::async_trait; use core::fmt::Debug; +use op_alloy_genesis::RollupConfig; +use op_alloy_protocol::BlockInfo; + +/// The [ChannelProvider] stage is a mux between the [ChannelBank] and [ChannelAssembler] stages. +/// +/// Rules: +/// When Holocene is not active, the [ChannelBank] is used. +/// When Holocene is active, the [ChannelAssembler] is used. +#[derive(Debug)] +pub struct ChannelProvider

+where + P: NextFrameProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, +{ + /// The rollup configuration. + cfg: Arc, + /// The previous stage of the derivation pipeline. + /// + /// If this is set to [None], the multiplexer has been activated and the active stage + /// owns the previous stage. + /// + /// Must be [None] if `channel_bank` or `channel_assembler` is [Some]. + prev: Option

, + /// The channel bank stage of the provider. + /// + /// Must be [None] if `prev` or `channel_assembler` is [Some]. + channel_bank: Option>, + /// The channel assembler stage of the provider. + /// + /// Must be [None] if `prev` or `channel_bank` is [Some]. + channel_assembler: Option>, +} -multiplexed_stage!( - ChannelProvider, - stages: { - ChannelAssembler => is_holocene_active, +impl

ChannelProvider

+where + P: NextFrameProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, +{ + /// Creates a new [ChannelProvider] with the given configuration and previous stage. + pub const fn new(cfg: Arc, prev: P) -> Self { + Self { cfg, prev: Some(prev), channel_bank: None, channel_assembler: None } } - default_stage: ChannelBank -); + + /// Attempts to update the active stage of the mux. + pub(crate) fn attempt_update(&mut self) -> PipelineResult<()> { + let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?; + if let Some(prev) = self.prev.take() { + // On the first call to `attempt_update`, we need to determine the active stage to + // initialize the mux with. + if self.cfg.is_holocene_active(origin.timestamp) { + self.channel_assembler = Some(ChannelAssembler::new(self.cfg.clone(), prev)); + } else { + self.channel_bank = Some(ChannelBank::new(self.cfg.clone(), prev)); + } + } else if self.channel_bank.is_some() && self.cfg.is_holocene_active(origin.timestamp) { + // If the channel bank is active and Holocene is also active, transition to the channel + // assembler. + let channel_bank = self.channel_bank.take().expect("Must have channel bank"); + self.channel_assembler = + Some(ChannelAssembler::new(self.cfg.clone(), channel_bank.prev)); + } else if self.channel_assembler.is_some() && !self.cfg.is_holocene_active(origin.timestamp) + { + // If the channel assembler is active, and Holocene is not active, it indicates an L1 + // reorg around Holocene activation. Transition back to the channel bank + // until Holocene re-activates. + let channel_assembler = + self.channel_assembler.take().expect("Must have channel assembler"); + self.channel_bank = Some(ChannelBank::new(self.cfg.clone(), channel_assembler.prev)); + } + Ok(()) + } +} + +#[async_trait] +impl

OriginAdvancer for ChannelProvider

+where + P: NextFrameProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, +{ + async fn advance_origin(&mut self) -> PipelineResult<()> { + self.attempt_update()?; + + if let Some(channel_assembler) = self.channel_assembler.as_mut() { + channel_assembler.advance_origin().await + } else if let Some(channel_bank) = self.channel_bank.as_mut() { + channel_bank.advance_origin().await + } else { + Err(PipelineError::NotEnoughData.temp()) + } + } +} + +impl

OriginProvider for ChannelProvider

+where + P: NextFrameProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, +{ + fn origin(&self) -> Option { + self.channel_assembler.as_ref().map_or_else( + || { + self.channel_bank.as_ref().map_or_else( + || self.prev.as_ref().and_then(|prev| prev.origin()), + |channel_bank| channel_bank.origin(), + ) + }, + |channel_assembler| channel_assembler.origin(), + ) + } +} + +#[async_trait] +impl

SignalReceiver for ChannelProvider

+where + P: NextFrameProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, +{ + async fn signal(&mut self, signal: Signal) -> PipelineResult<()> { + self.attempt_update()?; + + if let Some(channel_assembler) = self.channel_assembler.as_mut() { + channel_assembler.signal(signal).await + } else if let Some(channel_bank) = self.channel_bank.as_mut() { + channel_bank.signal(signal).await + } else { + Err(PipelineError::NotEnoughData.temp()) + } + } +} #[async_trait] impl

ChannelReaderProvider for ChannelProvider

@@ -19,16 +139,21 @@ where P: NextFrameProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, { async fn next_data(&mut self) -> PipelineResult> { - match self.active_stage_mut()? { - ActiveStage::ChannelAssembler(stage) => stage.next_data().await, - ActiveStage::ChannelBank(stage) => stage.next_data().await, + self.attempt_update()?; + + if let Some(channel_assembler) = self.channel_assembler.as_mut() { + channel_assembler.next_data().await + } else if let Some(channel_bank) = self.channel_bank.as_mut() { + channel_bank.next_data().await + } else { + Err(PipelineError::NotEnoughData.temp()) } } } #[cfg(test)] mod test { - use super::{ActiveStage, ChannelProvider}; + use super::ChannelProvider; use crate::{ prelude::{OriginProvider, PipelineError}, stages::ChannelReaderProvider, @@ -45,8 +170,10 @@ mod test { let cfg = Arc::new(RollupConfig { holocene_time: Some(0), ..Default::default() }); let mut channel_provider = ChannelProvider::new(cfg, provider); - let active_stage = channel_provider.active_stage_mut().unwrap(); - assert!(matches!(active_stage, ActiveStage::ChannelAssembler(_))); + assert!(channel_provider.attempt_update().is_ok()); + assert!(channel_provider.prev.is_none()); + assert!(channel_provider.channel_bank.is_none()); + assert!(channel_provider.channel_assembler.is_some()); } #[test] @@ -55,8 +182,10 @@ mod test { let cfg = Arc::new(RollupConfig::default()); let mut channel_provider = ChannelProvider::new(cfg, provider); - let active_stage = channel_provider.active_stage_mut().unwrap(); - assert!(matches!(active_stage, ActiveStage::ChannelBank(_))); + assert!(channel_provider.attempt_update().is_ok()); + assert!(channel_provider.prev.is_none()); + assert!(channel_provider.channel_bank.is_some()); + assert!(channel_provider.channel_assembler.is_none()); } #[test] @@ -66,19 +195,20 @@ mod test { let mut channel_provider = ChannelProvider::new(cfg, provider); // Assert the multiplexer hasn't been initialized. - assert!(channel_provider.active_stage.is_none()); + assert!(channel_provider.channel_bank.is_none()); + assert!(channel_provider.channel_assembler.is_none()); assert!(channel_provider.prev.is_some()); // Load in the active stage. - assert!(matches!( - channel_provider.active_stage_mut().unwrap(), - ActiveStage::ChannelBank(_) - )); + channel_provider.attempt_update().unwrap(); + assert!(channel_provider.channel_bank.is_some()); + assert!(channel_provider.channel_assembler.is_none()); + assert!(channel_provider.prev.is_none()); // Ensure the active stage is retained on the second call. - assert!(matches!( - channel_provider.active_stage_mut().unwrap(), - ActiveStage::ChannelBank(_) - )); + channel_provider.attempt_update().unwrap(); + assert!(channel_provider.channel_bank.is_some()); + assert!(channel_provider.channel_assembler.is_none()); + assert!(channel_provider.prev.is_none()); } #[test] @@ -88,19 +218,20 @@ mod test { let mut channel_provider = ChannelProvider::new(cfg, provider); // Assert the multiplexer hasn't been initialized. - assert!(channel_provider.active_stage.is_none()); + assert!(channel_provider.channel_bank.is_none()); + assert!(channel_provider.channel_assembler.is_none()); assert!(channel_provider.prev.is_some()); // Load in the active stage. - assert!(matches!( - channel_provider.active_stage_mut().unwrap(), - ActiveStage::ChannelAssembler(_) - )); + channel_provider.attempt_update().unwrap(); + assert!(channel_provider.channel_bank.is_none()); + assert!(channel_provider.channel_assembler.is_some()); + assert!(channel_provider.prev.is_none()); // Ensure the active stage is retained on the second call. - assert!(matches!( - channel_provider.active_stage_mut().unwrap(), - ActiveStage::ChannelAssembler(_) - )); + channel_provider.attempt_update().unwrap(); + assert!(channel_provider.channel_bank.is_none()); + assert!(channel_provider.channel_assembler.is_some()); + assert!(channel_provider.prev.is_none()); } #[test] @@ -109,17 +240,18 @@ mod test { let cfg = Arc::new(RollupConfig { holocene_time: Some(2), ..Default::default() }); let mut channel_provider = ChannelProvider::new(cfg, provider); - let active_stage = channel_provider.active_stage_mut().unwrap(); + channel_provider.attempt_update().unwrap(); // Update the L1 origin to Holocene activation. - let ActiveStage::ChannelBank(stage) = active_stage else { + let Some(ref mut stage) = channel_provider.channel_bank else { panic!("Expected ChannelBank"); }; stage.prev.block_info = Some(BlockInfo { number: 1, timestamp: 2, ..Default::default() }); // Transition to the ChannelAssembler stage. - let active_stage = channel_provider.active_stage_mut().unwrap(); - assert!(matches!(active_stage, ActiveStage::ChannelAssembler(_))); + channel_provider.attempt_update().unwrap(); + assert!(channel_provider.channel_bank.is_none()); + assert!(channel_provider.channel_assembler.is_some()); assert_eq!(channel_provider.origin().unwrap().number, 1); } @@ -130,25 +262,28 @@ mod test { let cfg = Arc::new(RollupConfig { holocene_time: Some(2), ..Default::default() }); let mut channel_provider = ChannelProvider::new(cfg, provider); - let active_stage = channel_provider.active_stage_mut().unwrap(); + channel_provider.attempt_update().unwrap(); // Update the L1 origin to Holocene activation. - let ActiveStage::ChannelBank(stage) = active_stage else { + let Some(ref mut stage) = channel_provider.channel_bank else { panic!("Expected ChannelBank"); }; stage.prev.block_info = Some(BlockInfo { number: 1, timestamp: 2, ..Default::default() }); // Transition to the ChannelAssembler stage. - let active_stage = channel_provider.active_stage_mut().unwrap(); - let ActiveStage::ChannelAssembler(stage) = active_stage else { - panic!("Expected ChannelBank"); - }; + channel_provider.attempt_update().unwrap(); + assert!(channel_provider.channel_bank.is_none()); + assert!(channel_provider.channel_assembler.is_some()); // Update the L1 origin to before Holocene activation, to simulate a re-org. + let Some(ref mut stage) = channel_provider.channel_assembler else { + panic!("Expected ChannelAssembler"); + }; stage.prev.block_info = Some(BlockInfo::default()); - let active_stage = channel_provider.active_stage_mut().unwrap(); - assert!(matches!(active_stage, ActiveStage::ChannelBank(_))); + channel_provider.attempt_update().unwrap(); + assert!(channel_provider.channel_bank.is_some()); + assert!(channel_provider.channel_assembler.is_none()); } #[tokio::test] @@ -166,7 +301,7 @@ mod test { channel_provider.next_data().await.unwrap_err(), PipelineError::NotEnoughData.temp() ); - let Ok(ActiveStage::ChannelBank(channel_bank)) = channel_provider.active_stage_mut() else { + let Some(channel_bank) = channel_provider.channel_bank.as_mut() else { panic!("Expected ChannelBank"); }; // Ensure a channel is in the queue. @@ -176,7 +311,7 @@ mod test { channel_provider.signal(ResetSignal::default().signal()).await.unwrap(); // Ensure the channel queue is empty after reset. - let Ok(ActiveStage::ChannelBank(channel_bank)) = channel_provider.active_stage_mut() else { + let Some(channel_bank) = channel_provider.channel_bank.as_mut() else { panic!("Expected ChannelBank"); }; assert!(channel_bank.channel_queue.is_empty()); @@ -197,10 +332,8 @@ mod test { channel_provider.next_data().await.unwrap_err(), PipelineError::NotEnoughData.temp() ); - let Ok(ActiveStage::ChannelAssembler(channel_assembler)) = - channel_provider.active_stage_mut() - else { - panic!("Expected ChannelBank"); + let Some(channel_assembler) = channel_provider.channel_assembler.as_mut() else { + panic!("Expected ChannelAssembler"); }; // Ensure a channel is being built. assert!(channel_assembler.channel.is_some()); @@ -209,10 +342,8 @@ mod test { channel_provider.signal(ResetSignal::default().signal()).await.unwrap(); // Ensure the channel assembler is empty after reset. - let Ok(ActiveStage::ChannelAssembler(channel_assembler)) = - channel_provider.active_stage_mut() - else { - panic!("Expected ChannelBank"); + let Some(channel_assembler) = channel_provider.channel_assembler.as_mut() else { + panic!("Expected ChannelAssembler"); }; assert!(channel_assembler.channel.is_none()); } diff --git a/crates/derive/src/stages/mod.rs b/crates/derive/src/stages/mod.rs index 9ec070202..cec0b6341 100644 --- a/crates/derive/src/stages/mod.rs +++ b/crates/derive/src/stages/mod.rs @@ -31,14 +31,12 @@ pub use channel::{ }; mod batch; -pub use batch::{BatchQueue, BatchStream, BatchStreamProvider, BatchValidator, NextBatchProvider}; +pub use batch::{ + BatchProvider, BatchQueue, BatchStream, BatchStreamProvider, BatchValidator, NextBatchProvider, +}; mod attributes_queue; pub use attributes_queue::AttributesQueue; -#[macro_use] -mod multiplexed; -pub use multiplexed::MultiplexerError; - mod utils; pub use utils::decompress_brotli; diff --git a/crates/derive/src/stages/multiplexed.rs b/crates/derive/src/stages/multiplexed.rs deleted file mode 100644 index 64527a7bf..000000000 --- a/crates/derive/src/stages/multiplexed.rs +++ /dev/null @@ -1,254 +0,0 @@ -//! Contains the [multiplexed_stage] macro. - -use thiserror::Error; - -/// An error type for the multiplexer stages. -#[derive(Error, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub enum MultiplexerError { - /// Thrown when the multiplexer has not yet been activated, but a sub-stage is being accessed. - #[error("The multiplexer has not been activated.")] - NotActivated, -} - -/// The [multiplexed_stage] macro generates a new stage that swaps its underlying functionality -/// depending on the active hardfork. -/// -/// By default, the stage struct generated by this macro: -/// - Implements [OriginAdvancer], [OriginProvider], and [SignalReceiver]. -/// - Contains an enum that represents the active stage, in the `stages` key. -/// - Activates stages based on the conditions provided in the `stages` key. -/// -/// When a new fork with a stage definition activates, relative to the pipeline origin, the active -/// stage is dissolved and the ownership of the previous stage is transferred to the new stage. -/// -/// Stage requirements: -/// - The previous stage must implement [OriginAdvancer], [OriginProvider], [SignalReceiver], and -/// [Debug]. -/// - The stages must implement an `into_prev` method that returns the owned previous stage. -/// -/// ## Example Usage -/// ```rust,ignore -/// multiplexed_stage!( -/// MyStage, -/// stages: { -/// EcotoneStage => is_ecotone_active, -/// } -/// default_stage: BedrockStage -/// ); -/// ``` -/// -/// To add additional fields to the multiplexer stage, that must be passed to the `new` function of -/// the multiplexer and sub-stages: -/// ```rust,ignore -/// multiplexed_stage!( -/// MyStage, -/// additional_fields: { -/// /// The number of blocks to wait before advancing the origin. -/// block_wait: u64, -/// } -/// stages: { -/// EcotoneStage(block_wait) => is_ecotone_active, -/// } -/// default_stage: BedrockStage -/// ); -/// -/// // -- snip -- -/// -/// let cfg = Arc::new(RollupConfig::default()); -/// let prev = MyPrevStage::default(); -/// MyStage::new(cfg.clone(), prev, 10); -/// ``` -/// -/// [OriginAdvancer]: crate::pipeline::OriginAdvancer -/// [OriginProvider]: crate::pipeline::OriginProvider -/// [SignalReceiver]: crate::pipeline::SignalReceiver -/// [Debug]: core::fmt::Debug -macro_rules! multiplexed_stage { - ( - $provider_name:ident<$prev_type:ident$(, $provider_generic:ident: $($provider_generic_bound:ident)*)*>, - $( - additional_fields: { - $(#[doc = $comment:expr])? - $($field_name:ident: $field_type:ty,)+ - } - )? - stages: { - $($stage_name:ident$(($($input_name:ident$(,)?)+))? => $stage_condition:ident,)* - } - default_stage: $last_stage_name:ident$(($($last_input_name:ident$(,)?)+))? - ) => { - use $crate::{ - pipeline::{OriginAdvancer, OriginProvider, SignalReceiver, Signal, PipelineError, PipelineResult}, - stages::MultiplexerError - }; - use async_trait::async_trait; - use alloc::boxed::Box; - - #[doc = concat!("The active stage of the ", stringify!($provider_name), ".")] - #[derive(Debug)] - enum ActiveStage - where - P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + Debug, - { - $($stage_name($stage_name

,),)* - $last_stage_name($last_stage_name

), - } - - impl ActiveStage - where - P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + Debug, - { - /// Dissolves the active stage and returns the previous stage. - pub(crate) fn into_prev(self) -> P { - match self { - $(ActiveStage::$stage_name(stage) => stage.into_prev(),)* - ActiveStage::$last_stage_name(stage) => stage.into_prev(), - } - } - } - - #[doc = concat!("The ", stringify!($provider_name), " stage is responsible for multiplexing sub-stages.")] - #[derive(Debug)] - pub struct $provider_name - where - P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + Debug, - { - /// The rollup configuration. - cfg: alloc::sync::Arc, - /// The previous stage of the derivation pipeline. - /// - /// If this is set to [None], the multiplexer has been activated and the active stage - /// owns the previous stage. - /// - /// Must be [None] if `active_stage` is [Some]. - prev: Option

, - /// The active stage of the provider. - /// - /// If this is set to [None], the multiplexer has not been activated and the previous - /// stage is owned by the multiplexer. - /// - /// Must be [None] if `prev` is [Some]. - active_stage: Option>, - $( - $(#[doc = $comment])? - $($field_name: $field_type,)+ - )? - } - - impl $provider_name - where - P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + Debug, - { - /// Creates a new instance of the provider. - pub const fn new(cfg: alloc::sync::Arc, prev: P$( $(, $field_name: $field_type)+ )?) -> Self { - Self { - cfg, - prev: Some(prev), - active_stage: None, - $( - $($field_name,)+ - )? - } - } - - #[doc = concat!("Returns a mutable ref to the active stage of the ", stringify!($provider_name), ".")] - const fn active_stage_ref(&self) -> Option<&ActiveStage> { - self.active_stage.as_ref() - } - - #[doc = concat!("Returns a mutable ref to the active stage of the ", stringify!($provider_name), ".")] - fn active_stage_mut(&mut self) -> PipelineResult<&mut ActiveStage> { - // If the multiplexer has not been activated, activate the correct stage. - if let Some(prev) = self.prev.take() { - let origin = prev.origin().ok_or(PipelineError::MissingOrigin.crit())?; - - self.active_stage = Some( - $(if self.cfg.$stage_condition(origin.timestamp) { - ActiveStage::$stage_name($stage_name::new(self.cfg.clone(), prev$($(, self.$input_name.clone())*)?)) - } else)* { - ActiveStage::$last_stage_name($last_stage_name::new(self.cfg.clone(), prev$($(, self.$last_input_name.clone())*)?)) - } - ); - return self.active_stage.as_mut().ok_or(PipelineError::from(MultiplexerError::NotActivated).crit()); - } else { - // Otherwise, check if the active stage should be changed. - let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?; - let active_stage = self.active_stage.take().ok_or(PipelineError::from(MultiplexerError::NotActivated).crit())?; - - // If a new stage has activated, transfer ownership of the previous stage to the new stage to - // re-link the pipeline at runtime. - $(if self.cfg.$stage_condition(origin.timestamp) { - // If the correct stage is already active, return it. - if matches!(active_stage, ActiveStage::$stage_name(_)) { - self.active_stage = Some(active_stage); - return self.active_stage.as_mut().ok_or(PipelineError::from(MultiplexerError::NotActivated).crit()); - } - - // Otherwise, dissolve the active stage and create a new one, granting ownership of - // the previous stage to the new stage. - let prev = active_stage.into_prev(); - self.active_stage = Some(ActiveStage::$stage_name($stage_name::new(self.cfg.clone(), prev$($(, self.$input_name.clone())*)?))); - } else)* { - // If the correct stage is already active, return it. - if matches!(active_stage, ActiveStage::$last_stage_name(_)) { - self.active_stage = Some(active_stage); - return self.active_stage.as_mut().ok_or(PipelineError::from(MultiplexerError::NotActivated).crit()); - } - - self.active_stage = Some(ActiveStage::$last_stage_name($last_stage_name::new(self.cfg.clone(), active_stage.into_prev()$($(, self.$last_input_name.clone())*)?))); - } - } - - self.active_stage.as_mut().ok_or(PipelineError::from(MultiplexerError::NotActivated).crit()) - } - } - - #[async_trait] - impl OriginAdvancer for $provider_name - where - P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, - { - async fn advance_origin(&mut self) -> PipelineResult<()> { - match self.active_stage_mut()? { - $(ActiveStage::$stage_name(stage) => stage.advance_origin().await,)* - ActiveStage::$last_stage_name(stage) => stage.advance_origin().await, - } - } - } - - impl OriginProvider for $provider_name - where - P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + Debug, - { - fn origin(&self) -> Option { - match self.active_stage_ref() { - Some(stage) => { - match stage { - $(ActiveStage::$stage_name(stage) => stage.origin(),)* - ActiveStage::$last_stage_name(stage) => stage.origin(), - } - } - None => self.prev.as_ref().map(|prev| prev.origin()).flatten(), - } - } - } - - #[async_trait] - impl SignalReceiver for $provider_name - where - P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, - { - async fn signal( - &mut self, - signal: Signal, - ) -> PipelineResult<()> { - match self.active_stage_mut()? { - $(ActiveStage::$stage_name(stage) => stage.signal(signal).await,)* - ActiveStage::$last_stage_name(stage) => stage.signal(signal).await, - } - } - } - } -} - -pub(crate) use multiplexed_stage; diff --git a/crates/derive/src/test_utils/batch_queue.rs b/crates/derive/src/test_utils/batch_provider.rs similarity index 87% rename from crates/derive/src/test_utils/batch_queue.rs rename to crates/derive/src/test_utils/batch_provider.rs index 396222a51..8cbdf3083 100644 --- a/crates/derive/src/test_utils/batch_queue.rs +++ b/crates/derive/src/test_utils/batch_provider.rs @@ -12,7 +12,7 @@ use op_alloy_protocol::{BlockInfo, L2BlockInfo}; /// A mock provider for the [BatchQueue] stage. #[derive(Debug, Default)] -pub struct TestBatchQueueProvider { +pub struct TestNextBatchProvider { /// The origin of the L1 block. pub origin: Option, /// A list of batches to return. @@ -23,21 +23,21 @@ pub struct TestBatchQueueProvider { pub reset: bool, } -impl TestBatchQueueProvider { +impl TestNextBatchProvider { /// Creates a new [MockBatchQueueProvider] with the given origin and batches. pub fn new(batches: Vec>) -> Self { Self { origin: Some(BlockInfo::default()), batches, flushed: false, reset: false } } } -impl OriginProvider for TestBatchQueueProvider { +impl OriginProvider for TestNextBatchProvider { fn origin(&self) -> Option { self.origin } } #[async_trait] -impl NextBatchProvider for TestBatchQueueProvider { +impl NextBatchProvider for TestNextBatchProvider { fn flush(&mut self) { self.flushed = true; } @@ -52,7 +52,7 @@ impl NextBatchProvider for TestBatchQueueProvider { } #[async_trait] -impl OriginAdvancer for TestBatchQueueProvider { +impl OriginAdvancer for TestNextBatchProvider { async fn advance_origin(&mut self) -> PipelineResult<()> { self.origin = self.origin.map(|mut origin| { origin.number += 1; @@ -63,7 +63,7 @@ impl OriginAdvancer for TestBatchQueueProvider { } #[async_trait] -impl SignalReceiver for TestBatchQueueProvider { +impl SignalReceiver for TestNextBatchProvider { async fn signal(&mut self, signal: Signal) -> PipelineResult<()> { match signal { Signal::Reset { .. } => self.reset = true, diff --git a/crates/derive/src/test_utils/channel_bank.rs b/crates/derive/src/test_utils/channel_provider.rs similarity index 100% rename from crates/derive/src/test_utils/channel_bank.rs rename to crates/derive/src/test_utils/channel_provider.rs diff --git a/crates/derive/src/test_utils/mod.rs b/crates/derive/src/test_utils/mod.rs index 0a4e7d2d4..55bb9795f 100644 --- a/crates/derive/src/test_utils/mod.rs +++ b/crates/derive/src/test_utils/mod.rs @@ -2,9 +2,9 @@ mod pipeline; pub use pipeline::{ - new_test_pipeline, TestAttributesQueue, TestBatchQueue, TestBatchStream, TestChannelProvider, - TestChannelReader, TestFrameQueue, TestL1Retrieval, TestL1Traversal, TestNextAttributes, - TestPipeline, + new_test_pipeline, TestAttributesQueue, TestBatchProvider, TestBatchStream, + TestChannelProvider, TestChannelReader, TestFrameQueue, TestL1Retrieval, TestL1Traversal, + TestNextAttributes, TestPipeline, }; mod blob_provider; @@ -16,8 +16,8 @@ pub use chain_providers::{TestChainProvider, TestL2ChainProvider, TestProviderEr mod data_availability_provider; pub use data_availability_provider::{TestDAP, TestIter}; -mod batch_queue; -pub use batch_queue::TestBatchQueueProvider; +mod batch_provider; +pub use batch_provider::TestNextBatchProvider; mod attributes_queue; pub use attributes_queue::{ @@ -27,8 +27,8 @@ pub use attributes_queue::{ mod batch_stream; pub use batch_stream::TestBatchStreamProvider; -mod channel_bank; -pub use channel_bank::TestNextFrameProvider; +mod channel_provider; +pub use channel_provider::TestNextFrameProvider; mod channel_reader; pub use channel_reader::TestChannelReaderProvider; diff --git a/crates/derive/src/test_utils/pipeline.rs b/crates/derive/src/test_utils/pipeline.rs index da1edaafc..8e721a30c 100644 --- a/crates/derive/src/test_utils/pipeline.rs +++ b/crates/derive/src/test_utils/pipeline.rs @@ -1,7 +1,10 @@ //! Test Utilities for the [crate::pipeline::DerivationPipeline] //! as well as its stages and providers. -use crate::test_utils::{TestChainProvider, TestL2ChainProvider}; +use crate::{ + stages::BatchProvider, + test_utils::{TestChainProvider, TestL2ChainProvider}, +}; use alloc::{boxed::Box, sync::Arc}; use op_alloy_genesis::RollupConfig; use op_alloy_protocol::{BlockInfo, L2BlockInfo}; @@ -12,8 +15,8 @@ use crate::{ errors::PipelineError, pipeline::{DerivationPipeline, PipelineBuilder, PipelineResult}, stages::{ - AttributesQueue, BatchQueue, BatchStream, ChannelProvider, ChannelReader, FrameQueue, - L1Retrieval, L1Traversal, + AttributesQueue, BatchStream, ChannelProvider, ChannelReader, FrameQueue, L1Retrieval, + L1Traversal, }, test_utils::{TestAttributesBuilder, TestDAP}, traits::{NextAttributes, OriginAdvancer, OriginProvider, Signal, SignalReceiver}, @@ -77,10 +80,10 @@ pub type TestChannelReader = ChannelReader; pub type TestBatchStream = BatchStream; /// A [BatchQueue] using test providers and sources. -pub type TestBatchQueue = BatchQueue; +pub type TestBatchProvider = BatchProvider; /// An [AttributesQueue] using test providers and sources. -pub type TestAttributesQueue = AttributesQueue; +pub type TestAttributesQueue = AttributesQueue; /// A [DerivationPipeline] using test providers and sources. pub type TestPipeline = DerivationPipeline;