diff --git a/bin/client/src/l1/driver.rs b/bin/client/src/l1/driver.rs index 70d4db4c7..30b462b2c 100644 --- a/bin/client/src/l1/driver.rs +++ b/bin/client/src/l1/driver.rs @@ -16,8 +16,8 @@ use kona_derive::{ pipeline::{DerivationPipeline, Pipeline, PipelineBuilder, StepResult}, sources::EthereumDataSource, stages::{ - AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, L1Retrieval, - L1Traversal, + AttributesQueue, BatchQueue, BatchStream, ChannelBank, ChannelReader, FrameQueue, + L1Retrieval, L1Traversal, }, traits::{BlobProvider, ChainProvider, L2ChainProvider, OriginProvider}, }; @@ -46,8 +46,10 @@ pub type OracleAttributesBuilder = /// An oracle-backed attributes queue for the derivation pipeline. pub type OracleAttributesQueue = AttributesQueue< BatchQueue< - ChannelReader< - ChannelBank>>>>, + BatchStream< + ChannelReader< + ChannelBank>>>>, + >, >, OracleL2ChainProvider, >, diff --git a/crates/derive/src/online/pipeline.rs b/crates/derive/src/online/pipeline.rs index 9213c01d4..4eb634400 100644 --- a/crates/derive/src/online/pipeline.rs +++ b/crates/derive/src/online/pipeline.rs @@ -11,7 +11,8 @@ use op_alloy_protocol::BlockInfo; // Pipeline internal stages aren't re-exported at the module-level. use crate::stages::{ - AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, L1Retrieval, L1Traversal, + AttributesQueue, BatchQueue, BatchStream, ChannelBank, ChannelReader, FrameQueue, L1Retrieval, + L1Traversal, }; /// An online derivation pipeline. @@ -32,7 +33,11 @@ pub type OnlineAttributesBuilder = /// An `online` attributes queue for the derivation pipeline. pub type OnlineAttributesQueue = AttributesQueue< BatchQueue< - ChannelReader>>>>, + BatchStream< + ChannelReader< + ChannelBank>>>, + >, + >, AlloyL2ChainProvider, >, OnlineAttributesBuilder, diff --git a/crates/derive/src/pipeline/builder.rs b/crates/derive/src/pipeline/builder.rs index 2307a8a30..837d72358 100644 --- a/crates/derive/src/pipeline/builder.rs +++ b/crates/derive/src/pipeline/builder.rs @@ -4,7 +4,8 @@ use super::{ AttributesBuilder, ChainProvider, DataAvailabilityProvider, DerivationPipeline, L2ChainProvider, }; use crate::stages::{ - AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, L1Retrieval, L1Traversal, + AttributesQueue, BatchQueue, BatchStream, ChannelBank, ChannelReader, FrameQueue, L1Retrieval, + L1Traversal, }; use alloc::sync::Arc; use core::fmt::Debug; @@ -16,7 +17,8 @@ type L1RetrievalStage = L1Retrieval>; type FrameQueueStage = FrameQueue>; type ChannelBankStage = ChannelBank>; type ChannelReaderStage = ChannelReader>; -type BatchQueueStage = BatchQueue, T>; +type BatchStreamStage = BatchStream>; +type BatchQueueStage = BatchQueue, T>; type AttributesQueueStage = AttributesQueue, B>; /// The `PipelineBuilder` constructs a [DerivationPipeline] using a builder pattern. @@ -132,8 +134,9 @@ where let frame_queue = FrameQueue::new(l1_retrieval); let channel_bank = ChannelBank::new(Arc::clone(&rollup_config), frame_queue); let channel_reader = ChannelReader::new(channel_bank, Arc::clone(&rollup_config)); + let batch_span = BatchStream::new(channel_reader, rollup_config.clone()); let batch_queue = - BatchQueue::new(rollup_config.clone(), channel_reader, l2_chain_provider.clone()); + BatchQueue::new(rollup_config.clone(), batch_span, l2_chain_provider.clone()); let attributes = AttributesQueue::new(rollup_config.clone(), batch_queue, attributes_builder);