Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(derive): Typed error handling #540

Merged
merged 12 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .config/nextest.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[profile.ci.junit]
path = "junit.xml"
5 changes: 3 additions & 2 deletions .github/codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ coverage:

ignore:
- "tests"
- "test_util*"
- "test_utils"
- "test_util.rs"
- "test_utils.rs"
- "crates/derive/src/stages/test_utils"
- "bin/"

# Make comments less noisy
Expand Down
11 changes: 9 additions & 2 deletions .github/workflows/coverage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,20 @@ jobs:
if: hashFiles('Cargo.lock') == ''
run: cargo generate-lockfile
- name: cargo llvm-cov
run: cargo llvm-cov nextest --locked --workspace --lcov --output-path lcov.info --features test-utils
run: |
cargo llvm-cov nextest --locked --workspace --lcov --output-path lcov.info --features test-utils --profile ci && \
mv ./target/nextest/ci/junit.xml ./junit.xml
- name: Record Rust version
run: echo "RUST=$(rustc --version)" >> "$GITHUB_ENV"
- name: Upload to codecov.io
- name: Upload coverage to codecov.io
uses: codecov/codecov-action@v4
with:
fail_ci_if_error: true
token: ${{ secrets.CODECOV_TOKEN }}
env_vars: OS,RUST
files: lcov.info
- name: Upload test results to codecov.io
if: ${{ !cancelled() }}
uses: codecov/test-results-action@v1
with:
token: ${{ secrets.CODECOV_TOKEN }}
42 changes: 31 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ kona-primitives = { path = "crates/primitives", version = "0.0.2", default-featu

# General
anyhow = { version = "1.0.86", default-features = false }
thiserror = { git = "https://github.com/quartiq/thiserror", branch = "no-std", default-features = false }
cfg-if = "1.0.0"
hashbrown = "0.14.5"
spin = { version = "0.9.8", features = ["mutex"] }
Expand Down
13 changes: 6 additions & 7 deletions bin/client/src/l1/blob_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use alloc::{boxed::Box, sync::Arc, vec::Vec};
use alloy_consensus::Blob;
use alloy_eips::eip4844::FIELD_ELEMENTS_PER_BLOB;
use alloy_primitives::keccak256;
use anyhow::Result;
use async_trait::async_trait;
use kona_derive::{errors::BlobProviderError, traits::BlobProvider};
use kona_derive::traits::BlobProvider;
use kona_preimage::{CommsClient, PreimageKey, PreimageKeyType};
use kona_primitives::IndexedBlobHash;
use op_alloy_protocol::BlockInfo;
Expand All @@ -32,11 +33,7 @@ impl<T: CommsClient> OracleBlobProvider<T> {
/// ## Returns
/// - `Ok(blob)`: The blob.
/// - `Err(e)`: The blob could not be retrieved.
async fn get_blob(
&self,
block_ref: &BlockInfo,
blob_hash: &IndexedBlobHash,
) -> Result<Blob, BlobProviderError> {
async fn get_blob(&self, block_ref: &BlockInfo, blob_hash: &IndexedBlobHash) -> Result<Blob> {
let mut blob_req_meta = [0u8; 48];
blob_req_meta[0..32].copy_from_slice(blob_hash.hash.as_ref());
blob_req_meta[32..40].copy_from_slice((blob_hash.index as u64).to_be_bytes().as_ref());
Expand Down Expand Up @@ -76,11 +73,13 @@ impl<T: CommsClient> OracleBlobProvider<T> {

#[async_trait]
impl<T: CommsClient + Sync + Send> BlobProvider for OracleBlobProvider<T> {
type Error = anyhow::Error;

async fn get_blobs(
&mut self,
block_ref: &BlockInfo,
blob_hashes: &[IndexedBlobHash],
) -> Result<Vec<Blob>, BlobProviderError> {
) -> Result<Vec<Blob>, Self::Error> {
let mut blobs = Vec::with_capacity(blob_hashes.len());
for hash in blob_hashes {
blobs.push(self.get_blob(block_ref, hash).await?);
Expand Down
2 changes: 2 additions & 0 deletions bin/client/src/l1/chain_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ impl<T: CommsClient> OracleL1ChainProvider<T> {

#[async_trait]
impl<T: CommsClient + Sync + Send> ChainProvider for OracleL1ChainProvider<T> {
type Error = anyhow::Error;

async fn header_by_hash(&mut self, hash: B256) -> Result<Header> {
// Send a hint for the block header.
self.oracle.write(&HintType::L1BlockHeader.encode_with(&[hash.as_ref()])).await?;
Expand Down
21 changes: 17 additions & 4 deletions bin/client/src/l1/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ use alloy_consensus::{Header, Sealed};
use anyhow::{anyhow, Result};
use core::fmt::Debug;
use kona_derive::{
errors::StageError,
errors::PipelineErrorKind,
pipeline::{DerivationPipeline, Pipeline, PipelineBuilder, StepResult},
sources::EthereumDataSource,
stages::{
AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue, L1Retrieval,
L1Traversal, StatefulAttributesBuilder,
},
traits::{BlobProvider, ChainProvider, L2ChainProvider},
traits::{BlobProvider, ChainProvider, L2ChainProvider, OriginProvider},
};
use kona_mpt::TrieDBFetcher;
use kona_preimage::{CommsClient, PreimageKey, PreimageKeyType};
Expand Down Expand Up @@ -171,8 +171,21 @@ where
// Break the loop unless the error signifies that there is not enough data to
// complete the current step. In this case, we retry the step to see if other
// stages can make progress.
if !matches!(e, StageError::NotEnoughData | StageError::Temporary(_)) {
break;
match e {
PipelineErrorKind::Temporary(_) => { /* continue */ }
PipelineErrorKind::Reset(_) => {
// Reset the pipeline to the initial L2 safe head and L1 origin,
// and try again.
self.pipeline
.reset(
self.l2_safe_head.block_info,
self.pipeline
.origin()
.ok_or_else(|| anyhow!("Missing L1 origin"))?,
)
.await?;
}
PipelineErrorKind::Critical(_) => return Err(e.into()),
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions bin/client/src/l2/chain_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,14 @@ impl<T: CommsClient> OracleL2ChainProvider<T> {

#[async_trait]
impl<T: CommsClient + Send + Sync> L2ChainProvider for OracleL2ChainProvider<T> {
type Error = anyhow::Error;

async fn l2_block_info_by_number(&mut self, number: u64) -> Result<L2BlockInfo> {
// Get the payload at the given block number.
let payload = self.payload_by_number(number).await?;

// Construct the system config from the payload.
payload.to_l2_block_ref(&self.boot_info.rollup_config)
payload.to_l2_block_ref(&self.boot_info.rollup_config).map_err(Into::into)
}

async fn payload_by_number(&mut self, number: u64) -> Result<L2ExecutionPayloadEnvelope> {
Expand Down Expand Up @@ -114,7 +116,7 @@ impl<T: CommsClient + Send + Sync> L2ChainProvider for OracleL2ChainProvider<T>
let payload = self.payload_by_number(number).await?;

// Construct the system config from the payload.
payload.to_system_config(rollup_config.as_ref())
payload.to_system_config(rollup_config.as_ref()).map_err(Into::into)
}
}

Expand Down
5 changes: 4 additions & 1 deletion crates/derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ unsigned-varint.workspace = true
miniz_oxide.workspace = true
brotli.workspace = true
alloc-no-stdlib.workspace = true
anyhow.workspace = true
thiserror.workspace = true
tracing.workspace = true
async-trait.workspace = true

Expand All @@ -52,9 +52,11 @@ alloy-rpc-client = { workspace = true, optional = true }
tracing-subscriber = { workspace = true, optional = true }
alloy-node-bindings = { workspace = true, optional = true }
alloy-transport-http = { workspace = true, optional = true }
anyhow = { workspace = true, optional = true }

[dev-dependencies]
spin.workspace = true
anyhow.workspace = true
alloy-rpc-client.workspace = true
alloy-transport-http.workspace = true
tokio.workspace = true
Expand Down Expand Up @@ -91,6 +93,7 @@ online = [
]
test-utils = [
"dep:spin",
"dep:anyhow",
"dep:alloy-transport-http",
"dep:alloy-node-bindings",
"dep:tracing-subscriber",
Expand Down
14 changes: 7 additions & 7 deletions crates/derive/src/batch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use op_alloy_genesis::RollupConfig;
use op_alloy_protocol::{BlockInfo, L2BlockInfo};

use crate::{errors::DecodeError, traits::L2ChainProvider};
use crate::{errors::PipelineEncodingError, traits::L2ChainProvider};

mod batch_type;
pub use batch_type::BatchType;
Expand Down Expand Up @@ -79,9 +79,9 @@
}

/// Attempts to decode a batch from a reader.
pub fn decode(r: &mut &[u8], cfg: &RollupConfig) -> Result<Self, DecodeError> {
pub fn decode(r: &mut &[u8], cfg: &RollupConfig) -> Result<Self, PipelineEncodingError> {
if r.is_empty() {
return Err(DecodeError::EmptyBuffer);
return Err(PipelineEncodingError::EmptyBuffer);

Check warning on line 84 in crates/derive/src/batch/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/derive/src/batch/mod.rs#L84

Added line #L84 was not covered by tests
}

// Read the batch type
Expand All @@ -90,15 +90,15 @@

match batch_type {
BatchType::Single => {
let single_batch = SingleBatch::decode(r)?;
let single_batch =
SingleBatch::decode(r).map_err(PipelineEncodingError::AlloyRlpError)?;

Check warning on line 94 in crates/derive/src/batch/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/derive/src/batch/mod.rs#L93-L94

Added lines #L93 - L94 were not covered by tests
Ok(Batch::Single(single_batch))
}
BatchType::Span => {
let mut raw_span_batch =
RawSpanBatch::decode(r, cfg).map_err(DecodeError::SpanBatchError)?;
let mut raw_span_batch = RawSpanBatch::decode(r, cfg)?;
let span_batch = raw_span_batch
.derive(cfg.block_time, cfg.genesis.l2_time, cfg.l2_chain_id)
.map_err(DecodeError::SpanBatchError)?;
.map_err(PipelineEncodingError::SpanBatchError)?;
Ok(Batch::Span(span_batch))
}
}
Expand Down
4 changes: 1 addition & 3 deletions crates/derive/src/batch/span_batch/bits.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
//! Module for working with span batch bits.

use super::{errors::SpanBatchError, FJORD_MAX_SPAN_BATCH_BYTES, MAX_SPAN_BATCH_BYTES};
use alloc::{vec, vec::Vec};
use alloy_rlp::Buf;
use anyhow::Result;
use core::cmp::Ordering;

use super::{errors::SpanBatchError, FJORD_MAX_SPAN_BATCH_BYTES, MAX_SPAN_BATCH_BYTES};

/// Type for span batch bits.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct SpanBatchBits(pub Vec<u8>);
Expand Down
Loading