
downloading & parsing blobs #75

Merged: 26 commits, Mar 21, 2024
Changes shown are from 25 of the 26 commits.

Commits
8059330: added blobs URL to options (vbar, Mar 14, 2024)
1d6cff4: downloading blobs (vbar, Mar 14, 2024)
84fdf7c: Merge branch 'main' into feat/blob-resolve (vbar, Mar 18, 2024)
6d123be: updated dependencies (vbar, Mar 18, 2024)
95030aa: finished parse_pubdata_from_blobs (vbar, Mar 18, 2024)
c95727f: silenced clippy (vbar, Mar 18, 2024)
7e37172: handling zeros missing at end of compressed value (vbar, Mar 18, 2024)
6c21352: parsing blobs as either hex, of comma-separated decimal lists (vbar, Mar 18, 2024)
7340fa0: Revert "parsing blobs as either hex, of comma-separated decimal lists" (vbar, Mar 19, 2024)
177d7e1: Update comment (vbar, Mar 20, 2024)
85d4138: encapsulated BlobHttpClient (vbar, Mar 20, 2024)
4541901: removed parse_pubdata_from_blobs pointer argument (vbar, Mar 20, 2024)
da19054: renamed parse_pubdata_from_calldata (vbar, Mar 20, 2024)
2714dc1: removed parse_resolved_pubdata pointer argument (vbar, Mar 20, 2024)
0d54a35: unified try_from for v? comment (vbar, Mar 20, 2024)
705193c: unified parse_total_l2_to_l1_pubdata with parse_resolved_pubdata (vbar, Mar 20, 2024)
f57f69e: logging BlobFormatError just once (vbar, Mar 20, 2024)
af59397: comment update (vbar, Mar 20, 2024)
fc72f40: simpler conversion to slice (vbar, Mar 20, 2024)
21a1a2a: added comment (vbar, Mar 20, 2024)
3b25ad0: moved get_blob to BlobHttpClient (vbar, Mar 20, 2024)
00b097a: less indentation in get_blob_data (vbar, Mar 20, 2024)
e06c60d: comment w/ link (vbar, Mar 20, 2024)
310de5d: comment w/ link (vbar, Mar 20, 2024)
1031503: comment w/ link (vbar, Mar 20, 2024)
23a7563: typed JSON parsing (vbar, Mar 21, 2024)
30 changes: 16 additions & 14 deletions Cargo.lock

(Cargo.lock is a generated file; its diff is not rendered.)

21 changes: 20 additions & 1 deletion src/cli.rs
@@ -1,5 +1,7 @@
use clap::{Args, Parser, Subcommand, ValueEnum};
use state_reconstruct_fetcher::constants::ethereum;
use state_reconstruct_fetcher::{
constants::ethereum, l1_fetcher::L1FetcherOptions as FetcherOptions,
};

use crate::processor::snapshot;

@@ -8,6 +10,9 @@ pub struct L1FetcherOptions {
/// The Ethereum JSON-RPC HTTP URL to use.
#[arg(long)]
pub http_url: String,
/// The Ethereum blob storage URL base.
#[arg(long, default_value_t = ethereum::BLOBS_URL.to_string())]
pub blobs_url: String,
/// Ethereum block number to start state import from.
#[arg(short, long, default_value_t = ethereum::GENESIS_BLOCK)]
pub start_block: u64,
@@ -22,6 +27,20 @@ pub struct L1FetcherOptions {
pub disable_polling: bool,
}

/// Allow conversion into `l1_fetcher::L1FetcherOptions`, for use at lower level.
impl From<L1FetcherOptions> for FetcherOptions {
fn from(opt: L1FetcherOptions) -> Self {
FetcherOptions {
http_url: opt.http_url,
blobs_url: opt.blobs_url,
start_block: opt.start_block,
block_step: opt.block_step,
block_count: opt.block_count,
disable_polling: opt.disable_polling,
}
}
}

#[derive(Subcommand)]
pub enum ReconstructSource {
/// Fetch data from L1.
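The `From` impl above lets the three call sites in main.rs collapse their field-by-field struct construction into a single `.into()`. A minimal, self-contained sketch of the pattern, using hypothetical trimmed-down structs in place of the real `cli::L1FetcherOptions` and `l1_fetcher::L1FetcherOptions`:

```rust
// Hypothetical stand-ins for the CLI-side and fetcher-side option types.
struct CliOptions {
    http_url: String,
    start_block: u64,
}

struct FetcherOptions {
    http_url: String,
    start_block: u64,
}

// One conversion, defined once, instead of repeated field copying at call sites.
impl From<CliOptions> for FetcherOptions {
    fn from(opt: CliOptions) -> Self {
        FetcherOptions {
            http_url: opt.http_url,
            start_block: opt.start_block,
        }
    }
}

fn main() {
    let cli = CliOptions {
        http_url: "https://eth.llamarpc.com".to_string(),
        start_block: 19428415,
    };
    // `impl From<A> for B` gives `Into<B> for A` for free, so call sites write `.into()`.
    let fetcher: FetcherOptions = cli.into();
    assert_eq!(fetcher.start_block, 19428415);
}
```

This is why the diff below can replace each `let fetcher_options = L1FetcherOptions { ... }` block with `let fetcher_options = l1_fetcher_options.into();`.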
33 changes: 4 additions & 29 deletions src/main.rs
@@ -16,11 +16,7 @@ use clap::Parser;
use cli::{Cli, Command, ReconstructSource};
use eyre::Result;
use processor::snapshot::{SnapshotBuilder, SnapshotExporter};
use state_reconstruct_fetcher::{
constants::storage,
l1_fetcher::{L1Fetcher, L1FetcherOptions},
types::CommitBlock,
};
use state_reconstruct_fetcher::{constants::storage, l1_fetcher::L1Fetcher, types::CommitBlock};
use tikv_jemallocator::Jemalloc;
use tokio::sync::mpsc;
use tracing_subscriber::{filter::LevelFilter, EnvFilter};
@@ -71,14 +67,7 @@ async fn main() -> Result<()> {

match source {
ReconstructSource::L1 { l1_fetcher_options } => {
let fetcher_options = L1FetcherOptions {
http_url: l1_fetcher_options.http_url,
start_block: l1_fetcher_options.start_block,
block_step: l1_fetcher_options.block_step,
block_count: l1_fetcher_options.block_count,
disable_polling: l1_fetcher_options.disable_polling,
};

let fetcher_options = l1_fetcher_options.into();
let processor = TreeProcessor::new(db_path.clone()).await?;
let fetcher = L1Fetcher::new(fetcher_options, Some(processor.get_snapshot()))?;
let (tx, rx) = mpsc::channel::<CommitBlock>(5);
@@ -114,14 +103,7 @@ async fn main() -> Result<()> {
l1_fetcher_options,
file,
} => {
let fetcher_options = L1FetcherOptions {
http_url: l1_fetcher_options.http_url,
start_block: l1_fetcher_options.start_block,
block_step: l1_fetcher_options.block_step,
block_count: l1_fetcher_options.block_count,
disable_polling: l1_fetcher_options.disable_polling,
};

let fetcher_options = l1_fetcher_options.into();
let fetcher = L1Fetcher::new(fetcher_options, None)?;
let processor = JsonSerializationProcessor::new(Path::new(&file))?;
let (tx, rx) = mpsc::channel::<CommitBlock>(5);
@@ -158,14 +140,7 @@ async fn main() -> Result<()> {
l1_fetcher_options,
db_path,
} => {
let fetcher_options = L1FetcherOptions {
http_url: l1_fetcher_options.http_url,
start_block: l1_fetcher_options.start_block,
block_step: l1_fetcher_options.block_step,
block_count: l1_fetcher_options.block_count,
disable_polling: l1_fetcher_options.disable_polling,
};

let fetcher_options = l1_fetcher_options.into();
let fetcher = L1Fetcher::new(fetcher_options, None)?;
let processor = SnapshotBuilder::new(db_path);

2 changes: 2 additions & 0 deletions state-reconstruct-fetcher/Cargo.toml
@@ -10,6 +10,7 @@ ethers = "1.0.2"
eyre = "0.6.8"
indexmap = { version = "2.0.2", features = ["serde"] }
rand = "0.8.5"
reqwest = "0.11.24"
serde = { version = "1.0.189", features = ["derive"] }
serde_json = { version = "1.0.107", features = ["std"] }
serde_json_any_key = "2.0.0"
@@ -20,3 +21,4 @@ tracing = "0.1.40"
rocksdb = "0.21.0"
hex = "0.4.3"
chrono = "0.4.31"
zkevm_circuits = { git = "https://github.com/matter-labs/era-zkevm_circuits", branch = "v1.4.1" }
107 changes: 107 additions & 0 deletions state-reconstruct-fetcher/src/blob_http_client.rs
@@ -0,0 +1,107 @@
use serde_json::Value;
use tokio::time::{sleep, Duration};

use crate::types::ParseError;

/// `MAX_RETRIES` is the maximum number of retries on failed blob retrieval.
const MAX_RETRIES: u8 = 5;
/// The interval in seconds to wait before retrying to fetch a blob.
const FAILED_FETCH_RETRY_INTERVAL_S: u64 = 10;

pub struct BlobHttpClient {
client: reqwest::Client,
url_base: String,
}

impl BlobHttpClient {
pub fn new(blob_url: String) -> eyre::Result<Self> {
let mut headers = reqwest::header::HeaderMap::new();
headers.insert(
"Accept",
reqwest::header::HeaderValue::from_static("application/json"),
);
let client = reqwest::Client::builder()
.default_headers(headers)
.build()?;
Ok(Self {
client,
url_base: blob_url,
})
}

pub async fn get_blob(&self, kzg_commitment: &[u8]) -> Result<Vec<u8>, ParseError> {
let url = self.format_url(kzg_commitment);
for attempt in 1..=MAX_RETRIES {
match self.retrieve_url(&url).await {
Ok(response) => match response.text().await {
Ok(text) => match get_blob_data(&text) {
Ok(data) => {
let plain = if let Some(p) = data.strip_prefix("0x") {
p
} else {
&data
};
return hex::decode(plain).map_err(|e| {
ParseError::BlobFormatError(plain.to_string(), e.to_string())
});
}
Err(e) => {
tracing::error!("failed parsing response of {url}");
return Err(e);
}
},
Err(e) => {
tracing::error!("attempt {}: {} failed: {:?}", attempt, url, e);
sleep(Duration::from_secs(FAILED_FETCH_RETRY_INTERVAL_S)).await;
}
},
Comment on lines +41 to +62
Member:

Suggested change (original lines, then proposed replacement):
Ok(response) => match response.text().await {
Ok(text) => match get_blob_data(&text) {
Ok(data) => {
let plain = if let Some(p) = data.strip_prefix("0x") {
p
} else {
&data
};
return hex::decode(plain).map_err(|e| {
ParseError::BlobFormatError(plain.to_string(), e.to_string())
});
}
Err(e) => {
tracing::error!("failed parsing response of {url}");
return Err(e);
}
},
Err(e) => {
tracing::error!("attempt {}: {} failed: {:?}", attempt, url, e);
sleep(Duration::from_secs(FAILED_FETCH_RETRY_INTERVAL_S)).await;
}
},
Ok(response) => {
let Ok(text) = response.text().await else {
continue;
};
let data = get_blob_data(&text)?;
let plain = if let Some(p) = data.strip_prefix("0x") {
p
} else {
&data
};
return hex::decode(plain)
.map_err(|e| ParseError::BlobFormatError(plain.to_string(), e.to_string()));
}

How does this look? It accomplishes pretty much the same thing as it did previously, but in my opinion it's a lot more readable this way.

Collaborator Author:

But it isn't the same thing at all... In case the response cannot be retrieved, I want to see the error (I don't think I ever did - I suppose it's there for some complicated HTTP stuff that doesn't really happen, but who knows), not just try again...

Member:

It will give you the error if the response fails though, that part hasn't been changed at all.

However, another thing I thought of: we should actually be getting the response as a json directly instead of getting it as a string and then parsing it as one.

Collaborator Author:

> It will give you the error if the response fails though, that part hasn't been changed at all.

Well, no. Let's say I replace the Ok(text) = response.text() with Err(x) = response.text(), to simulate the failure (full executable code is in https://github.com/eqlabs/zksync-state-reconstruct/tree/inversion ). Then I can run cargo run reconstruct l1 --http-url https://eth.llamarpc.com --start-block 19428415 to check some blobs, and get

2024-03-21T06:57:31.341353Z  INFO url = https://api.blobscan.com/blobs/0xb1647613a7b52bae4aa8f359421d1c9e8144d62768ac00946afb81838d24a3f76172fdfd01ab09f5bfa2e1638954e513
2024-03-21T06:57:32.394116Z  INFO attempt 1: got response
2024-03-21T06:57:33.340344Z  INFO attempt 2: got response
2024-03-21T06:57:34.429335Z  INFO attempt 3: got response
2024-03-21T06:57:35.041117Z  INFO attempt 4: got response
2024-03-21T06:57:35.635881Z  INFO attempt 5: got response
2024-03-21T06:57:40.777653Z  INFO PROGRESS: [ 0%] CUR BLOCK L1: 19428483 L2: 460962 TOTAL BLOCKS PROCESSED L1: 68 L2: 0

No error is logged, and really I don't see how it could...

Collaborator Author (vbar, Mar 21, 2024):

> However, another thing I thought of: we should actually be getting the response as a json directly instead of getting it as a string and then parsing it as one.

That link is for sending JSON, but yes, something like https://docs.rs/reqwest/latest/reqwest/struct.Response.html#method.json should be possible - except I don't see how to get the response text after failing to parse it as JSON... Really, all I want here is error handling - get response, report error on failure, then parse it, report error on failure... If that's more elaborate than some online Rust examples, well, so what?

Member:

That code is testing that we can decode the response, not get the response. Getting the response is still handled with a match statement like it was previously.

I'll admit, I don't really see why we would need that level of error logging. Failing to parse it as JSON is in my mind more than good enough, since that tells the user exactly what went wrong - it's not a valid JSON response.

And for the error handling part, if you're insistent, I would highly recommend looking into more advanced thiserror uses, like the #[from] derive. That allows you to do what you want, without polluting the code with so much boilerplate. If you go down this road, I'd also suggest making BlobError its own error type that can then be condensed to a ParseError.
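The `#[from]` derive mentioned here can be illustrated without the macro: the sketch below hand-writes the `From` impl that `thiserror`'s `#[from]` attribute would generate, which is what lets `?` convert errors without `map_err` boilerplate. The type and function names (`BlobError`, `parse_byte`) are illustrative, not part of this PR:

```rust
use std::fmt;
use std::num::ParseIntError;

// A dedicated error type, roughly what the reviewer suggests for blob handling.
#[derive(Debug)]
enum BlobError {
    Format(ParseIntError),
    Storage(String),
}

impl fmt::Display for BlobError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            BlobError::Format(e) => write!(f, "blob format error: {e}"),
            BlobError::Storage(url) => write!(f, "failed to fetch blob from {url}"),
        }
    }
}

// This impl is what `#[from]` would derive: it lets `?` lift a
// ParseIntError into a BlobError automatically.
impl From<ParseIntError> for BlobError {
    fn from(e: ParseIntError) -> Self {
        BlobError::Format(e)
    }
}

fn parse_byte(hex_pair: &str) -> Result<u8, BlobError> {
    // `?` applies the From impl on failure - no map_err needed.
    let b = u8::from_str_radix(hex_pair, 16)?;
    Ok(b)
}

fn main() {
    assert_eq!(parse_byte("ff").unwrap(), 255);
    assert!(matches!(parse_byte("zz"), Err(BlobError::Format(_))));
}
```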

Collaborator Author:

There are 2 async calls needed to get the response: send (inside BlobHttpClient::retrieve_url) and text. Both can fail, and both should be tested. I want to log the second one, precisely because I do not expect the error to happen.

> I'll admit, I don't really see why we would need that level of error logging.

Well, to debug the problems that we've already had (e.g. the wrong URL returning JSON without a data attribute - looking at it, it's obvious it says NOT_FOUND, but a serde parser will lose that info).

Member:

Gotcha, sure. in that case we should definitely be using the ? operator in combination with thiserror-style error handling then since that would be an unrecoverable error. See my comment above for details.

Err(e) => {
tracing::error!("attempt {}: GET {} failed: {:?}", attempt, url, e);
sleep(Duration::from_secs(FAILED_FETCH_RETRY_INTERVAL_S)).await;
}
}
}
Err(ParseError::BlobStorageError(url))
}

fn format_url(&self, kzg_commitment: &[u8]) -> String {
format!("{}0x{}", self.url_base, hex::encode(kzg_commitment))
}

async fn retrieve_url(&self, url: &str) -> eyre::Result<reqwest::Response> {
let result = self.client.get(url).send().await?;
Ok(result)
}
}

fn get_blob_data(json_str: &str) -> Result<String, ParseError> {
let Ok(v) = serde_json::from_str(json_str) else {
return Err(ParseError::BlobFormatError(
json_str.to_string(),
"not JSON".to_string(),
));
};

let Value::Object(m) = v else {
return Err(ParseError::BlobFormatError(
json_str.to_string(),
"data is not object".to_string(),
));
};

let Some(d) = m.get("data") else {
return Err(ParseError::BlobFormatError(
json_str.to_string(),
"no data in response".to_string(),
));
};

let Value::String(s) = d else {
return Err(ParseError::BlobFormatError(
json_str.to_string(),
"data is not string".to_string(),
));
};

Ok(s.clone())
}
7 changes: 7 additions & 0 deletions state-reconstruct-fetcher/src/constants.rs
@@ -13,6 +13,9 @@ pub mod ethereum {

/// zkSync smart contract address.
pub const ZK_SYNC_ADDR: &str = "0x32400084C286CF3E17e7B677ea9583e60a000324";

/// Default Ethereum blob storage URL base.
pub const BLOBS_URL: &str = "https://api.blobscan.com/blobs/";
Member:

I'm wondering if we should supply a default URL at all. With the ETH-RPC URL, we made the decision to only give one as an example in the README, as opposed to hard-coding in a default one.

Of course this will depend on if blob providers can be counted on to use compatible formats. But, if it's possible, I'd prefer not tying ourselves to one specific provider.

Collaborator Author:

We aren't tying ourselves - it's configurable... We're just being pushed to the only blob archive there is... Anyway, this URL isn't directly analogous to the ETH-RPC URL, because it isn't always needed - if the user reconstructs only old blocks, it isn't necessary...
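Given the flags added in this PR, overriding the default would look something like the following sketch; the flag names come from cli.rs above, while the override URL is purely hypothetical:

```shell
# Default blob provider (ethereum::BLOBS_URL, i.e. api.blobscan.com):
cargo run reconstruct l1 --http-url https://eth.llamarpc.com --start-block 19428415

# Pointing at a different (hypothetical) blob archive instead:
cargo run reconstruct l1 \
    --http-url https://eth.llamarpc.com \
    --blobs-url https://blobs.example.org/blobs/ \
    --start-block 19428415
```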

}

pub mod storage {
@@ -33,4 +36,8 @@ pub mod zksync {
pub const OPERATION_BITMASK: u8 = 7;
// The number of bits shifting the compressed state diff metadata by which we retrieve its length.
pub const LENGTH_BITS_OFFSET: u8 = 3;
// Size of `CommitBatchInfo.pubdataCommitments` item.
pub const PUBDATA_COMMITMENT_SIZE: usize = 144;
// The number of trailing bytes to ignore when using calldata post-blobs. Contains unused blob commitments.
pub const CALLDATA_SOURCE_TAIL_SIZE: usize = 32;
}