Skip to content

Commit

Permalink
feat(derive): BatchProvider multiplexed stage (#726)
Browse files Browse the repository at this point in the history
* feat(derive): `BatchProvider` multiplexed stage

* tests

* monolithic mux macro -> viking funeral ⛵🔥

* rebase

* updates

* remove `into_prev`
  • Loading branch information
clabby authored Oct 22, 2024
1 parent 3f73cce commit 4e07ad2
Show file tree
Hide file tree
Showing 17 changed files with 572 additions and 421 deletions.
4 changes: 2 additions & 2 deletions bin/client/src/l1/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use kona_derive::{
pipeline::{DerivationPipeline, Pipeline, PipelineBuilder, StepResult},
sources::EthereumDataSource,
stages::{
AttributesQueue, BatchQueue, BatchStream, ChannelProvider, ChannelReader, FrameQueue,
AttributesQueue, BatchProvider, BatchStream, ChannelProvider, ChannelReader, FrameQueue,
L1Retrieval, L1Traversal,
},
traits::{
Expand Down Expand Up @@ -49,7 +49,7 @@ pub type OracleAttributesBuilder<O> =

/// An oracle-backed attributes queue for the derivation pipeline.
pub type OracleAttributesQueue<DAP, O> = AttributesQueue<
BatchQueue<
BatchProvider<
BatchStream<
ChannelReader<
ChannelProvider<
Expand Down
4 changes: 2 additions & 2 deletions crates/derive-alloy/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use kona_derive::{
pipeline::{DerivationPipeline, PipelineBuilder},
sources::EthereumDataSource,
stages::{
AttributesQueue, BatchQueue, BatchStream, ChannelProvider, ChannelReader, FrameQueue,
AttributesQueue, BatchProvider, BatchStream, ChannelProvider, ChannelReader, FrameQueue,
L1Retrieval, L1Traversal,
},
};
Expand Down Expand Up @@ -34,7 +34,7 @@ pub type OnlineAttributesBuilder =

/// An `online` attributes queue for the derivation pipeline.
pub type OnlineAttributesQueue<DAP> = AttributesQueue<
BatchQueue<
BatchProvider<
BatchStream<
ChannelReader<
ChannelProvider<FrameQueue<L1Retrieval<DAP, L1Traversal<AlloyChainProvider>>>>,
Expand Down
8 changes: 1 addition & 7 deletions crates/derive/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
//! This module contains derivation errors thrown within the pipeline.

use crate::{
batch::{SpanBatchError, MAX_SPAN_BATCH_ELEMENTS},
stages::MultiplexerError,
};
use crate::batch::{SpanBatchError, MAX_SPAN_BATCH_ELEMENTS};
use alloc::string::String;
use alloy_eips::BlockNumHash;
use alloy_primitives::B256;
Expand Down Expand Up @@ -116,9 +113,6 @@ pub enum PipelineError {
/// [PipelineEncodingError] variant.
#[error("Decode error: {0}")]
BadEncoding(#[from] PipelineEncodingError),
/// A multiplexer stage error.
#[error("Multiplexer error: {0}")]
Multiplexer(#[from] MultiplexerError),
/// Provider error variant.
#[error("Blob provider error: {0}")]
Provider(String),
Expand Down
12 changes: 6 additions & 6 deletions crates/derive/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use super::{AttributesBuilder, DataAvailabilityProvider, DerivationPipeline};
use crate::{
stages::{
AttributesQueue, BatchQueue, BatchStream, ChannelProvider, ChannelReader, FrameQueue,
AttributesQueue, BatchProvider, BatchStream, ChannelProvider, ChannelReader, FrameQueue,
L1Retrieval, L1Traversal,
},
traits::{ChainProvider, L2ChainProvider},
Expand All @@ -19,8 +19,8 @@ type FrameQueueStage<DAP, P> = FrameQueue<L1RetrievalStage<DAP, P>>;
type ChannelProviderStage<DAP, P> = ChannelProvider<FrameQueueStage<DAP, P>>;
type ChannelReaderStage<DAP, P> = ChannelReader<ChannelProviderStage<DAP, P>>;
type BatchStreamStage<DAP, P, T> = BatchStream<ChannelReaderStage<DAP, P>, T>;
type BatchQueueStage<DAP, P, T> = BatchQueue<BatchStreamStage<DAP, P, T>, T>;
type AttributesQueueStage<DAP, P, T, B> = AttributesQueue<BatchQueueStage<DAP, P, T>, B>;
type BatchProviderStage<DAP, P, T> = BatchProvider<BatchStreamStage<DAP, P, T>, T>;
type AttributesQueueStage<DAP, P, T, B> = AttributesQueue<BatchProviderStage<DAP, P, T>, B>;

/// The `PipelineBuilder` constructs a [DerivationPipeline] using a builder pattern.
#[derive(Debug)]
Expand Down Expand Up @@ -137,10 +137,10 @@ where
let channel_reader = ChannelReader::new(channel_provider, Arc::clone(&rollup_config));
let batch_stream =
BatchStream::new(channel_reader, rollup_config.clone(), l2_chain_provider.clone());
let batch_queue =
BatchQueue::new(rollup_config.clone(), batch_stream, l2_chain_provider.clone());
let batch_provider =
BatchProvider::new(rollup_config.clone(), batch_stream, l2_chain_provider.clone());
let attributes =
AttributesQueue::new(rollup_config.clone(), batch_queue, attributes_builder);
AttributesQueue::new(rollup_config.clone(), batch_provider, attributes_builder);

// Create the pipeline.
Self::new(attributes, rollup_config, l2_chain_provider)
Expand Down
300 changes: 300 additions & 0 deletions crates/derive/src/stages/batch/batch_provider.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
//! This module contains the [BatchProvider] stage.

use super::NextBatchProvider;
use crate::{
batch::SingleBatch,
errors::{PipelineError, PipelineResult},
stages::{BatchQueue, BatchValidator},
traits::{
AttributesProvider, L2ChainProvider, OriginAdvancer, OriginProvider, Signal, SignalReceiver,
},
};
use alloc::{boxed::Box, sync::Arc};
use async_trait::async_trait;
use core::fmt::Debug;
use op_alloy_genesis::RollupConfig;
use op_alloy_protocol::{BlockInfo, L2BlockInfo};

/// The [BatchProvider] stage is a mux between the [BatchQueue] and [BatchValidator] stages.
///
/// Rules:
/// When Holocene is not active, the [BatchQueue] is used.
/// When Holocene is active, the [BatchValidator] is used.
///
/// When transitioning between the two stages, the mux will reset the active stage, but
/// retain `l1_blocks`.
#[derive(Debug)]
pub struct BatchProvider<P, F>
where
P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug,
F: L2ChainProvider + Clone + Debug,
{
/// The rollup configuration.
cfg: Arc<RollupConfig>,
/// The L2 chain provider.
provider: F,
/// The previous stage of the derivation pipeline.
///
/// If this is set to [None], the multiplexer has been activated and the active stage
/// owns the previous stage.
///
/// Must be [None] if `batch_queue` or `batch_validator` is [Some].
prev: Option<P>,
/// The batch queue stage of the provider.
///
/// Must be [None] if `prev` or `batch_validator` is [Some].
batch_queue: Option<BatchQueue<P, F>>,
/// The batch validator stage of the provider.
///
/// Must be [None] if `prev` or `batch_queue` is [Some].
batch_validator: Option<BatchValidator<P>>,
}

impl<P, F> BatchProvider<P, F>
where
P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug,
F: L2ChainProvider + Clone + Debug,
{
/// Creates a new [BatchProvider] with the given configuration and previous stage.
pub const fn new(cfg: Arc<RollupConfig>, prev: P, provider: F) -> Self {
Self { cfg, provider, prev: Some(prev), batch_queue: None, batch_validator: None }
}

/// Attempts to update the active stage of the mux.
pub(crate) fn attempt_update(&mut self) -> PipelineResult<()> {
let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?;
if let Some(prev) = self.prev.take() {
// On the first call to `attempt_update`, we need to determine the active stage to
// initialize the mux with.
if self.cfg.is_holocene_active(origin.timestamp) {
self.batch_validator = Some(BatchValidator::new(self.cfg.clone(), prev));
} else {
self.batch_queue =
Some(BatchQueue::new(self.cfg.clone(), prev, self.provider.clone()));
}
} else if self.batch_queue.is_some() && self.cfg.is_holocene_active(origin.timestamp) {
// If the batch queue is active and Holocene is also active, transition to the batch
// validator.
let batch_queue = self.batch_queue.take().expect("Must have batch queue");
let mut bv = BatchValidator::new(self.cfg.clone(), batch_queue.prev);
bv.l1_blocks = batch_queue.l1_blocks;
self.batch_validator = Some(bv);
} else if self.batch_validator.is_some() && !self.cfg.is_holocene_active(origin.timestamp) {
// If the batch validator is active, and Holocene is not active, it indicates an L1
// reorg around Holocene activation. Transition back to the batch queue
// until Holocene re-activates.
let batch_validator = self.batch_validator.take().expect("Must have batch validator");
let mut bq =
BatchQueue::new(self.cfg.clone(), batch_validator.prev, self.provider.clone());
bq.l1_blocks = batch_validator.l1_blocks;
self.batch_queue = Some(bq);
}
Ok(())
}
}

#[async_trait]
impl<P, F> OriginAdvancer for BatchProvider<P, F>
where
P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug,
F: L2ChainProvider + Clone + Send + Debug,
{
async fn advance_origin(&mut self) -> PipelineResult<()> {
self.attempt_update()?;

if let Some(batch_validator) = self.batch_validator.as_mut() {
batch_validator.advance_origin().await
} else if let Some(batch_queue) = self.batch_queue.as_mut() {
batch_queue.advance_origin().await
} else {
Err(PipelineError::NotEnoughData.temp())
}
}
}

impl<P, F> OriginProvider for BatchProvider<P, F>
where
P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug,
F: L2ChainProvider + Clone + Debug,
{
fn origin(&self) -> Option<BlockInfo> {
self.batch_validator.as_ref().map_or_else(
|| {
self.batch_queue.as_ref().map_or_else(
|| self.prev.as_ref().and_then(|prev| prev.origin()),
|batch_queue| batch_queue.origin(),
)
},
|batch_validator| batch_validator.origin(),
)
}
}

#[async_trait]
impl<P, F> SignalReceiver for BatchProvider<P, F>
where
P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug,
F: L2ChainProvider + Clone + Send + Debug,
{
async fn signal(&mut self, signal: Signal) -> PipelineResult<()> {
self.attempt_update()?;

if let Some(batch_validator) = self.batch_validator.as_mut() {
batch_validator.signal(signal).await
} else if let Some(batch_queue) = self.batch_queue.as_mut() {
batch_queue.signal(signal).await
} else {
Err(PipelineError::NotEnoughData.temp())
}
}
}

#[async_trait]
impl<P, F> AttributesProvider for BatchProvider<P, F>
where
P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug + Send,
F: L2ChainProvider + Clone + Send + Debug,
{
fn is_last_in_span(&self) -> bool {
self.batch_validator.as_ref().map_or_else(
|| self.batch_queue.as_ref().map_or(false, |batch_queue| batch_queue.is_last_in_span()),
|batch_validator| batch_validator.is_last_in_span(),
)
}

async fn next_batch(&mut self, parent: L2BlockInfo) -> PipelineResult<SingleBatch> {
self.attempt_update()?;

if let Some(batch_validator) = self.batch_validator.as_mut() {
batch_validator.next_batch(parent).await
} else if let Some(batch_queue) = self.batch_queue.as_mut() {
batch_queue.next_batch(parent).await
} else {
Err(PipelineError::NotEnoughData.temp())
}
}
}

#[cfg(test)]
mod test {
use super::BatchProvider;
use crate::{
test_utils::{TestL2ChainProvider, TestNextBatchProvider},
traits::{OriginProvider, ResetSignal, SignalReceiver},
};
use op_alloy_genesis::RollupConfig;
use op_alloy_protocol::BlockInfo;
use std::sync::Arc;

#[test]
fn test_batch_provider_validator_active() {
let provider = TestNextBatchProvider::new(vec![]);
let l2_provider = TestL2ChainProvider::default();
let cfg = Arc::new(RollupConfig { holocene_time: Some(0), ..Default::default() });
let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider);

assert!(batch_provider.attempt_update().is_ok());
assert!(batch_provider.prev.is_none());
assert!(batch_provider.batch_queue.is_none());
assert!(batch_provider.batch_validator.is_some());
}

#[test]
fn test_batch_provider_batch_queue_active() {
let provider = TestNextBatchProvider::new(vec![]);
let l2_provider = TestL2ChainProvider::default();
let cfg = Arc::new(RollupConfig::default());
let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider);

assert!(batch_provider.attempt_update().is_ok());
assert!(batch_provider.prev.is_none());
assert!(batch_provider.batch_queue.is_some());
assert!(batch_provider.batch_validator.is_none());
}

#[test]
fn test_batch_provider_transition_stage() {
let provider = TestNextBatchProvider::new(vec![]);
let l2_provider = TestL2ChainProvider::default();
let cfg = Arc::new(RollupConfig { holocene_time: Some(2), ..Default::default() });
let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider);

batch_provider.attempt_update().unwrap();

// Update the L1 origin to Holocene activation.
let Some(ref mut stage) = batch_provider.batch_queue else {
panic!("Expected BatchQueue");
};
stage.prev.origin = Some(BlockInfo { number: 1, timestamp: 2, ..Default::default() });

// Transition to the BatchValidator stage.
batch_provider.attempt_update().unwrap();
assert!(batch_provider.batch_queue.is_none());
assert!(batch_provider.batch_validator.is_some());

assert_eq!(batch_provider.origin().unwrap().number, 1);
}

#[test]
fn test_batch_provider_transition_stage_backwards() {
let provider = TestNextBatchProvider::new(vec![]);
let l2_provider = TestL2ChainProvider::default();
let cfg = Arc::new(RollupConfig { holocene_time: Some(2), ..Default::default() });
let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider);

batch_provider.attempt_update().unwrap();

// Update the L1 origin to Holocene activation.
let Some(ref mut stage) = batch_provider.batch_queue else {
panic!("Expected BatchQueue");
};
stage.prev.origin = Some(BlockInfo { number: 1, timestamp: 2, ..Default::default() });

// Transition to the BatchValidator stage.
batch_provider.attempt_update().unwrap();
assert!(batch_provider.batch_queue.is_none());
assert!(batch_provider.batch_validator.is_some());

// Update the L1 origin to before Holocene activation, to simulate a re-org.
let Some(ref mut stage) = batch_provider.batch_validator else {
panic!("Expected BatchValidator");
};
stage.prev.origin = Some(BlockInfo::default());

batch_provider.attempt_update().unwrap();
assert!(batch_provider.batch_queue.is_some());
assert!(batch_provider.batch_validator.is_none());
}

#[tokio::test]
async fn test_batch_provider_reset_bq() {
let provider = TestNextBatchProvider::new(vec![]);
let l2_provider = TestL2ChainProvider::default();
let cfg = Arc::new(RollupConfig::default());
let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider);

// Reset the batch provider.
batch_provider.signal(ResetSignal::default().signal()).await.unwrap();

let Some(bq) = batch_provider.batch_queue else {
panic!("Expected BatchQueue");
};
assert!(bq.l1_blocks.len() == 1);
}

#[tokio::test]
async fn test_batch_provider_reset_validator() {
let provider = TestNextBatchProvider::new(vec![]);
let l2_provider = TestL2ChainProvider::default();
let cfg = Arc::new(RollupConfig { holocene_time: Some(0), ..Default::default() });
let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider);

// Reset the batch provider.
batch_provider.signal(ResetSignal::default().signal()).await.unwrap();

let Some(bv) = batch_provider.batch_validator else {
panic!("Expected BatchValidator");
};
assert!(bv.l1_blocks.len() == 1);
}
}
Loading

0 comments on commit 4e07ad2

Please sign in to comment.