Skip to content

Commit

Permalink
feat(derive): Holocene Buffer Flushing (#575)
Browse files Browse the repository at this point in the history
* feat(derive): wire up the batch span stage (#567)

* fix: missing flush
  • Loading branch information
refcell authored Sep 27, 2024
1 parent 2002e33 commit ccd3f14
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 0 deletions.
8 changes: 8 additions & 0 deletions crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ pub trait BatchQueueProvider {
///
/// [ChannelReader]: crate::stages::ChannelReader
async fn next_batch(&mut self) -> PipelineResult<Batch>;

/// Allows the [BatchQueue] to flush the buffer in the [crate::stages::BatchStream]
/// if an invalid single batch is found. Pre-holocene hardfork, this will be a no-op.
fn flush(&mut self);
}

/// [BatchQueue] is responsible for o rdering unordered batches
Expand Down Expand Up @@ -146,6 +150,9 @@ where
remaining.push(batch.clone());
}
BatchValidity::Drop => {
// If we drop a batch, flush previous batches buffered in the BatchStream
// stage.
self.prev.flush();
warn!(target: "batch-queue", "Dropping batch with parent: {}", parent.block_info);
continue;
}
Expand Down Expand Up @@ -233,6 +240,7 @@ where
let data = BatchWithInclusionBlock { inclusion_block: origin, batch };
// If we drop the batch, validation logs the drop reason with WARN level.
if data.check_batch(&self.cfg, &self.l1_blocks, parent, &mut self.fetcher).await.is_drop() {
self.prev.flush();
return Ok(());
}
self.batches.push(data);
Expand Down
6 changes: 6 additions & 0 deletions crates/derive/src/stages/batch_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ impl<P> BatchQueueProvider for BatchStream<P>
where
P: BatchQueueProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug,
{
fn flush(&mut self) {
if self.is_active().unwrap_or(false) {
self.buffer.clear();
}
}

async fn next_batch(&mut self) -> PipelineResult<Batch> {
// If the stage is not active, "pass" the next batch
// through this stage to the BatchQueue stage.
Expand Down
3 changes: 3 additions & 0 deletions crates/derive/src/stages/channel_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ impl<P> BatchQueueProvider for ChannelReader<P>
where
P: ChannelReaderProvider + OriginAdvancer + OriginProvider + ResettableStage + Send + Debug,
{
fn flush(&mut self) { /* noop */
}

async fn next_batch(&mut self) -> PipelineResult<Batch> {
crate::timer!(START, STAGE_ADVANCE_RESPONSE_TIME, &["channel_reader"], timer);
if let Err(e) = self.set_batch_reader().await {
Expand Down
3 changes: 3 additions & 0 deletions crates/derive/src/stages/test_utils/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ impl OriginProvider for MockBatchQueueProvider {

#[async_trait]
impl BatchQueueProvider for MockBatchQueueProvider {
fn flush(&mut self) { /* noop */
}

async fn next_batch(&mut self) -> PipelineResult<Batch> {
self.batches.pop().ok_or(PipelineError::Eof.temp())?
}
Expand Down

0 comments on commit ccd3f14

Please sign in to comment.