From 5589bcaaf49dec793e53bb6730b5786548cbdefd Mon Sep 17 00:00:00 2001 From: Austin Abell Date: Wed, 22 May 2024 17:23:08 -0400 Subject: [PATCH] Support multiple blocks through zeth CLI --- .vscode/settings.json | 1 + host/src/main.rs | 46 ++++++++------- host/src/operations/build.rs | 107 ++++++++++++++++++++++++++++------- primitives/src/trie/mpt.rs | 49 +++++++++------- 4 files changed, 143 insertions(+), 60 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 1f1695e6..e1f95ca7 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -6,6 +6,7 @@ "./guests/op-block/Cargo.toml", "./guests/op-compose/Cargo.toml", "./guests/op-derive/Cargo.toml", + "./host/Cargo.toml", "./testing/ef-tests/testguest/Cargo.toml" ] } \ No newline at end of file diff --git a/host/src/main.rs b/host/src/main.rs index 48d2a516..5f34fa96 100644 --- a/host/src/main.rs +++ b/host/src/main.rs @@ -14,6 +14,8 @@ extern crate core; +use std::sync::Arc; + use anyhow::Result; use clap::Parser; use log::info; @@ -31,7 +33,7 @@ use zeth_lib::{ #[tokio::main] async fn main() -> Result<()> { env_logger::init(); - let cli = Cli::parse(); + let cli = Arc::new(Cli::parse()); info!("Using the following image ids:"); info!(" eth-block: {}", Digest::from(ETH_BLOCK_ID)); @@ -41,15 +43,15 @@ async fn main() -> Result<()> { // execute the command let build_args = cli.build_args(); - let (image_id, stark) = match build_args.network { + let (image_id, starks) = match build_args.network { Network::Ethereum => { let rpc_url = build_args.eth_rpc_url.clone(); ( ETH_BLOCK_ID, build::build_block::( - &cli, + cli.clone(), rpc_url, - Ð_MAINNET_CHAIN_SPEC, + Arc::new(ETH_MAINNET_CHAIN_SPEC.clone()), ETH_BLOCK_ELF, ) .await?, @@ -60,9 +62,9 @@ async fn main() -> Result<()> { ( OP_BLOCK_ID, build::build_block::( - &cli, + cli.clone(), rpc_url, - &OP_MAINNET_CHAIN_SPEC, + Arc::new(OP_MAINNET_CHAIN_SPEC.clone()), OP_BLOCK_ELF, ) .await?, @@ -72,30 +74,36 @@ async fn main() -> Result<()> { if let Some(composition_size) = build_args.composition { ( OP_COMPOSE_ID, - rollups::compose_derived_rollup_blocks(&cli, composition_size).await?, + vec![rollups::compose_derived_rollup_blocks(&cli, composition_size).await?], ) } else { - (OP_DERIVE_ID, rollups::derive_rollup_blocks(&cli).await?) + ( + OP_DERIVE_ID, + vec![rollups::derive_rollup_blocks(&cli).await?], + ) } } }; // Create/verify Groth16 SNARK - if cli.snark() { - let Some((stark_uuid, stark_receipt)) = stark else { - panic!("No STARK data to snarkify!"); - }; + for stark in starks { + if cli.snark() { + let Some((stark_uuid, stark_receipt)) = stark else { + panic!("No STARK data to snarkify!"); + }; - if !cli.submit_to_bonsai() { - panic!("Bonsai submission flag required to create a SNARK!"); - } + if !cli.submit_to_bonsai() { + panic!("Bonsai submission flag required to create a SNARK!"); + } - let image_id = Digest::from(image_id); - let (snark_uuid, snark_receipt) = stark2snark(image_id, stark_uuid, stark_receipt).await?; + let image_id = Digest::from(image_id); + let (snark_uuid, snark_receipt) = + stark2snark(image_id, stark_uuid, stark_receipt).await?; - info!("Validating SNARK uuid: {}", snark_uuid); + info!("Validating SNARK uuid: {}", snark_uuid); - verify_groth16_snark(&cli, image_id, snark_receipt).await?; + verify_groth16_snark(&cli, image_id, snark_receipt).await?; + } } Ok(()) diff --git a/host/src/operations/build.rs b/host/src/operations/build.rs index 4ce611b1..6d470a56 100644 --- a/host/src/operations/build.rs +++ b/host/src/operations/build.rs @@ -19,6 +19,8 @@ use ethers_core::types::Transaction as EthersTransaction; use log::{info, warn}; use risc0_zkvm::{compute_image_id, Receipt}; use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use tokio::sync::Semaphore; use zeth_lib::{ builder::BlockBuilderStrategy, consts::ChainSpec, @@ -27,27 +29,24 @@ use zeth_lib::{ output::BlockBuildOutput, }; +const MAX_CONCURRENT_REQUESTS: usize = 5; + use crate::{ - cli::Cli, + cli::{BuildArgs, Cli}, operations::{execute, maybe_prove, verify_bonsai_receipt}, }; /// Build a single block using the specified strategy. -pub async fn build_block( - cli: &Cli, +async fn preflight_block( + build_args: BuildArgs, + current_block: u64, rpc_url: Option, - chain_spec: &ChainSpec, - guest_elf: &[u8], -) -> anyhow::Result> + chain_spec: Arc, +) -> anyhow::Result<(BlockBuildInput, BlockBuildOutput)> where N::TxEssence: 'static + Send + TryFrom + Serialize + Deserialize<'static>, >::Error: Debug, { - let build_args = cli.build_args().clone(); - if build_args.block_count > 1 { - warn!("Building multiple blocks is not supported. Only the first block will be built."); - } - // Fetch all of the initial data let rpc_cache = build_args.cache.as_ref().map(|dir| { cache_file_path( @@ -60,7 +59,7 @@ where let init_spec = chain_spec.clone(); let preflight_result = tokio::task::spawn_blocking(move || { - N::preflight_with_external_data(&init_spec, rpc_cache, rpc_url, build_args.block_number) + N::preflight_with_external_data(&init_spec, rpc_cache, rpc_url, current_block) }) .await?; let preflight_data = preflight_result.context("preflight failed")?; @@ -73,7 +72,7 @@ where // Verify that the transactions run correctly info!("Running from memory ..."); - let output = N::build_from(chain_spec, input.clone()).context("Error while building block")?; + let output = N::build_from(&chain_spec, input.clone()).context("Error while building block")?; match &output { BlockBuildOutput::SUCCESS { @@ -89,15 +88,29 @@ where } } + Ok((input, output)) +} + +/// Build a single block using the specified strategy. +async fn execute_block( + input: BlockBuildInput, + output: BlockBuildOutput, + cli: Arc, + guest_elf: &'static [u8], +) -> anyhow::Result> +where + N::TxEssence: 'static + Send + TryFrom + Serialize + Deserialize<'static>, + >::Error: Debug, +{ let compressed_output = output.with_state_hashed(); - let result = match cli { + let result = match &*cli { Cli::Build(..) => None, Cli::Run(run_args) => { execute( &input, run_args.execution_po2, run_args.profile, - guest_elf, + &guest_elf, &compressed_output, &cli.execution_tag(), ); @@ -105,9 +118,9 @@ where } Cli::Prove(..) => { maybe_prove( - cli, + &cli, &input, - guest_elf, + &guest_elf, &compressed_output, Default::default(), ) @@ -115,7 +128,7 @@ where } Cli::Verify(verify_args) => Some( verify_bonsai_receipt( - compute_image_id(guest_elf)?, + compute_image_id(&guest_elf)?, &compressed_output, verify_args.bonsai_receipt_uuid.clone(), 4, @@ -123,6 +136,62 @@ where .await?, ), }; - Ok(result) } + +/// Build a single block using the specified strategy. +pub async fn build_block( + cli: Arc, + rpc_url: Option, + chain_spec: Arc, + guest_elf: &'static [u8], +) -> anyhow::Result>> +where + N::TxEssence: + 'static + Send + Sync + TryFrom + Serialize + Deserialize<'static>, + >::Error: Debug, +{ + let build_args = cli.build_args().clone(); + + let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_REQUESTS)); + let mut join_handles = Vec::new(); + + let block_num = build_args.block_number; + // TODO semantics are a bit mixed with block count (was OP specific) + for num in block_num..block_num + build_args.block_count as u64 { + // Acquire permit before queueing job. + let semaphore = semaphore.clone(); + + // Clone variables needed. + let rpc_url = rpc_url.clone(); + let cli = cli.clone(); + let chain_spec = chain_spec.clone(); + + // Spawn blocking for + join_handles.push(tokio::spawn(async move { + // Acquire permit before sending request. + let _permit = semaphore.acquire().await.unwrap(); + + let (input, output) = + preflight_block::(cli.build_args().clone(), num, rpc_url, chain_spec).await?; + + drop(_permit); + + // TODO this could be separated into a separate task, to make sure Bonsai also + // doesn't get throttled, for now just going quick path of dropping permit after + // preflight. + let result = execute_block::(input, output, cli, guest_elf).await; + + result + })); + } + + // Collect responses from tasks. + let mut responses = Vec::new(); + for jh in join_handles { + let response = jh.await?; + responses.push(response?); + } + + Ok(responses) +} diff --git a/primitives/src/trie/mpt.rs b/primitives/src/trie/mpt.rs index b7f04554..0ed92883 100644 --- a/primitives/src/trie/mpt.rs +++ b/primitives/src/trie/mpt.rs @@ -16,11 +16,11 @@ extern crate alloc; use alloc::boxed::Box; use core::{ - cell::RefCell, cmp, fmt::{Debug, Write}, iter, mem, }; +use std::sync::OnceLock; use alloy_primitives::B256; use alloy_rlp::Encodable; @@ -37,14 +37,32 @@ use crate::{keccak::keccak, trie::EMPTY_ROOT}; /// optimizing storage. However, operations targeting a truncated part will fail and /// return an error. Another distinction of this implementation is that branches cannot /// store values, aligning with the construction of MPTs in Ethereum. -#[derive(Clone, Debug, Default, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)] +#[derive(Clone, Debug, Default, Eq, Serialize, Deserialize)] pub struct MptNode { /// The type and data of the node. data: MptNodeData, /// Cache for a previously computed reference of this node. This is skipped during /// serialization. #[serde(skip)] - cached_reference: RefCell>, + cached_reference: OnceLock, +} + +impl Ord for MptNode { + fn cmp(&self, other: &Self) -> cmp::Ordering { + self.data.cmp(&other.data) + } +} + +impl PartialOrd for MptNode { + fn partial_cmp(&self, other: &Self) -> Option { + self.data.partial_cmp(&other.data) + } +} + +impl PartialEq for MptNode { + fn eq(&self, other: &Self) -> bool { + self.data == other.data + } } /// Represents custom error types for the sparse Merkle Patricia Trie (MPT). @@ -128,7 +146,7 @@ impl From for MptNode { fn from(value: MptNodeData) -> Self { Self { data: value, - cached_reference: RefCell::new(None), + cached_reference: OnceLock::new(), } } } @@ -291,8 +309,7 @@ impl MptNode { #[inline] pub fn reference(&self) -> MptNodeReference { self.cached_reference - .borrow_mut() - .get_or_insert_with(|| self.calc_reference()) + .get_or_init(|| self.calc_reference()) .clone() } @@ -303,11 +320,7 @@ impl MptNode { pub fn hash(&self) -> B256 { match self.data { MptNodeData::Null => EMPTY_ROOT, - _ => match self - .cached_reference - .borrow_mut() - .get_or_insert_with(|| self.calc_reference()) - { + _ => match self.cached_reference.get_or_init(|| self.calc_reference()) { MptNodeReference::Digest(digest) => *digest, MptNodeReference::Bytes(bytes) => keccak(bytes).into(), }, @@ -316,11 +329,7 @@ impl MptNode { /// Encodes the [MptNodeReference] of this node into the `out` buffer. fn reference_encode(&self, out: &mut dyn alloy_rlp::BufMut) { - match self - .cached_reference - .borrow_mut() - .get_or_insert_with(|| self.calc_reference()) - { + match self.cached_reference.get_or_init(|| self.calc_reference()) { // if the reference is an RLP-encoded byte slice, copy it directly MptNodeReference::Bytes(bytes) => out.put_slice(bytes), // if the reference is a digest, RLP-encode it with its fixed known length @@ -333,11 +342,7 @@ impl MptNode { /// Returns the length of the encoded [MptNodeReference] of this node. fn reference_length(&self) -> usize { - match self - .cached_reference - .borrow_mut() - .get_or_insert_with(|| self.calc_reference()) - { + match self.cached_reference.get_or_init(|| self.calc_reference()) { MptNodeReference::Bytes(bytes) => bytes.len(), MptNodeReference::Digest(_) => 1 + 32, } @@ -692,7 +697,7 @@ impl MptNode { } fn invalidate_ref_cache(&mut self) { - self.cached_reference.borrow_mut().take(); + self.cached_reference.take(); } /// Returns the number of traversable nodes in the trie.