Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(derive): Span Batch Validation #121

Merged
merged 6 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions crates/derive/src/stages/batch_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,11 @@ where
/// Follows the validity rules imposed on consecutive batches.
/// Based on currently available buffered batch and L1 origin information.
/// A [StageError::Eof] is returned if no batch can be derived yet.
pub fn derive_next_batch(&mut self, empty: bool, parent: L2BlockInfo) -> StageResult<Batch> {
pub async fn derive_next_batch(
&mut self,
empty: bool,
parent: L2BlockInfo,
) -> StageResult<Batch> {
// Cannot derive a batch if no origin was prepared.
if self.l1_blocks.is_empty() {
return Err(StageError::MissingOrigin);
Expand Down Expand Up @@ -140,7 +144,8 @@ where
let mut remaining = Vec::new();
for i in 0..self.batches.len() {
let batch = &self.batches[i];
let validity = batch.check_batch(&self.cfg, &self.l1_blocks, parent, &self.fetcher);
let validity =
batch.check_batch(&self.cfg, &self.l1_blocks, parent, &mut self.fetcher).await;
match validity {
BatchValidity::Future => {
remaining.push(batch.clone());
Expand Down Expand Up @@ -221,15 +226,15 @@ where
}

/// Adds a batch to the queue.
pub fn add_batch(&mut self, batch: Batch, parent: L2BlockInfo) -> StageResult<()> {
pub async fn add_batch(&mut self, batch: Batch, parent: L2BlockInfo) -> StageResult<()> {
if self.l1_blocks.is_empty() {
error!("Cannot add batch without an origin");
panic!("Cannot add batch without an origin");
}
let origin = self.origin.ok_or_else(|| anyhow!("cannot add batch with missing origin"))?;
let data = BatchWithInclusionBlock { inclusion_block: origin, batch };
// If we drop the batch, validation logs the drop reason with WARN level.
if data.check_batch(&self.cfg, &self.l1_blocks, parent, &self.fetcher).is_drop() {
if data.check_batch(&self.cfg, &self.l1_blocks, parent, &mut self.fetcher).await.is_drop() {
return Ok(());
}
self.batches.push(data);
Expand Down Expand Up @@ -310,7 +315,7 @@ where
match self.prev.next_batch().await {
Ok(b) => {
if !origin_behind {
self.add_batch(b, parent).ok();
self.add_batch(b, parent).await.ok();
} else {
warn!("Dropping batch: Origin is behind");
}
Expand All @@ -329,7 +334,7 @@ where
}

// Attempt to derive more batches.
let batch = match self.derive_next_batch(out_of_data, parent) {
let batch = match self.derive_next_batch(out_of_data, parent).await {
Ok(b) => b,
Err(e) => match e {
StageError::Eof => {
Expand Down Expand Up @@ -418,15 +423,15 @@ mod tests {
BatchReader::from(compressed)
}

#[test]
fn test_derive_next_batch_missing_origin() {
#[tokio::test]
async fn test_derive_next_batch_missing_origin() {
let data = vec![Ok(Batch::Single(SingleBatch::default()))];
let cfg = Arc::new(RollupConfig::default());
let mock = MockBatchQueueProvider::new(data);
let fetcher = MockBlockFetcher::default();
let mut bq = BatchQueue::new(cfg, mock, fetcher);
let parent = L2BlockInfo::default();
let result = bq.derive_next_batch(false, parent).unwrap_err();
let result = bq.derive_next_batch(false, parent).await.unwrap_err();
assert_eq!(result, StageError::MissingOrigin);
}

Expand Down
26 changes: 24 additions & 2 deletions crates/derive/src/stages/test_utils/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,29 @@ use tracing::{Event, Level, Subscriber};
use tracing_subscriber::{layer::Context, Layer};

/// The storage for the collected traces.
pub type TraceStorage = Arc<Mutex<Vec<(Level, String)>>>;
#[derive(Debug, Default, Clone)]
pub struct TraceStorage(pub Arc<Mutex<Vec<(Level, String)>>>);

impl TraceStorage {
/// Returns the items in the storage that match the specified level.
pub fn get_by_level(&self, level: Level) -> Vec<String> {
self.0
.lock()
.iter()
.filter_map(|(l, message)| if *l == level { Some(message.clone()) } else { None })
.collect()
}

/// Locks the storage and returns the items.
pub fn lock(&self) -> spin::MutexGuard<'_, Vec<(Level, String)>> {
self.0.lock()
}

/// Returns if the storage is empty.
pub fn is_empty(&self) -> bool {
self.0.lock().is_empty()
}
}

#[derive(Debug, Default)]
pub struct CollectingLayer {
Expand All @@ -26,7 +48,7 @@ impl<S: Subscriber> Layer<S> for CollectingLayer {
let level = *metadata.level();
let message = format!("{:?}", event);

let mut storage = self.storage.lock();
let mut storage = self.storage.0.lock();
storage.push((level, message));
}
}
8 changes: 5 additions & 3 deletions crates/derive/src/types/batch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,21 @@ impl BatchWithInclusionBlock {
/// One or more consecutive l1_blocks should be provided.
/// In case of only a single L1 block, the decision whether a batch is valid may have to stay
/// undecided.
pub fn check_batch<BF: L2ChainProvider>(
pub async fn check_batch<BF: L2ChainProvider>(
&self,
cfg: &RollupConfig,
l1_blocks: &[BlockInfo],
l2_safe_head: L2BlockInfo,
fetcher: &BF,
fetcher: &mut BF,
) -> BatchValidity {
match &self.batch {
Batch::Single(single_batch) => {
single_batch.check_batch(cfg, l1_blocks, l2_safe_head, &self.inclusion_block)
}
Batch::Span(span_batch) => {
span_batch.check_batch(cfg, l1_blocks, l2_safe_head, &self.inclusion_block, fetcher)
span_batch
.check_batch(cfg, l1_blocks, l2_safe_head, &self.inclusion_block, fetcher)
.await
}
}
}
Expand Down
Loading
Loading