diff --git a/Cargo.lock b/Cargo.lock index 413f76e68e3a..fc59735f0910 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9175,6 +9175,7 @@ dependencies = [ "zksync_types", "zksync_utils", "zksync_vlog", + "zksync_vm_executor", "zksync_vm_runner", "zksync_web3_decl", ] @@ -9261,7 +9262,7 @@ dependencies = [ "zksync_system_constants", "zksync_types", "zksync_utils", - "zksync_vm_utils", + "zksync_vm_executor", "zksync_web3_decl", ] @@ -9588,7 +9589,7 @@ dependencies = [ "zksync_test_account", "zksync_types", "zksync_utils", - "zksync_vm_utils", + "zksync_vm_executor", ] [[package]] @@ -9674,7 +9675,7 @@ dependencies = [ "zksync_tee_verifier", "zksync_types", "zksync_utils", - "zksync_vm_utils", + "zksync_vm_executor", ] [[package]] @@ -9776,11 +9777,29 @@ dependencies = [ "vise-exporter", ] +[[package]] +name = "zksync_vm_executor" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "once_cell", + "tokio", + "tracing", + "vise", + "zksync_contracts", + "zksync_dal", + "zksync_multivm", + "zksync_types", +] + [[package]] name = "zksync_vm_interface" version = "0.1.0" dependencies = [ + "anyhow", "assert_matches", + "async-trait", "hex", "serde", "serde_json", @@ -9812,30 +9831,16 @@ dependencies = [ "zksync_contracts", "zksync_dal", "zksync_health_check", - "zksync_multivm", "zksync_node_genesis", "zksync_node_test_utils", "zksync_object_store", "zksync_prover_interface", "zksync_state", - "zksync_state_keeper", "zksync_storage", "zksync_test_account", "zksync_types", "zksync_utils", - "zksync_vm_utils", -] - -[[package]] -name = "zksync_vm_utils" -version = "0.1.0" -dependencies = [ - "anyhow", - "tokio", - "tracing", - "zksync_contracts", - "zksync_dal", - "zksync_types", + "zksync_vm_executor", "zksync_vm_interface", ] diff --git a/Cargo.toml b/Cargo.toml index 6faea57fa1a0..334c85870f27 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,7 +70,7 @@ members = [ "core/lib/vlog", "core/lib/multivm", "core/lib/vm_interface", - "core/lib/vm_utils", + "core/lib/vm_executor", "core/lib/web3_decl", "core/lib/snapshots_applier", "core/lib/crypto_primitives", @@ -236,7 +236,7 @@ zksync_multivm = { version = "0.1.0", path = "core/lib/multivm" } zksync_prover_dal = { version = "0.1.0", path = "prover/crates/lib/prover_dal" } zksync_vlog = { version = "0.1.0", path = "core/lib/vlog" } zksync_vm_interface = { version = "0.1.0", path = "core/lib/vm_interface" } -zksync_vm_utils = { version = "0.1.0", path = "core/lib/vm_utils" } +zksync_vm_executor = { version = "0.1.0", path = "core/lib/vm_executor" } zksync_basic_types = { version = "0.1.0", path = "core/lib/basic_types" } zksync_circuit_breaker = { version = "0.1.0", path = "core/lib/circuit_breaker" } zksync_config = { version = "0.1.0", path = "core/lib/config" } diff --git a/core/lib/vm_utils/Cargo.toml b/core/lib/vm_executor/Cargo.toml similarity index 67% rename from core/lib/vm_utils/Cargo.toml rename to core/lib/vm_executor/Cargo.toml index cb12e7c8f673..9471e263bf43 100644 --- a/core/lib/vm_utils/Cargo.toml +++ b/core/lib/vm_executor/Cargo.toml @@ -1,6 +1,6 @@ [package] -name = "zksync_vm_utils" -description = "ZKsync VM utilities" +name = "zksync_vm_executor" +description = "Implementations of ZKsync VM executors" version.workspace = true edition.workspace = true authors.workspace = true @@ -14,8 +14,11 @@ categories.workspace = true zksync_contracts.workspace = true zksync_dal.workspace = true zksync_types.workspace = true -zksync_vm_interface.workspace = true +zksync_multivm.workspace = true +async-trait.workspace = true +once_cell.workspace = true tokio.workspace = true anyhow.workspace = true tracing.workspace = true +vise.workspace = true diff --git a/core/node/state_keeper/src/batch_executor/mod.rs b/core/lib/vm_executor/src/batch/executor.rs similarity index 53% rename from core/node/state_keeper/src/batch_executor/mod.rs rename to core/lib/vm_executor/src/batch/executor.rs index 235a8f581c82..6dc9354fd7db 100644 --- a/core/node/state_keeper/src/batch_executor/mod.rs +++ b/core/lib/vm_executor/src/batch/executor.rs @@ -1,82 +1,32 @@ -use std::{error::Error as StdError, fmt, sync::Arc}; +use std::{error::Error as StdError, sync::Arc}; use anyhow::Context as _; +use async_trait::async_trait; use tokio::{ sync::{mpsc, oneshot}, task::JoinHandle, }; use zksync_multivm::interface::{ - storage::StorageViewCache, Call, CompressedBytecodeInfo, FinishedL1Batch, Halt, L1BatchEnv, - L2BlockEnv, SystemEnv, VmExecutionResultAndLogs, + executor::BatchExecutor, + storage::{ReadStorage, StorageView}, + BatchTransactionExecutionResult, FinishedL1Batch, L2BlockEnv, }; -use zksync_state::OwnedStorage; use zksync_types::Transaction; -use crate::{ - metrics::{ExecutorCommand, EXECUTOR_METRICS}, - types::ExecutionMetricsForCriteria, -}; - -pub mod main_executor; -#[cfg(test)] -mod tests; - -/// Representation of a transaction executed in the virtual machine. -#[derive(Debug, Clone)] -pub enum TxExecutionResult { - /// Successful execution of the tx and the block tip dry run. - Success { - tx_result: Box, - tx_metrics: Box, - compressed_bytecodes: Vec, - call_tracer_result: Vec, - gas_remaining: u32, - }, - /// The VM rejected the tx for some reason. - RejectedByVm { reason: Halt }, - /// Bootloader gas limit is not enough to execute the tx. - BootloaderOutOfGasForTx, -} - -impl TxExecutionResult { - /// Returns a revert reason if either transaction was rejected or bootloader ran out of gas. - pub(super) fn err(&self) -> Option<&Halt> { - match self { - Self::Success { .. } => None, - Self::RejectedByVm { - reason: rejection_reason, - } => Some(rejection_reason), - Self::BootloaderOutOfGasForTx => Some(&Halt::BootloaderOutOfGas), - } - } -} - -/// An abstraction that allows us to create different kinds of batch executors. -/// The only requirement is to return a [`BatchExecutorHandle`], which does its work -/// by communicating with the externally initialized thread. -/// -/// This type is generic over the storage type accepted to create the VM instance, mostly for testing purposes. -pub trait BatchExecutor: 'static + Send + Sync + fmt::Debug { - fn init_batch( - &mut self, - storage: S, - l1_batch_params: L1BatchEnv, - system_env: SystemEnv, - ) -> BatchExecutorHandle; -} +use super::metrics::{ExecutorCommand, EXECUTOR_METRICS}; #[derive(Debug)] -enum HandleOrError { - Handle(JoinHandle>), +enum HandleOrError { + Handle(JoinHandle>>), Err(Arc), } -impl HandleOrError { +impl HandleOrError { async fn wait_for_error(&mut self) -> anyhow::Error { let err_arc = match self { Self::Handle(handle) => { let err = match handle.await { - Ok(Ok(())) => anyhow::anyhow!("batch executor unexpectedly stopped"), + Ok(Ok(_)) => anyhow::anyhow!("batch executor unexpectedly stopped"), Ok(Err(err)) => err, Err(err) => anyhow::Error::new(err).context("batch executor panicked"), }; @@ -90,7 +40,7 @@ impl HandleOrError { anyhow::Error::new(err_arc) } - async fn wait(self) -> anyhow::Result<()> { + async fn wait(self) -> anyhow::Result> { match self { Self::Handle(handle) => handle.await.context("batch executor panicked")?, Self::Err(err_arc) => Err(anyhow::Error::new(err_arc)), @@ -98,21 +48,16 @@ impl HandleOrError { } } -/// A public interface for interaction with the `BatchExecutor`. -/// `BatchExecutorHandle` is stored in the state keeper and is used to invoke or rollback transactions, and also seal -/// the batches. +/// "Main" [`BatchExecutor`] implementation instantiating a VM in a blocking Tokio thread. #[derive(Debug)] -pub struct BatchExecutorHandle { - handle: HandleOrError, +pub struct MainBatchExecutor { + handle: HandleOrError, commands: mpsc::Sender, } -impl BatchExecutorHandle { - /// Creates a batch executor handle from the provided sender and thread join handle. - /// Can be used to inject an alternative batch executor implementation. - #[doc(hidden)] - pub(super) fn from_raw( - handle: JoinHandle>, +impl MainBatchExecutor { + pub(super) fn new( + handle: JoinHandle>>, commands: mpsc::Sender, ) -> Self { Self { @@ -120,9 +65,18 @@ impl BatchExecutorHandle { commands, } } +} +#[async_trait] +impl BatchExecutor for MainBatchExecutor +where + S: ReadStorage + Send + 'static, +{ #[tracing::instrument(skip_all)] - pub async fn execute_tx(&mut self, tx: Transaction) -> anyhow::Result { + async fn execute_tx( + &mut self, + tx: Transaction, + ) -> anyhow::Result { let tx_gas_limit = tx.gas_limit().as_u64(); let (response_sender, response_receiver) = oneshot::channel(); @@ -144,9 +98,9 @@ impl BatchExecutorHandle { }; let elapsed = latency.observe(); - if let TxExecutionResult::Success { tx_metrics, .. } = &res { - let gas_per_nanosecond = tx_metrics.execution_metrics.computational_gas_used as f64 - / elapsed.as_nanos() as f64; + if !res.tx_result.result.is_failed() { + let gas_per_nanosecond = + res.tx_result.statistics.computational_gas_used as f64 / elapsed.as_nanos() as f64; EXECUTOR_METRICS .computational_gas_per_nanosecond .observe(gas_per_nanosecond); @@ -162,13 +116,13 @@ impl BatchExecutorHandle { } #[tracing::instrument(skip_all)] - pub async fn start_next_l2_block(&mut self, env: L2BlockEnv) -> anyhow::Result<()> { + async fn rollback_last_tx(&mut self) -> anyhow::Result<()> { // While we don't get anything from the channel, it's useful to have it as a confirmation that the operation // indeed has been processed. let (response_sender, response_receiver) = oneshot::channel(); let send_failed = self .commands - .send(Command::StartNextL2Block(env, response_sender)) + .send(Command::RollbackLastTx(response_sender)) .await .is_err(); if send_failed { @@ -176,7 +130,7 @@ impl BatchExecutorHandle { } let latency = EXECUTOR_METRICS.batch_executor_command_response_time - [&ExecutorCommand::StartNextL2Block] + [&ExecutorCommand::RollbackLastTx] .start(); if response_receiver.await.is_err() { return Err(self.handle.wait_for_error().await); @@ -186,13 +140,13 @@ impl BatchExecutorHandle { } #[tracing::instrument(skip_all)] - pub async fn rollback_last_tx(&mut self) -> anyhow::Result<()> { + async fn start_next_l2_block(&mut self, env: L2BlockEnv) -> anyhow::Result<()> { // While we don't get anything from the channel, it's useful to have it as a confirmation that the operation // indeed has been processed. let (response_sender, response_receiver) = oneshot::channel(); let send_failed = self .commands - .send(Command::RollbackLastTx(response_sender)) + .send(Command::StartNextL2Block(env, response_sender)) .await .is_err(); if send_failed { @@ -200,7 +154,7 @@ impl BatchExecutorHandle { } let latency = EXECUTOR_METRICS.batch_executor_command_response_time - [&ExecutorCommand::RollbackLastTx] + [&ExecutorCommand::StartNextL2Block] .start(); if response_receiver.await.is_err() { return Err(self.handle.wait_for_error().await); @@ -210,7 +164,9 @@ impl BatchExecutorHandle { } #[tracing::instrument(skip_all)] - pub async fn finish_batch(mut self) -> anyhow::Result { + async fn finish_batch( + mut self: Box, + ) -> anyhow::Result<(FinishedL1Batch, StorageView)> { let (response_sender, response_receiver) = oneshot::channel(); let send_failed = self .commands @@ -228,44 +184,19 @@ impl BatchExecutorHandle { Ok(batch) => batch, Err(_) => return Err(self.handle.wait_for_error().await), }; - self.handle.wait().await?; - latency.observe(); - Ok(finished_batch) - } - - pub async fn finish_batch_with_cache( - mut self, - ) -> anyhow::Result<(FinishedL1Batch, StorageViewCache)> { - let (response_sender, response_receiver) = oneshot::channel(); - let send_failed = self - .commands - .send(Command::FinishBatchWithCache(response_sender)) - .await - .is_err(); - if send_failed { - return Err(self.handle.wait_for_error().await); - } - - let latency = EXECUTOR_METRICS.batch_executor_command_response_time - [&ExecutorCommand::FinishBatchWithCache] - .start(); - let batch_with_cache = match response_receiver.await { - Ok(batch_with_cache) => batch_with_cache, - Err(_) => return Err(self.handle.wait_for_error().await), - }; - - self.handle.wait().await?; - latency.observe(); - Ok(batch_with_cache) + let storage_view = self.handle.wait().await?; + Ok((finished_batch, storage_view)) } } #[derive(Debug)] pub(super) enum Command { - ExecuteTx(Box, oneshot::Sender), + ExecuteTx( + Box, + oneshot::Sender, + ), StartNextL2Block(L2BlockEnv, oneshot::Sender<()>), RollbackLastTx(oneshot::Sender<()>), FinishBatch(oneshot::Sender), - FinishBatchWithCache(oneshot::Sender<(FinishedL1Batch, StorageViewCache)>), } diff --git a/core/node/state_keeper/src/batch_executor/main_executor.rs b/core/lib/vm_executor/src/batch/factory.rs similarity index 68% rename from core/node/state_keeper/src/batch_executor/main_executor.rs rename to core/lib/vm_executor/src/batch/factory.rs index 7d1bf5f47b17..17b125b0c41a 100644 --- a/core/node/state_keeper/src/batch_executor/main_executor.rs +++ b/core/lib/vm_executor/src/batch/factory.rs @@ -1,31 +1,31 @@ -use std::sync::Arc; +use std::{marker::PhantomData, rc::Rc, sync::Arc}; use anyhow::Context as _; use once_cell::sync::OnceCell; use tokio::sync::mpsc; use zksync_multivm::{ interface::{ + executor::{BatchExecutor, BatchExecutorFactory}, storage::{ReadStorage, StorageView}, - Call, CompressedBytecodeInfo, ExecutionResult, FinishedL1Batch, Halt, L1BatchEnv, - L2BlockEnv, SystemEnv, VmExecutionResultAndLogs, VmInterface, VmInterfaceHistoryEnabled, + BatchTransactionExecutionResult, ExecutionResult, FinishedL1Batch, Halt, L1BatchEnv, + L2BlockEnv, SystemEnv, VmInterface, VmInterfaceHistoryEnabled, }, tracers::CallTracer, vm_latest::HistoryEnabled, MultiVMTracer, VmInstance, }; -use zksync_shared_metrics::{InteractionType, TxStage, APP_METRICS}; use zksync_types::{vm::FastVmMode, Transaction}; -use super::{BatchExecutor, BatchExecutorHandle, Command, TxExecutionResult}; -use crate::{ - metrics::{TxExecutionStage, BATCH_TIP_METRICS, EXECUTOR_METRICS, KEEPER_METRICS}, - types::ExecutionMetricsForCriteria, +use super::{ + executor::{Command, MainBatchExecutor}, + metrics::{TxExecutionStage, BATCH_TIP_METRICS, KEEPER_METRICS}, }; +use crate::batch::metrics::{InteractionType, EXECUTOR_METRICS}; -/// The default implementation of [`BatchExecutor`]. -/// Creates a "real" batch executor which maintains the VM (as opposed to the test builder which doesn't use the VM). +/// The default implementation of [`BatchExecutorFactory`]. +/// Creates real batch executors which maintain the VM (as opposed to the test factories which don't use the VM). #[derive(Debug, Clone)] -pub struct MainBatchExecutor { +pub struct MainBatchExecutorFactory { save_call_traces: bool, /// Whether batch executor would allow transactions with bytecode that cannot be compressed. /// For new blocks, bytecode compression is mandatory -- if bytecode compression is not supported, @@ -37,7 +37,7 @@ pub struct MainBatchExecutor { fast_vm_mode: FastVmMode, } -impl MainBatchExecutor { +impl MainBatchExecutorFactory { pub fn new(save_call_traces: bool, optional_bytecode_compression: bool) -> Self { Self { save_call_traces, @@ -56,13 +56,13 @@ impl MainBatchExecutor { } } -impl BatchExecutor for MainBatchExecutor { +impl BatchExecutorFactory for MainBatchExecutorFactory { fn init_batch( &mut self, storage: S, l1_batch_params: L1BatchEnv, system_env: SystemEnv, - ) -> BatchExecutorHandle { + ) -> Box> { // Since we process `BatchExecutor` commands one-by-one (the next command is never enqueued // until a previous command is processed), capacity 1 is enough for the commands channel. let (commands_sender, commands_receiver) = mpsc::channel(1); @@ -71,21 +71,15 @@ impl BatchExecutor for MainBatchExecutor { optional_bytecode_compression: self.optional_bytecode_compression, fast_vm_mode: self.fast_vm_mode, commands: commands_receiver, + _storage: PhantomData, }; let handle = tokio::task::spawn_blocking(move || executor.run(storage, l1_batch_params, system_env)); - BatchExecutorHandle::from_raw(handle, commands_sender) + Box::new(MainBatchExecutor::new(handle, commands_sender)) } } -#[derive(Debug)] -struct TransactionOutput { - tx_result: VmExecutionResultAndLogs, - compressed_bytecodes: Vec, - calls: Vec, -} - /// Implementation of the "primary" (non-test) batch executor. /// Upon launch, it initializes the VM object with provided block context and properties, and keeps invoking the commands /// sent to it one by one until the batch is finished. @@ -93,20 +87,21 @@ struct TransactionOutput { /// One `CommandReceiver` can execute exactly one batch, so once the batch is sealed, a new `CommandReceiver` object must /// be constructed. #[derive(Debug)] -struct CommandReceiver { +struct CommandReceiver { save_call_traces: bool, optional_bytecode_compression: bool, fast_vm_mode: FastVmMode, commands: mpsc::Receiver, + _storage: PhantomData, } -impl CommandReceiver { - pub(super) fn run( +impl CommandReceiver { + pub(super) fn run( mut self, storage: S, l1_batch_params: L1BatchEnv, system_env: SystemEnv, - ) -> anyhow::Result<()> { + ) -> anyhow::Result> { tracing::info!("Starting executing L1 batch #{}", &l1_batch_params.number); let storage_view = StorageView::new(storage).to_rc_ptr(); @@ -116,13 +111,15 @@ impl CommandReceiver { storage_view.clone(), self.fast_vm_mode, ); + let mut batch_finished = false; while let Some(cmd) = self.commands.blocking_recv() { match cmd { Command::ExecuteTx(tx, resp) => { - let result = self - .execute_tx(&tx, &mut vm) - .with_context(|| format!("fatal error executing transaction {tx:?}"))?; + let tx_hash = tx.hash(); + let result = self.execute_tx(*tx, &mut vm).with_context(|| { + format!("fatal error executing transaction {tx_hash:?}") + })?; if resp.send(result).is_err() { break; } @@ -144,36 +141,34 @@ impl CommandReceiver { if resp.send(vm_block_result).is_err() { break; } - - // `storage_view` cannot be accessed while borrowed by the VM, - // so this is the only point at which storage metrics can be obtained - let metrics = storage_view.as_ref().borrow_mut().metrics(); - EXECUTOR_METRICS.batch_storage_interaction_duration[&InteractionType::GetValue] - .observe(metrics.time_spent_on_get_value); - EXECUTOR_METRICS.batch_storage_interaction_duration[&InteractionType::SetValue] - .observe(metrics.time_spent_on_set_value); - return Ok(()); - } - Command::FinishBatchWithCache(resp) => { - let vm_block_result = self.finish_batch(&mut vm)?; - let cache = (*storage_view).borrow().cache(); - if resp.send((vm_block_result, cache)).is_err() { - break; - } - return Ok(()); + batch_finished = true; + break; } } } - // State keeper can exit because of stop signal, so it's OK to exit mid-batch. - tracing::info!("State keeper exited with an unfinished L1 batch"); - Ok(()) + + drop(vm); + let storage_view = Rc::into_inner(storage_view) + .context("storage view leaked")? + .into_inner(); + if batch_finished { + let metrics = storage_view.metrics(); + EXECUTOR_METRICS.batch_storage_interaction_duration[&InteractionType::GetValue] + .observe(metrics.time_spent_on_get_value); + EXECUTOR_METRICS.batch_storage_interaction_duration[&InteractionType::SetValue] + .observe(metrics.time_spent_on_set_value); + } else { + // State keeper can exit because of stop signal, so it's OK to exit mid-batch. + tracing::info!("State keeper exited with an unfinished L1 batch"); + } + Ok(storage_view) } - fn execute_tx( + fn execute_tx( &self, - tx: &Transaction, + transaction: Transaction, vm: &mut VmInstance, - ) -> anyhow::Result { + ) -> anyhow::Result { // Executing a next transaction means that a previous transaction was either rolled back (in which case its snapshot // was already removed), or that we build on top of it (in which case, it can be removed now). vm.pop_snapshot_no_rollback(); @@ -182,47 +177,23 @@ impl CommandReceiver { // Execute the transaction. let latency = KEEPER_METRICS.tx_execution_time[&TxExecutionStage::Execution].start(); - let output = if self.optional_bytecode_compression { - self.execute_tx_in_vm_with_optional_compression(tx, vm)? + let result = if self.optional_bytecode_compression { + self.execute_tx_in_vm_with_optional_compression(&transaction, vm)? } else { - self.execute_tx_in_vm(tx, vm)? + self.execute_tx_in_vm(&transaction, vm)? }; latency.observe(); - APP_METRICS.processed_txs[&TxStage::StateKeeper].inc(); - APP_METRICS.processed_l1_txs[&TxStage::StateKeeper].inc_by(tx.is_l1().into()); - let TransactionOutput { - tx_result, - compressed_bytecodes, - calls, - } = output; - - if let ExecutionResult::Halt { reason } = tx_result.result { - return Ok(match reason { - Halt::BootloaderOutOfGas => TxExecutionResult::BootloaderOutOfGasForTx, - _ => TxExecutionResult::RejectedByVm { reason }, - }); - } - - let tx_metrics = ExecutionMetricsForCriteria::new(Some(tx), &tx_result); - let gas_remaining = tx_result.statistics.gas_remaining; - - Ok(TxExecutionResult::Success { - tx_result: Box::new(tx_result), - tx_metrics: Box::new(tx_metrics), - compressed_bytecodes, - call_tracer_result: calls, - gas_remaining, - }) + Ok(result) } - fn rollback_last_tx(&self, vm: &mut VmInstance) { + fn rollback_last_tx(&self, vm: &mut VmInstance) { let latency = KEEPER_METRICS.tx_execution_time[&TxExecutionStage::TxRollback].start(); vm.rollback_to_the_latest_snapshot(); latency.observe(); } - fn start_next_l2_block( + fn start_next_l2_block( &self, l2_block_env: L2BlockEnv, vm: &mut VmInstance, @@ -230,7 +201,7 @@ impl CommandReceiver { vm.start_new_l2_block(l2_block_env); } - fn finish_batch( + fn finish_batch( &self, vm: &mut VmInstance, ) -> anyhow::Result { @@ -249,11 +220,11 @@ impl CommandReceiver { /// Attempts to execute transaction with or without bytecode compression. /// If compression fails, the transaction will be re-executed without compression. - fn execute_tx_in_vm_with_optional_compression( + fn execute_tx_in_vm_with_optional_compression( &self, tx: &Transaction, vm: &mut VmInstance, - ) -> anyhow::Result { + ) -> anyhow::Result { // Note, that the space where we can put the calldata for compressing transactions // is limited and the transactions do not pay for taking it. // In order to not let the accounts spam the space of compressed bytecodes with bytecodes @@ -273,14 +244,14 @@ impl CommandReceiver { if let (Ok(compressed_bytecodes), tx_result) = vm.inspect_transaction_with_bytecode_compression(tracer.into(), tx.clone(), true) { - let calls = Arc::try_unwrap(call_tracer_result) + let call_traces = Arc::try_unwrap(call_tracer_result) .map_err(|_| anyhow::anyhow!("failed extracting call traces"))? .take() .unwrap_or_default(); - return Ok(TransactionOutput { - tx_result, + return Ok(BatchTransactionExecutionResult { + tx_result: Box::new(tx_result), compressed_bytecodes, - calls, + call_traces, }); } @@ -303,24 +274,24 @@ impl CommandReceiver { // TODO implement tracer manager which will be responsible // for collecting result from all tracers and save it to the database - let calls = Arc::try_unwrap(call_tracer_result) + let call_traces = Arc::try_unwrap(call_tracer_result) .map_err(|_| anyhow::anyhow!("failed extracting call traces"))? .take() .unwrap_or_default(); - Ok(TransactionOutput { - tx_result, + Ok(BatchTransactionExecutionResult { + tx_result: Box::new(tx_result), compressed_bytecodes, - calls, + call_traces, }) } /// Attempts to execute transaction with mandatory bytecode compression. /// If bytecode compression fails, the transaction will be rejected. - fn execute_tx_in_vm( + fn execute_tx_in_vm( &self, tx: &Transaction, vm: &mut VmInstance, - ) -> anyhow::Result { + ) -> anyhow::Result { let call_tracer_result = Arc::new(OnceCell::default()); let tracer = if self.save_call_traces { vec![CallTracer::new(call_tracer_result.clone()).into_tracer_pointer()] @@ -331,24 +302,24 @@ impl CommandReceiver { let (bytecodes_result, mut tx_result) = vm.inspect_transaction_with_bytecode_compression(tracer.into(), tx.clone(), true); if let Ok(compressed_bytecodes) = bytecodes_result { - let calls = Arc::try_unwrap(call_tracer_result) + let call_traces = Arc::try_unwrap(call_tracer_result) .map_err(|_| anyhow::anyhow!("failed extracting call traces"))? .take() .unwrap_or_default(); - Ok(TransactionOutput { - tx_result, + Ok(BatchTransactionExecutionResult { + tx_result: Box::new(tx_result), compressed_bytecodes, - calls, + call_traces, }) } else { // Transaction failed to publish bytecodes, we reject it so initiator doesn't pay fee. tx_result.result = ExecutionResult::Halt { reason: Halt::FailedToPublishCompressedBytecodes, }; - Ok(TransactionOutput { - tx_result, + Ok(BatchTransactionExecutionResult { + tx_result: Box::new(tx_result), compressed_bytecodes: vec![], - calls: vec![], + call_traces: vec![], }) } } diff --git a/core/lib/vm_executor/src/batch/metrics.rs b/core/lib/vm_executor/src/batch/metrics.rs new file mode 100644 index 000000000000..170ed4717989 --- /dev/null +++ b/core/lib/vm_executor/src/batch/metrics.rs @@ -0,0 +1,95 @@ +//! Main batch executor metrics. + +use std::time::Duration; + +use vise::{Buckets, EncodeLabelSet, EncodeLabelValue, Family, Histogram, Metrics}; +use zksync_multivm::interface::VmExecutionResultAndLogs; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] +#[metrics(label = "command", rename_all = "snake_case")] +pub(super) enum ExecutorCommand { + ExecuteTx, + #[metrics(name = "start_next_miniblock")] + StartNextL2Block, + RollbackLastTx, + FinishBatch, +} + +const GAS_PER_NANOSECOND_BUCKETS: Buckets = Buckets::values(&[ + 0.01, 0.03, 0.1, 0.3, 0.5, 0.75, 1., 1.5, 3., 5., 10., 20., 50., +]); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] +#[metrics(label = "stage", rename_all = "snake_case")] +pub(super) enum TxExecutionStage { + Execution, + TxRollback, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] +#[metrics(label = "interaction", rename_all = "snake_case")] +pub(super) enum InteractionType { + GetValue, + SetValue, +} + +/// Executor-related metrics. +#[derive(Debug, Metrics)] +#[metrics(prefix = "state_keeper")] +pub(super) struct ExecutorMetrics { + /// Latency to process a single command sent to the batch executor. + #[metrics(buckets = Buckets::LATENCIES)] + pub batch_executor_command_response_time: Family>, + #[metrics(buckets = GAS_PER_NANOSECOND_BUCKETS)] + pub computational_gas_per_nanosecond: Histogram, + #[metrics(buckets = GAS_PER_NANOSECOND_BUCKETS)] + pub failed_tx_gas_limit_per_nanosecond: Histogram, + /// Cumulative latency of interacting with the storage when executing a transaction + /// in the batch executor. + #[metrics(buckets = Buckets::LATENCIES)] + pub batch_storage_interaction_duration: Family>, +} + +#[vise::register] +pub(super) static EXECUTOR_METRICS: vise::Global = vise::Global::new(); + +/// Some more executor-related metrics with differing prefix. +#[derive(Debug, Metrics)] +#[metrics(prefix = "server_state_keeper")] +pub(super) struct StateKeeperMetrics { + /// Time spent by the state keeper on transaction execution. + #[metrics(buckets = Buckets::LATENCIES)] + pub tx_execution_time: Family>, +} + +#[vise::register] +pub(super) static KEEPER_METRICS: vise::Global = vise::Global::new(); + +#[derive(Debug, Metrics)] +#[metrics(prefix = "batch_tip")] +pub(super) struct BatchTipMetrics { + #[metrics(buckets = Buckets::exponential(60000.0..=80000000.0, 2.0))] + gas_used: Histogram, + #[metrics(buckets = Buckets::exponential(1.0..=60000.0, 2.0))] + pubdata_published: Histogram, + #[metrics(buckets = Buckets::exponential(1.0..=4096.0, 2.0))] + circuit_statistic: Histogram, + #[metrics(buckets = Buckets::exponential(1.0..=4096.0, 2.0))] + execution_metrics_size: Histogram, +} + +impl BatchTipMetrics { + pub fn observe(&self, execution_result: &VmExecutionResultAndLogs) { + self.gas_used + .observe(execution_result.statistics.gas_used as usize); + self.pubdata_published + .observe(execution_result.statistics.pubdata_published as usize); + self.circuit_statistic + .observe(execution_result.statistics.circuit_statistic.total()); + self.execution_metrics_size + .observe(execution_result.get_execution_metrics(None).size()); + } +} + +#[vise::register] +pub(super) static BATCH_TIP_METRICS: vise::Global = vise::Global::new(); diff --git a/core/lib/vm_executor/src/batch/mod.rs b/core/lib/vm_executor/src/batch/mod.rs new file mode 100644 index 000000000000..2407d2daba2c --- /dev/null +++ b/core/lib/vm_executor/src/batch/mod.rs @@ -0,0 +1,9 @@ +//! Main implementation of ZKsync VM [batch executor](crate::interface::BatchExecutor). +//! +//! This implementation is used by various ZKsync components, like the state keeper and components based on the VM runner. + +pub use self::{executor::MainBatchExecutor, factory::MainBatchExecutorFactory}; + +mod executor; +mod factory; +mod metrics; diff --git a/core/lib/vm_executor/src/lib.rs b/core/lib/vm_executor/src/lib.rs new file mode 100644 index 000000000000..24fb3d8f7eee --- /dev/null +++ b/core/lib/vm_executor/src/lib.rs @@ -0,0 +1,9 @@ +//! Implementations of ZKsync VM executors and executor-related utils. +//! +//! The included implementations are separated from the respective interfaces since they depend +//! on [VM implementations](zksync_multivm), are aware of ZKsync node storage etc. + +pub use zksync_multivm::interface::executor as interface; + +pub mod batch; +pub mod storage; diff --git a/core/lib/vm_utils/src/storage.rs b/core/lib/vm_executor/src/storage.rs similarity index 98% rename from core/lib/vm_utils/src/storage.rs rename to core/lib/vm_executor/src/storage.rs index 1e43543bc5aa..e39748786a30 100644 --- a/core/lib/vm_utils/src/storage.rs +++ b/core/lib/vm_executor/src/storage.rs @@ -1,13 +1,15 @@ +//! Utils to get data for L1 batch execution from storage. + use std::time::{Duration, Instant}; use anyhow::Context; use zksync_contracts::BaseSystemContracts; use zksync_dal::{Connection, Core, CoreDal, DalError}; +use zksync_multivm::interface::{L1BatchEnv, L2BlockEnv, SystemEnv, TxExecutionMode}; use zksync_types::{ block::L2BlockHeader, fee_model::BatchFeeInput, snapshots::SnapshotRecoveryStatus, Address, L1BatchNumber, L2BlockNumber, L2ChainId, ProtocolVersionId, H256, ZKPORTER_IS_AVAILABLE, }; -use zksync_vm_interface::{L1BatchEnv, L2BlockEnv, SystemEnv, TxExecutionMode}; const BATCH_COMPUTATIONAL_GAS_LIMIT: u32 = u32::MAX; diff --git a/core/lib/vm_interface/Cargo.toml b/core/lib/vm_interface/Cargo.toml index 8fc7883f1df7..694576dca3b0 100644 --- a/core/lib/vm_interface/Cargo.toml +++ b/core/lib/vm_interface/Cargo.toml @@ -15,6 +15,8 @@ zksync_contracts.workspace = true zksync_system_constants.workspace = true zksync_types.workspace = true +anyhow.workspace = true +async-trait.workspace = true hex.workspace = true serde.workspace = true thiserror.workspace = true diff --git a/core/lib/vm_interface/src/executor.rs b/core/lib/vm_interface/src/executor.rs new file mode 100644 index 000000000000..ee6665abfcb1 --- /dev/null +++ b/core/lib/vm_interface/src/executor.rs @@ -0,0 +1,44 @@ +//! High-level executor traits. + +use std::fmt; + +use async_trait::async_trait; +use zksync_types::Transaction; + +use crate::{ + storage::StorageView, BatchTransactionExecutionResult, FinishedL1Batch, L1BatchEnv, L2BlockEnv, + SystemEnv, +}; + +/// Factory of [`BatchExecutor`]s. +pub trait BatchExecutorFactory: 'static + Send + fmt::Debug { + /// Initializes an executor for a batch with the specified params and using the provided storage. + fn init_batch( + &mut self, + storage: S, + l1_batch_params: L1BatchEnv, + system_env: SystemEnv, + ) -> Box>; +} + +/// Handle for executing a single L1 batch. +/// +/// The handle is parametric by the transaction execution output in order to be able to represent different +/// levels of abstraction. +#[async_trait] +pub trait BatchExecutor: 'static + Send + fmt::Debug { + /// Executes a transaction. + async fn execute_tx( + &mut self, + tx: Transaction, + ) -> anyhow::Result; + + /// Rolls back the last executed transaction. + async fn rollback_last_tx(&mut self) -> anyhow::Result<()>; + + /// Starts a next L2 block with the specified params. + async fn start_next_l2_block(&mut self, env: L2BlockEnv) -> anyhow::Result<()>; + + /// Finished the current L1 batch. + async fn finish_batch(self: Box) -> anyhow::Result<(FinishedL1Batch, StorageView)>; +} diff --git a/core/lib/vm_interface/src/lib.rs b/core/lib/vm_interface/src/lib.rs index dba93a49ec86..315eb2bb36a7 100644 --- a/core/lib/vm_interface/src/lib.rs +++ b/core/lib/vm_interface/src/lib.rs @@ -28,17 +28,18 @@ pub use crate::{ VmExecutionMode, }, outputs::{ - BootloaderMemory, Call, CallType, CircuitStatistic, CompressedBytecodeInfo, - CurrentExecutionState, DeduplicatedWritesMetrics, ExecutionResult, FinishedL1Batch, - L2Block, Refunds, TransactionExecutionMetrics, TransactionExecutionResult, - TxExecutionStatus, VmEvent, VmExecutionLogs, VmExecutionMetrics, - VmExecutionResultAndLogs, VmExecutionStatistics, VmMemoryMetrics, + BatchTransactionExecutionResult, BootloaderMemory, Call, CallType, CircuitStatistic, + CompressedBytecodeInfo, CurrentExecutionState, DeduplicatedWritesMetrics, + ExecutionResult, FinishedL1Batch, L2Block, Refunds, TransactionExecutionMetrics, + TransactionExecutionResult, TxExecutionStatus, VmEvent, VmExecutionLogs, + VmExecutionMetrics, VmExecutionResultAndLogs, VmExecutionStatistics, VmMemoryMetrics, }, tracer, }, vm::{VmFactory, VmInterface, VmInterfaceExt, VmInterfaceHistoryEnabled}, }; +pub mod executor; pub mod storage; mod types; mod vm; diff --git a/core/lib/vm_interface/src/storage/view.rs b/core/lib/vm_interface/src/storage/view.rs index 691a9d442ca8..101f5c82f497 100644 --- a/core/lib/vm_interface/src/storage/view.rs +++ b/core/lib/vm_interface/src/storage/view.rs @@ -102,7 +102,7 @@ where } } -impl StorageView { +impl StorageView { /// Creates a new storage view based on the underlying storage. pub fn new(storage_handle: S) -> Self { Self { diff --git a/core/lib/vm_interface/src/types/outputs/execution_result.rs b/core/lib/vm_interface/src/types/outputs/execution_result.rs index 37e122c6d9d9..d74d74652e28 100644 --- a/core/lib/vm_interface/src/types/outputs/execution_result.rs +++ b/core/lib/vm_interface/src/types/outputs/execution_result.rs @@ -297,6 +297,21 @@ impl Call { } } +/// Mid-level transaction execution output returned by a batch executor. +#[derive(Debug, Clone)] +pub struct BatchTransactionExecutionResult { + pub tx_result: Box, + pub compressed_bytecodes: Vec, + pub call_traces: Vec, +} + +impl BatchTransactionExecutionResult { + pub fn was_halted(&self) -> bool { + matches!(self.tx_result.result, ExecutionResult::Halt { .. }) + } +} + +/// High-level transaction execution result used by the API server sandbox etc. #[derive(Debug, Clone, PartialEq)] pub struct TransactionExecutionResult { pub transaction: Transaction, diff --git a/core/lib/vm_interface/src/types/outputs/finished_l1batch.rs b/core/lib/vm_interface/src/types/outputs/finished_l1batch.rs index 9c0afc6659f0..27241c2c0fae 100644 --- a/core/lib/vm_interface/src/types/outputs/finished_l1batch.rs +++ b/core/lib/vm_interface/src/types/outputs/finished_l1batch.rs @@ -1,6 +1,7 @@ use zksync_types::writes::StateDiffRecord; use super::{BootloaderMemory, CurrentExecutionState, VmExecutionResultAndLogs}; +use crate::{ExecutionResult, Refunds, VmExecutionLogs, VmExecutionStatistics}; /// State of the VM after the batch execution. #[derive(Debug, Clone)] @@ -16,3 +17,28 @@ pub struct FinishedL1Batch { /// List of state diffs. Could be none for old versions of the VM. pub state_diffs: Option>, } + +impl FinishedL1Batch { + pub fn mock() -> Self { + FinishedL1Batch { + block_tip_execution_result: VmExecutionResultAndLogs { + result: ExecutionResult::Success { output: vec![] }, + logs: VmExecutionLogs::default(), + statistics: VmExecutionStatistics::default(), + refunds: Refunds::default(), + }, + final_execution_state: CurrentExecutionState { + events: vec![], + deduplicated_storage_logs: vec![], + used_contract_hashes: vec![], + user_l2_to_l1_logs: vec![], + system_logs: vec![], + storage_refunds: Vec::new(), + pubdata_costs: Vec::new(), + }, + final_bootloader_memory: Some(vec![]), + pubdata_input: Some(vec![]), + state_diffs: Some(vec![]), + } + } +} diff --git a/core/lib/vm_interface/src/types/outputs/mod.rs b/core/lib/vm_interface/src/types/outputs/mod.rs index d24e1440f836..abefa59bbe7e 100644 --- a/core/lib/vm_interface/src/types/outputs/mod.rs +++ b/core/lib/vm_interface/src/types/outputs/mod.rs @@ -1,8 +1,9 @@ pub use self::{ bytecode::CompressedBytecodeInfo, execution_result::{ - Call, CallType, ExecutionResult, Refunds, TransactionExecutionResult, TxExecutionStatus, - VmEvent, VmExecutionLogs, VmExecutionResultAndLogs, + BatchTransactionExecutionResult, Call, CallType, ExecutionResult, Refunds, + TransactionExecutionResult, TxExecutionStatus, VmEvent, VmExecutionLogs, + VmExecutionResultAndLogs, }, execution_state::{BootloaderMemory, CurrentExecutionState}, finished_l1batch::FinishedL1Batch, diff --git a/core/lib/vm_utils/src/lib.rs b/core/lib/vm_utils/src/lib.rs deleted file mode 100644 index 30f61eb69f21..000000000000 --- a/core/lib/vm_utils/src/lib.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod storage; diff --git a/core/node/consensus/src/testonly.rs b/core/node/consensus/src/testonly.rs index 0537aaabc563..90063772da92 100644 --- a/core/node/consensus/src/testonly.rs +++ b/core/node/consensus/src/testonly.rs @@ -30,14 +30,15 @@ use zksync_node_sync::{ }; use zksync_node_test_utils::{create_l1_batch_metadata, l1_batch_metadata_to_commitment_artifacts}; use zksync_state_keeper::{ + executor::MainBatchExecutorFactory, io::{IoCursor, L1BatchParams, L2BlockParams}, seal_criteria::NoopSealer, testonly::{ fund, l1_transaction, l2_transaction, test_batch_executor::MockReadStorageFactory, MockBatchExecutor, }, - AsyncRocksdbCache, MainBatchExecutor, OutputHandler, StateKeeperPersistence, - TreeWritesPersistence, ZkSyncStateKeeper, + AsyncRocksdbCache, OutputHandler, StateKeeperPersistence, TreeWritesPersistence, + ZkSyncStateKeeper, }; use zksync_test_account::Account; use zksync_types::{ @@ -592,12 +593,13 @@ impl StateKeeperRunner { }); s.spawn_bg({ + let executor_factory = MainBatchExecutorFactory::new(false, false); let stop_recv = stop_recv.clone(); async { ZkSyncStateKeeper::new( stop_recv, Box::new(io), - Box::new(MainBatchExecutor::new(false, false)), + Box::new(executor_factory), OutputHandler::new(Box::new(persistence.with_tx_insertion())) .with_handler(Box::new(self.sync_state.clone())), Arc::new(NoopSealer), diff --git a/core/node/external_proof_integration_api/src/lib.rs b/core/node/external_proof_integration_api/src/lib.rs index 4ad8e2595a01..4355896e2a2e 100644 --- a/core/node/external_proof_integration_api/src/lib.rs +++ b/core/node/external_proof_integration_api/src/lib.rs @@ -4,8 +4,6 @@ mod middleware; mod processor; mod types; -pub use crate::processor::Processor; - use std::net::SocketAddr; use anyhow::Context; @@ -20,6 +18,7 @@ use tokio::sync::watch; use types::{ExternalProof, ProofGenerationDataResponse}; use zksync_basic_types::L1BatchNumber; +pub use crate::processor::Processor; use crate::{ metrics::{CallOutcome, Method}, middleware::MetricsMiddleware, diff --git a/core/node/node_framework/Cargo.toml b/core/node/node_framework/Cargo.toml index 3a81a578c033..f9efb22bd610 100644 --- a/core/node/node_framework/Cargo.toml +++ b/core/node/node_framework/Cargo.toml @@ -38,6 +38,7 @@ zksync_eth_sender.workspace = true zksync_da_client.workspace = true zksync_da_dispatcher.workspace = true zksync_block_reverter.workspace = true +zksync_vm_executor.workspace = true zksync_state_keeper.workspace = true zksync_consistency_checker.workspace = true zksync_metadata_calculator.workspace = true diff --git a/core/node/node_framework/src/implementations/layers/state_keeper/main_batch_executor.rs b/core/node/node_framework/src/implementations/layers/state_keeper/main_batch_executor.rs index 3288b68bdebb..f369db2bbf01 100644 --- a/core/node/node_framework/src/implementations/layers/state_keeper/main_batch_executor.rs +++ b/core/node/node_framework/src/implementations/layers/state_keeper/main_batch_executor.rs @@ -1,5 +1,5 @@ -use zksync_state_keeper::MainBatchExecutor; use zksync_types::vm::FastVmMode; +use zksync_vm_executor::batch::MainBatchExecutorFactory; use crate::{ implementations::resources::state_keeper::BatchExecutorResource, @@ -39,8 +39,10 @@ impl WiringLayer for MainBatchExecutorLayer { } async fn wire(self, (): Self::Input) -> Result { - let mut executor = - MainBatchExecutor::new(self.save_call_traces, self.optional_bytecode_compression); + let mut executor = MainBatchExecutorFactory::new( + self.save_call_traces, + self.optional_bytecode_compression, + ); executor.set_fast_vm_mode(self.fast_vm_mode); Ok(executor.into()) } diff --git a/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs b/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs index a77344f3706e..55defd095be8 100644 --- a/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs +++ b/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs @@ -1,20 +1,14 @@ use std::sync::Arc; use anyhow::Context; -use zksync_state::{AsyncCatchupTask, ReadStorageFactory}; +pub use zksync_state::RocksdbStorageOptions; +use zksync_state::{AsyncCatchupTask, OwnedStorage, ReadStorageFactory}; use zksync_state_keeper::{ - seal_criteria::ConditionalSealer, AsyncRocksdbCache, BatchExecutor, OutputHandler, - StateKeeperIO, ZkSyncStateKeeper, + seal_criteria::ConditionalSealer, AsyncRocksdbCache, OutputHandler, StateKeeperIO, + ZkSyncStateKeeper, }; use zksync_storage::RocksDB; - -pub mod external_io; -pub mod main_batch_executor; -pub mod mempool_io; -pub mod output_handler; - -// Public re-export to not require the user to directly depend on `zksync_state`. -pub use zksync_state::RocksdbStorageOptions; +use zksync_vm_executor::interface::BatchExecutorFactory; use crate::{ implementations::resources::{ @@ -30,6 +24,11 @@ use crate::{ FromContext, IntoContext, }; +pub mod external_io; +pub mod main_batch_executor; +pub mod mempool_io; +pub mod output_handler; + /// Wiring layer for the state keeper. #[derive(Debug)] pub struct StateKeeperLayer { @@ -102,7 +101,7 @@ impl WiringLayer for StateKeeperLayer { let state_keeper = StateKeeperTask { io, - batch_executor: batch_executor_base, + executor_factory: batch_executor_base, output_handler, sealer, storage_factory: Arc::new(storage_factory), @@ -125,7 +124,7 @@ impl WiringLayer for StateKeeperLayer { #[derive(Debug)] pub struct StateKeeperTask { io: Box, - batch_executor: Box, + executor_factory: Box>, output_handler: OutputHandler, sealer: Arc, storage_factory: Arc, @@ -141,7 +140,7 @@ impl Task for StateKeeperTask { let state_keeper = ZkSyncStateKeeper::new( stop_receiver.0, self.io, - self.batch_executor, + self.executor_factory, self.output_handler, self.sealer, self.storage_factory, diff --git a/core/node/node_framework/src/implementations/layers/vm_runner/bwip.rs b/core/node/node_framework/src/implementations/layers/vm_runner/bwip.rs index ee2fb84416e1..858692d3c854 100644 --- a/core/node/node_framework/src/implementations/layers/vm_runner/bwip.rs +++ b/core/node/node_framework/src/implementations/layers/vm_runner/bwip.rs @@ -1,6 +1,6 @@ use zksync_config::configs::vm_runner::BasicWitnessInputProducerConfig; -use zksync_state_keeper::MainBatchExecutor; use zksync_types::L2ChainId; +use zksync_vm_executor::batch::MainBatchExecutorFactory; use zksync_vm_runner::{ impls::{BasicWitnessInputProducer, BasicWitnessInputProducerIo}, ConcurrentOutputHandlerFactoryTask, StorageSyncTask, @@ -76,12 +76,12 @@ impl WiringLayer for BasicWitnessInputProducerLayer { let connection_pool = master_pool.get_custom(self.config.window_size + 2).await?; // We don't get the executor from the context because it would contain state keeper-specific settings. - let batch_executor = Box::new(MainBatchExecutor::new(false, false)); + let batch_executor = MainBatchExecutorFactory::new(false, false); let (basic_witness_input_producer, tasks) = BasicWitnessInputProducer::new( connection_pool, object_store.0, - batch_executor, + Box::new(batch_executor), self.config.db_path, self.zksync_network_id, self.config.first_processed_batch, diff --git a/core/node/node_framework/src/implementations/resources/state_keeper.rs b/core/node/node_framework/src/implementations/resources/state_keeper.rs index 5db570d7989b..eed0e022774d 100644 --- a/core/node/node_framework/src/implementations/resources/state_keeper.rs +++ b/core/node/node_framework/src/implementations/resources/state_keeper.rs @@ -1,8 +1,8 @@ use std::sync::Arc; -use zksync_state_keeper::{ - seal_criteria::ConditionalSealer, BatchExecutor, OutputHandler, StateKeeperIO, -}; +use zksync_state::OwnedStorage; +use zksync_state_keeper::{seal_criteria::ConditionalSealer, OutputHandler, StateKeeperIO}; +use zksync_vm_executor::interface::BatchExecutorFactory; use crate::resource::{Resource, Unique}; @@ -23,10 +23,10 @@ impl From for StateKeeperIOResource { } } -/// A resource that provides [`BatchExecutor`] implementation to the service. +/// A resource that provides [`BatchExecutorFactory`] implementation to the service. /// This resource is unique, e.g. it's expected to be consumed by a single service. #[derive(Debug, Clone)] -pub struct BatchExecutorResource(pub Unique>); +pub struct BatchExecutorResource(pub Unique>>); impl Resource for BatchExecutorResource { fn name() -> String { @@ -34,7 +34,10 @@ impl Resource for BatchExecutorResource { } } -impl From for BatchExecutorResource { +impl From for BatchExecutorResource +where + T: BatchExecutorFactory, +{ fn from(executor: T) -> Self { Self(Unique::new(Box::new(executor))) } diff --git a/core/node/node_sync/Cargo.toml b/core/node/node_sync/Cargo.toml index 5f1ae04c5f50..ccfc8dd8a4e9 100644 --- a/core/node/node_sync/Cargo.toml +++ b/core/node/node_sync/Cargo.toml @@ -25,7 +25,7 @@ zksync_utils.workspace = true zksync_eth_client.workspace = true zksync_concurrency.workspace = true vise.workspace = true -zksync_vm_utils.workspace = true +zksync_vm_executor.workspace = true anyhow.workspace = true async-trait.workspace = true diff --git a/core/node/node_sync/src/external_io.rs b/core/node/node_sync/src/external_io.rs index 50734421341e..b7b8930c4957 100644 --- a/core/node/node_sync/src/external_io.rs +++ b/core/node/node_sync/src/external_io.rs @@ -20,7 +20,7 @@ use zksync_types::{ L1BatchNumber, L2BlockNumber, L2ChainId, ProtocolVersionId, Transaction, H256, }; use zksync_utils::bytes_to_be_words; -use zksync_vm_utils::storage::L1BatchParamsProvider; +use zksync_vm_executor::storage::L1BatchParamsProvider; use super::{ client::MainNodeClient, diff --git a/core/node/node_sync/src/tests.rs b/core/node/node_sync/src/tests.rs index edd8306e72e0..d9a98c2bce36 100644 --- a/core/node/node_sync/src/tests.rs +++ b/core/node/node_sync/src/tests.rs @@ -121,15 +121,15 @@ impl StateKeeperHandles { .unwrap(); let (stop_sender, stop_receiver) = watch::channel(false); - let mut batch_executor_base = TestBatchExecutorBuilder::default(); + let mut batch_executor = TestBatchExecutorBuilder::default(); for &tx_hashes_in_l1_batch in tx_hashes { - batch_executor_base.push_successful_transactions(tx_hashes_in_l1_batch); + batch_executor.push_successful_transactions(tx_hashes_in_l1_batch); } let state_keeper = ZkSyncStateKeeper::new( stop_receiver, Box::new(io), - Box::new(batch_executor_base), + Box::new(batch_executor), output_handler, Arc::new(NoopSealer), Arc::new(MockReadStorageFactory), diff --git a/core/node/state_keeper/Cargo.toml b/core/node/state_keeper/Cargo.toml index 16eb657bc9b7..1810cc00de51 100644 --- a/core/node/state_keeper/Cargo.toml +++ b/core/node/state_keeper/Cargo.toml @@ -28,7 +28,7 @@ zksync_protobuf.workspace = true zksync_test_account.workspace = true zksync_node_genesis.workspace = true zksync_node_test_utils.workspace = true -zksync_vm_utils.workspace = true +zksync_vm_executor.workspace = true zksync_system_constants.workspace = true zksync_base_token_adjuster.workspace = true diff --git a/core/node/state_keeper/src/executor/mod.rs b/core/node/state_keeper/src/executor/mod.rs new file mode 100644 index 000000000000..2fa5c3b9c128 --- /dev/null +++ b/core/node/state_keeper/src/executor/mod.rs @@ -0,0 +1,60 @@ +use zksync_multivm::interface::{ + BatchTransactionExecutionResult, Call, CompressedBytecodeInfo, ExecutionResult, Halt, + VmExecutionResultAndLogs, +}; +use zksync_types::Transaction; +pub use zksync_vm_executor::batch::MainBatchExecutorFactory; + +use crate::ExecutionMetricsForCriteria; + +#[cfg(test)] +mod tests; + +/// State keeper representation of a transaction executed in the virtual machine. +/// +/// A separate type allows to be more typesafe when dealing with halted transactions. It also simplifies testing seal criteria +/// (i.e., without picking transactions that actually produce appropriate `ExecutionMetricsForCriteria`). +#[derive(Debug, Clone)] +pub enum TxExecutionResult { + /// Successful execution of the tx and the block tip dry run. + Success { + tx_result: Box, + tx_metrics: Box, + compressed_bytecodes: Vec, + call_tracer_result: Vec, + gas_remaining: u32, + }, + /// The VM rejected the tx for some reason. + RejectedByVm { reason: Halt }, + /// Bootloader gas limit is not enough to execute the tx. + BootloaderOutOfGasForTx, +} + +impl TxExecutionResult { + pub(crate) fn new(res: BatchTransactionExecutionResult, tx: &Transaction) -> Self { + match res.tx_result.result { + ExecutionResult::Halt { + reason: Halt::BootloaderOutOfGas, + } => Self::BootloaderOutOfGasForTx, + ExecutionResult::Halt { reason } => Self::RejectedByVm { reason }, + _ => Self::Success { + tx_metrics: Box::new(ExecutionMetricsForCriteria::new(Some(tx), &res.tx_result)), + gas_remaining: res.tx_result.statistics.gas_remaining, + tx_result: res.tx_result, + compressed_bytecodes: res.compressed_bytecodes, + call_tracer_result: res.call_traces, + }, + } + } + + /// Returns a revert reason if either transaction was rejected or bootloader ran out of gas. + pub(super) fn err(&self) -> Option<&Halt> { + match self { + Self::Success { .. } => None, + Self::RejectedByVm { + reason: rejection_reason, + } => Some(rejection_reason), + Self::BootloaderOutOfGasForTx => Some(&Halt::BootloaderOutOfGas), + } + } +} diff --git a/core/node/state_keeper/src/batch_executor/tests/mod.rs b/core/node/state_keeper/src/executor/tests/mod.rs similarity index 92% rename from core/node/state_keeper/src/batch_executor/tests/mod.rs rename to core/node/state_keeper/src/executor/tests/mod.rs index ab9115991deb..90ce236a38f8 100644 --- a/core/node/state_keeper/src/batch_executor/tests/mod.rs +++ b/core/node/state_keeper/src/executor/tests/mod.rs @@ -1,35 +1,38 @@ +// FIXME: move storage-agnostic tests to VM executor crate + use assert_matches::assert_matches; use test_casing::{test_casing, Product}; use zksync_dal::{ConnectionPool, Core}; +use zksync_multivm::interface::{BatchTransactionExecutionResult, ExecutionResult, Halt}; use zksync_test_account::Account; use zksync_types::{ get_nonce_key, utils::storage_key_for_eth_balance, vm::FastVmMode, PriorityOpId, }; use self::tester::{AccountLoadNextExecutable, StorageSnapshot, TestConfig, Tester}; -use super::TxExecutionResult; mod read_storage_factory; mod tester; /// Ensures that the transaction was executed successfully. -fn assert_executed(execution_result: &TxExecutionResult) { - assert_matches!(execution_result, TxExecutionResult::Success { .. }); +fn assert_executed(execution_result: &BatchTransactionExecutionResult) { + let result = &execution_result.tx_result.result; + assert_matches!( + result, + ExecutionResult::Success { .. } | ExecutionResult::Revert { .. } + ); } /// Ensures that the transaction was rejected by the VM. -fn assert_rejected(execution_result: &TxExecutionResult) { - assert_matches!(execution_result, TxExecutionResult::RejectedByVm { .. }); +fn assert_rejected(execution_result: &BatchTransactionExecutionResult) { + let result = &execution_result.tx_result.result; + assert_matches!(result, ExecutionResult::Halt { reason } if !matches!(reason, Halt::BootloaderOutOfGas)); } /// Ensures that the transaction was executed successfully but reverted by the VM. -fn assert_reverted(execution_result: &TxExecutionResult) { - assert_executed(execution_result); - if let TxExecutionResult::Success { tx_result, .. } = execution_result { - assert!(tx_result.result.is_failed()); - } else { - unreachable!(); - } +fn assert_reverted(execution_result: &BatchTransactionExecutionResult) { + let result = &execution_result.tx_result.result; + assert_matches!(result, ExecutionResult::Revert { .. }); } #[derive(Debug, Clone, Copy)] @@ -189,23 +192,11 @@ async fn rollback(vm_mode: FastVmMode) { executor.rollback_last_tx().await.unwrap(); // Execute the same transaction, it must succeed. - let res_new = executor.execute_tx(tx).await.unwrap(); + let res_new = executor.execute_tx(tx.clone()).await.unwrap(); assert_executed(&res_new); - let ( - TxExecutionResult::Success { - tx_metrics: tx_metrics_old, - .. - }, - TxExecutionResult::Success { - tx_metrics: tx_metrics_new, - .. - }, - ) = (res_old, res_new) - else { - unreachable!(); - }; - + let tx_metrics_old = res_old.tx_result.get_execution_metrics(Some(&tx)); + let tx_metrics_new = res_new.tx_result.get_execution_metrics(Some(&tx)); assert_eq!( tx_metrics_old, tx_metrics_new, "Execution results must be the same" @@ -426,7 +417,12 @@ async fn bootloader_out_of_gas_for_any_tx(vm_mode: FastVmMode) { .await; let res = executor.execute_tx(alice.execute()).await.unwrap(); - assert_matches!(res, TxExecutionResult::BootloaderOutOfGasForTx); + assert_matches!( + res.tx_result.result, + ExecutionResult::Halt { + reason: Halt::BootloaderOutOfGas + } + ); } /// Checks that we can handle the bootloader out of gas error on tip phase. @@ -447,7 +443,7 @@ async fn bootloader_tip_out_of_gas() { let res = executor.execute_tx(alice.execute()).await.unwrap(); assert_executed(&res); - let finished_batch = executor.finish_batch().await.unwrap(); + let (finished_batch, _) = executor.finish_batch().await.unwrap(); // Just a bit below the gas used for the previous batch execution should be fine to execute the tx // but not enough to execute the block tip. @@ -469,7 +465,12 @@ async fn bootloader_tip_out_of_gas() { .await; let res = second_executor.execute_tx(alice.execute()).await.unwrap(); - assert_matches!(res, TxExecutionResult::BootloaderOutOfGasForTx); + assert_matches!( + res.tx_result.result, + ExecutionResult::Halt { + reason: Halt::BootloaderOutOfGas + } + ); } #[tokio::test] diff --git a/core/node/state_keeper/src/batch_executor/tests/read_storage_factory.rs b/core/node/state_keeper/src/executor/tests/read_storage_factory.rs similarity index 100% rename from core/node/state_keeper/src/batch_executor/tests/read_storage_factory.rs rename to core/node/state_keeper/src/executor/tests/read_storage_factory.rs diff --git a/core/node/state_keeper/src/batch_executor/tests/tester.rs b/core/node/state_keeper/src/executor/tests/tester.rs similarity index 95% rename from core/node/state_keeper/src/batch_executor/tests/tester.rs rename to core/node/state_keeper/src/executor/tests/tester.rs index e70c8b06fe0d..a00d9ca5ec15 100644 --- a/core/node/state_keeper/src/batch_executor/tests/tester.rs +++ b/core/node/state_keeper/src/executor/tests/tester.rs @@ -9,13 +9,16 @@ use zksync_config::configs::chain::StateKeeperConfig; use zksync_contracts::{get_loadnext_contract, test_contracts::LoadnextContractExecutionParams}; use zksync_dal::{ConnectionPool, Core, CoreDal}; use zksync_multivm::{ - interface::{L1BatchEnv, L2BlockEnv, SystemEnv}, + interface::{ + executor::{BatchExecutor, BatchExecutorFactory}, + L1BatchEnv, L2BlockEnv, SystemEnv, + }, utils::StorageWritesDeduplicator, vm_latest::constants::INITIAL_STORAGE_WRITE_PUBDATA_BYTES, }; use zksync_node_genesis::{create_genesis_l1_batch, GenesisParams}; use zksync_node_test_utils::{recover, Snapshot}; -use zksync_state::{ReadStorageFactory, RocksdbStorageOptions}; +use zksync_state::{OwnedStorage, ReadStorageFactory, RocksdbStorageOptions}; use zksync_test_account::{Account, DeployContractsTx, TxType}; use zksync_types::{ block::L2BlockHasher, @@ -29,14 +32,14 @@ use zksync_types::{ StorageLog, Transaction, H256, L2_BASE_TOKEN_ADDRESS, U256, }; use zksync_utils::u256_to_h256; +use zksync_vm_executor::batch::MainBatchExecutorFactory; use super::{read_storage_factory::RocksdbStorageFactory, StorageType}; use crate::{ - batch_executor::{BatchExecutorHandle, TxExecutionResult}, testonly, testonly::BASE_SYSTEM_CONTRACTS, tests::{default_l1_batch_env, default_system_env}, - AsyncRocksdbCache, BatchExecutor, MainBatchExecutor, + AsyncRocksdbCache, }; /// Representation of configuration parameters used by the state keeper. @@ -97,7 +100,7 @@ impl Tester { pub(super) async fn create_batch_executor( &mut self, storage_type: StorageType, - ) -> BatchExecutorHandle { + ) -> Box> { let (l1_batch_env, system_env) = self.default_batch_params(); match storage_type { StorageType::AsyncRocksdbCache => { @@ -142,8 +145,8 @@ impl Tester { storage_factory: Arc, l1_batch_env: L1BatchEnv, system_env: SystemEnv, - ) -> BatchExecutorHandle { - let mut batch_executor = MainBatchExecutor::new(self.config.save_call_traces, false); + ) -> Box> { + let mut batch_executor = MainBatchExecutorFactory::new(self.config.save_call_traces, false); batch_executor.set_fast_vm_mode(self.config.fast_vm_mode); let (_stop_sender, stop_receiver) = watch::channel(false); @@ -158,7 +161,7 @@ impl Tester { pub(super) async fn recover_batch_executor( &mut self, snapshot: &SnapshotRecoveryStatus, - ) -> BatchExecutorHandle { + ) -> Box> { let (storage_factory, task) = AsyncRocksdbCache::new( self.pool(), self.state_keeper_db_path(), @@ -175,7 +178,7 @@ impl Tester { &mut self, storage_type: &StorageType, snapshot: &SnapshotRecoveryStatus, - ) -> BatchExecutorHandle { + ) -> Box> { match storage_type { StorageType::AsyncRocksdbCache => self.recover_batch_executor(snapshot).await, StorageType::Rocksdb => { @@ -199,7 +202,7 @@ impl Tester { &self, storage_factory: Arc, snapshot: &SnapshotRecoveryStatus, - ) -> BatchExecutorHandle { + ) -> Box> { let current_timestamp = snapshot.l2_block_timestamp + 1; let (mut l1_batch_env, system_env) = self.batch_params(snapshot.l1_batch_number + 1, current_timestamp); @@ -485,13 +488,10 @@ impl StorageSnapshot { let tx = alice.execute(); let tx_hash = tx.hash(); // probably incorrect let res = executor.execute_tx(tx).await.unwrap(); - if let TxExecutionResult::Success { tx_result, .. } = res { - let storage_logs = &tx_result.logs.storage_logs; - storage_writes_deduplicator - .apply(storage_logs.iter().filter(|log| log.log.is_write())); - } else { - panic!("Unexpected tx execution result: {res:?}"); - }; + assert!(!res.was_halted()); + let tx_result = res.tx_result; + let storage_logs = &tx_result.logs.storage_logs; + storage_writes_deduplicator.apply(storage_logs.iter().filter(|log| log.log.is_write())); let mut hasher = L2BlockHasher::new( L2BlockNumber(l2_block_env.number), @@ -506,7 +506,7 @@ impl StorageSnapshot { executor.start_next_l2_block(l2_block_env).await.unwrap(); } - let finished_batch = executor.finish_batch().await.unwrap(); + let (finished_batch, _) = executor.finish_batch().await.unwrap(); let storage_logs = &finished_batch.block_tip_execution_result.logs.storage_logs; storage_writes_deduplicator.apply(storage_logs.iter().filter(|log| log.log.is_write())); let modified_entries = storage_writes_deduplicator.into_modified_key_values(); diff --git a/core/node/state_keeper/src/io/common/tests.rs b/core/node/state_keeper/src/io/common/tests.rs index 4d2907e82913..9ea699234f8f 100644 --- a/core/node/state_keeper/src/io/common/tests.rs +++ b/core/node/state_keeper/src/io/common/tests.rs @@ -19,7 +19,7 @@ use zksync_types::{ block::L2BlockHasher, protocol_version::ProtocolSemanticVersion, L2ChainId, ProtocolVersion, ProtocolVersionId, }; -use zksync_vm_utils::storage::L1BatchParamsProvider; +use zksync_vm_executor::storage::L1BatchParamsProvider; use super::*; diff --git a/core/node/state_keeper/src/io/mempool.rs b/core/node/state_keeper/src/io/mempool.rs index c3d8dc1dee4d..5734977538bd 100644 --- a/core/node/state_keeper/src/io/mempool.rs +++ b/core/node/state_keeper/src/io/mempool.rs @@ -19,7 +19,7 @@ use zksync_types::{ }; // TODO (SMA-1206): use seconds instead of milliseconds. use zksync_utils::time::millis_since_epoch; -use zksync_vm_utils::storage::L1BatchParamsProvider; +use zksync_vm_executor::storage::L1BatchParamsProvider; use crate::{ io::{ diff --git a/core/node/state_keeper/src/io/mod.rs b/core/node/state_keeper/src/io/mod.rs index 384b0f45b0f6..f8106fd2423b 100644 --- a/core/node/state_keeper/src/io/mod.rs +++ b/core/node/state_keeper/src/io/mod.rs @@ -7,7 +7,7 @@ use zksync_types::{ block::L2BlockExecutionData, fee_model::BatchFeeInput, protocol_upgrade::ProtocolUpgradeTx, Address, L1BatchNumber, L2ChainId, ProtocolVersionId, Transaction, H256, }; -use zksync_vm_utils::storage::l1_batch_params; +use zksync_vm_executor::storage::l1_batch_params; pub use self::{ common::IoCursor, diff --git a/core/node/state_keeper/src/io/persistence.rs b/core/node/state_keeper/src/io/persistence.rs index 4dfb7400ffc6..24b1ffca631c 100644 --- a/core/node/state_keeper/src/io/persistence.rs +++ b/core/node/state_keeper/src/io/persistence.rs @@ -352,7 +352,7 @@ mod tests { use assert_matches::assert_matches; use futures::FutureExt; use zksync_dal::CoreDal; - use zksync_multivm::interface::VmExecutionMetrics; + use zksync_multivm::interface::{FinishedL1Batch, VmExecutionMetrics}; use zksync_node_genesis::{insert_genesis_batch, GenesisParams}; use zksync_types::{ api::TransactionStatus, block::BlockGasCount, writes::StateDiffRecord, L1BatchNumber, @@ -363,7 +363,6 @@ mod tests { use super::*; use crate::{ io::L2BlockParams, - testonly::default_vm_batch_result, tests::{ create_execution_result, create_transaction, create_updates_manager, default_l1_batch_env, default_system_env, Query, @@ -473,7 +472,7 @@ mod tests { virtual_blocks: 1, }); - let mut batch_result = default_vm_batch_result(); + let mut batch_result = FinishedL1Batch::mock(); batch_result.final_execution_state.deduplicated_storage_logs = storage_logs.iter().map(|log| log.log).collect(); batch_result.state_diffs = Some( diff --git a/core/node/state_keeper/src/keeper.rs b/core/node/state_keeper/src/keeper.rs index a610194ab9ca..02f7f92e070a 100644 --- a/core/node/state_keeper/src/keeper.rs +++ b/core/node/state_keeper/src/keeper.rs @@ -1,34 +1,34 @@ use std::{ convert::Infallible, - fmt, sync::Arc, time::{Duration, Instant}, }; use anyhow::Context as _; -use async_trait::async_trait; use tokio::sync::watch; use tracing::{info_span, Instrument}; use zksync_multivm::{ - interface::{Halt, L1BatchEnv, SystemEnv}, + interface::{ + executor::{BatchExecutor, BatchExecutorFactory}, + Halt, L1BatchEnv, SystemEnv, + }, utils::StorageWritesDeduplicator, }; -use zksync_state::ReadStorageFactory; +use zksync_state::{OwnedStorage, ReadStorageFactory}; use zksync_types::{ block::L2BlockExecutionData, l2::TransactionType, protocol_upgrade::ProtocolUpgradeTx, protocol_version::ProtocolVersionId, utils::display_timestamp, L1BatchNumber, Transaction, }; -use super::{ - batch_executor::{BatchExecutor, BatchExecutorHandle, TxExecutionResult}, +use crate::{ + executor::TxExecutionResult, io::{IoCursor, L1BatchParams, L2BlockParams, OutputHandler, PendingBatchData, StateKeeperIO}, metrics::{AGGREGATION_METRICS, KEEPER_METRICS, L1_BATCH_METRICS}, - seal_criteria::{ConditionalSealer, SealData, SealResolution}, + seal_criteria::{ConditionalSealer, SealData, SealResolution, UnexecutableReason}, types::ExecutionMetricsForCriteria, updates::UpdatesManager, utils::gas_count_from_writes, }; -use crate::seal_criteria::UnexecutableReason; /// Amount of time to block on waiting for some resource. The exact value is not really important, /// we only need it to not block on waiting indefinitely and be able to process cancellation requests. @@ -52,45 +52,6 @@ impl Error { } } -/// Functionality [`BatchExecutor`] + [`ReadStorageFactory`] with an erased storage type. This allows to keep -/// [`ZkSyncStateKeeper`] not parameterized by the storage type, simplifying its dependency injection and usage in tests. -#[async_trait] -trait ErasedBatchExecutor: fmt::Debug + Send { - async fn init_batch( - &mut self, - l1_batch_env: L1BatchEnv, - system_env: SystemEnv, - stop_receiver: &watch::Receiver, - ) -> Result; -} - -/// The only [`ErasedBatchExecutor`] implementation. -#[derive(Debug)] -struct ErasedBatchExecutorImpl { - batch_executor: Box>, - storage_factory: Arc>, -} - -#[async_trait] -impl ErasedBatchExecutor for ErasedBatchExecutorImpl { - async fn init_batch( - &mut self, - l1_batch_env: L1BatchEnv, - system_env: SystemEnv, - stop_receiver: &watch::Receiver, - ) -> Result { - let storage = self - .storage_factory - .access_storage(stop_receiver, l1_batch_env.number - 1) - .await - .context("failed creating VM storage")? - .ok_or(Error::Canceled)?; - Ok(self - .batch_executor - .init_batch(storage, l1_batch_env, system_env)) - } -} - /// State keeper represents a logic layer of L1 batch / L2 block processing flow. /// It's responsible for taking all the data from the `StateKeeperIO`, feeding it into `BatchExecutor` objects /// and calling `SealManager` to decide whether an L2 block or L1 batch should be sealed. @@ -105,28 +66,27 @@ pub struct ZkSyncStateKeeper { stop_receiver: watch::Receiver, io: Box, output_handler: OutputHandler, - batch_executor: Box, + batch_executor: Box>, sealer: Arc, + storage_factory: Arc, } impl ZkSyncStateKeeper { - pub fn new( + pub fn new( stop_receiver: watch::Receiver, sequencer: Box, - batch_executor: Box>, + batch_executor: Box>, output_handler: OutputHandler, sealer: Arc, - storage_factory: Arc>, + storage_factory: Arc, ) -> Self { Self { stop_receiver, io: sequencer, - batch_executor: Box::new(ErasedBatchExecutorImpl { - batch_executor, - storage_factory, - }), + batch_executor, output_handler, sealer, + storage_factory, } } @@ -190,21 +150,20 @@ impl ZkSyncStateKeeper { .await?; let mut batch_executor = self - .batch_executor - .init_batch( - l1_batch_env.clone(), - system_env.clone(), - &self.stop_receiver, - ) - .await?; - self.restore_state(&mut batch_executor, &mut updates_manager, pending_l2_blocks) + .create_batch_executor(l1_batch_env.clone(), system_env.clone()) .await?; + self.restore_state( + &mut *batch_executor, + &mut updates_manager, + pending_l2_blocks, + ) + .await?; let mut l1_batch_seal_delta: Option = None; while !self.is_canceled() { // This function will run until the batch can be sealed. self.process_l1_batch( - &mut batch_executor, + &mut *batch_executor, &mut updates_manager, protocol_upgrade_tx, ) @@ -220,12 +179,12 @@ impl ZkSyncStateKeeper { Self::start_next_l2_block( new_l2_block_params, &mut updates_manager, - &mut batch_executor, + &mut *batch_executor, ) .await?; } - let finished_batch = batch_executor.finish_batch().await?; + let (finished_batch, _) = batch_executor.finish_batch().await?; let sealed_batch_protocol_version = updates_manager.protocol_version(); updates_manager.finish_batch(finished_batch); let mut next_cursor = updates_manager.io_cursor(); @@ -244,12 +203,7 @@ impl ZkSyncStateKeeper { (system_env, l1_batch_env) = self.wait_for_new_batch_env(&next_cursor).await?; updates_manager = UpdatesManager::new(&l1_batch_env, &system_env); batch_executor = self - .batch_executor - .init_batch( - l1_batch_env.clone(), - system_env.clone(), - &self.stop_receiver, - ) + .create_batch_executor(l1_batch_env.clone(), system_env.clone()) .await?; let version_changed = system_env.version != sealed_batch_protocol_version; @@ -262,6 +216,22 @@ impl ZkSyncStateKeeper { Err(Error::Canceled) } + async fn create_batch_executor( + &mut self, + l1_batch_env: L1BatchEnv, + system_env: SystemEnv, + ) -> Result>, Error> { + let storage = self + .storage_factory + .access_storage(&self.stop_receiver, l1_batch_env.number - 1) + .await + .context("failed creating VM storage")? + .ok_or(Error::Canceled)?; + Ok(self + .batch_executor + .init_batch(storage, l1_batch_env, system_env)) + } + /// This function is meant to be called only once during the state-keeper initialization. /// It will check if we should load a protocol upgrade or a `setChainId` transaction, /// perform some checks and return it. @@ -418,7 +388,7 @@ impl ZkSyncStateKeeper { async fn start_next_l2_block( params: L2BlockParams, updates_manager: &mut UpdatesManager, - batch_executor: &mut BatchExecutorHandle, + batch_executor: &mut dyn BatchExecutor, ) -> anyhow::Result<()> { updates_manager.push_l2_block(params); let block_env = updates_manager.l2_block.get_env(); @@ -460,7 +430,7 @@ impl ZkSyncStateKeeper { )] async fn restore_state( &mut self, - batch_executor: &mut BatchExecutorHandle, + batch_executor: &mut dyn BatchExecutor, updates_manager: &mut UpdatesManager, l2_blocks_to_reexecute: Vec, ) -> Result<(), Error> { @@ -491,6 +461,7 @@ impl ZkSyncStateKeeper { .execute_tx(tx.clone()) .await .with_context(|| format!("failed re-executing transaction {:?}", tx.hash()))?; + let result = TxExecutionResult::new(result, &tx); let TxExecutionResult::Success { tx_result, @@ -564,7 +535,7 @@ impl ZkSyncStateKeeper { )] async fn process_l1_batch( &mut self, - batch_executor: &mut BatchExecutorHandle, + batch_executor: &mut dyn BatchExecutor, updates_manager: &mut UpdatesManager, protocol_upgrade_tx: Option, ) -> Result<(), Error> { @@ -692,7 +663,7 @@ impl ZkSyncStateKeeper { async fn process_upgrade_tx( &mut self, - batch_executor: &mut BatchExecutorHandle, + batch_executor: &mut dyn BatchExecutor, updates_manager: &mut UpdatesManager, protocol_upgrade_tx: ProtocolUpgradeTx, ) -> anyhow::Result<()> { @@ -759,7 +730,7 @@ impl ZkSyncStateKeeper { #[tracing::instrument(skip_all)] async fn process_one_tx( &mut self, - batch_executor: &mut BatchExecutorHandle, + batch_executor: &mut dyn BatchExecutor, updates_manager: &mut UpdatesManager, tx: Transaction, ) -> anyhow::Result<(SealResolution, TxExecutionResult)> { @@ -768,6 +739,7 @@ impl ZkSyncStateKeeper { .execute_tx(tx.clone()) .await .with_context(|| format!("failed executing transaction {:?}", tx.hash()))?; + let exec_result = TxExecutionResult::new(exec_result, &tx); latency.observe(); let latency = KEEPER_METRICS.determine_seal_resolution.start(); diff --git a/core/node/state_keeper/src/lib.rs b/core/node/state_keeper/src/lib.rs index 1c12f7825486..c12e4163fdd4 100644 --- a/core/node/state_keeper/src/lib.rs +++ b/core/node/state_keeper/src/lib.rs @@ -1,18 +1,4 @@ -use std::sync::Arc; - -use tokio::sync::watch; -use zksync_config::configs::{ - chain::{MempoolConfig, StateKeeperConfig}, - wallets, -}; -use zksync_dal::{ConnectionPool, Core}; -use zksync_node_fee_model::BatchFeeModelInputProvider; -use zksync_types::L2ChainId; - pub use self::{ - batch_executor::{ - main_executor::MainBatchExecutor, BatchExecutor, BatchExecutorHandle, TxExecutionResult, - }, io::{ mempool::MempoolIO, L2BlockParams, L2BlockSealerTask, OutputHandler, StateKeeperIO, StateKeeperOutputHandler, StateKeeperPersistence, TreeWritesPersistence, @@ -25,7 +11,7 @@ pub use self::{ updates::UpdatesManager, }; -mod batch_executor; +pub mod executor; pub mod io; mod keeper; mod mempool_actor; @@ -38,41 +24,3 @@ pub(crate) mod tests; pub(crate) mod types; pub mod updates; pub(crate) mod utils; - -#[allow(clippy::too_many_arguments)] -pub async fn create_state_keeper( - state_keeper_config: StateKeeperConfig, - wallets: wallets::StateKeeper, - async_cache: AsyncRocksdbCache, - l2chain_id: L2ChainId, - mempool_config: &MempoolConfig, - pool: ConnectionPool, - mempool: MempoolGuard, - batch_fee_input_provider: Arc, - output_handler: OutputHandler, - stop_receiver: watch::Receiver, -) -> ZkSyncStateKeeper { - let batch_executor_base = MainBatchExecutor::new(state_keeper_config.save_call_traces, false); - - let io = MempoolIO::new( - mempool, - batch_fee_input_provider, - pool, - &state_keeper_config, - wallets.fee_account.address(), - mempool_config.delay_interval(), - l2chain_id, - ) - .expect("Failed initializing main node I/O for state keeper"); - - let sealer = SequencerSealer::new(state_keeper_config); - - ZkSyncStateKeeper::new( - stop_receiver, - Box::new(io), - Box::new(batch_executor_base), - output_handler, - Arc::new(sealer), - Arc::new(async_cache), - ) -} diff --git a/core/node/state_keeper/src/metrics.rs b/core/node/state_keeper/src/metrics.rs index 1bf314d1b91e..7da5babd2199 100644 --- a/core/node/state_keeper/src/metrics.rs +++ b/core/node/state_keeper/src/metrics.rs @@ -10,10 +10,7 @@ use vise::{ Metrics, }; use zksync_mempool::MempoolStore; -use zksync_multivm::interface::{ - DeduplicatedWritesMetrics, VmExecutionResultAndLogs, VmRevertReason, -}; -use zksync_shared_metrics::InteractionType; +use zksync_multivm::interface::{DeduplicatedWritesMetrics, VmRevertReason}; use zksync_types::ProtocolVersionId; use super::seal_criteria::SealResolution; @@ -84,13 +81,6 @@ pub struct StateKeeperMetrics { /// The time it takes for transactions to be included in a block. Representative of the time user must wait before their transaction is confirmed. #[metrics(buckets = INCLUSION_DELAY_BUCKETS)] pub transaction_inclusion_delay: Family>, - /// Time spent by the state keeper on transaction execution. - #[metrics(buckets = Buckets::LATENCIES)] - pub tx_execution_time: Family>, - /// Number of times gas price was reported as too high. - pub gas_price_too_high: Counter, - /// Number of times blob base fee was reported as too high. - pub blob_base_fee_too_high: Counter, /// The time it takes to match seal resolution for each tx. #[metrics(buckets = Buckets::LATENCIES)] pub match_seal_resolution: Histogram, @@ -439,52 +429,9 @@ impl SealProgress<'_> { } } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] -#[metrics(label = "command", rename_all = "snake_case")] -pub(super) enum ExecutorCommand { - ExecuteTx, - #[metrics(name = "start_next_miniblock")] - StartNextL2Block, - RollbackLastTx, - FinishBatch, - FinishBatchWithCache, -} - -const GAS_PER_NANOSECOND_BUCKETS: Buckets = Buckets::values(&[ - 0.01, 0.03, 0.1, 0.3, 0.5, 0.75, 1., 1.5, 3., 5., 10., 20., 50., -]); - -/// Executor-related state keeper metrics. -#[derive(Debug, Metrics)] -#[metrics(prefix = "state_keeper")] -pub(super) struct ExecutorMetrics { - /// Latency to process a single command sent to the batch executor. - #[metrics(buckets = Buckets::LATENCIES)] - pub batch_executor_command_response_time: Family>, - /// Cumulative latency of interacting with the storage when executing a transaction - /// in the batch executor. - #[metrics(buckets = Buckets::LATENCIES)] - pub batch_storage_interaction_duration: Family>, - #[metrics(buckets = GAS_PER_NANOSECOND_BUCKETS)] - pub computational_gas_per_nanosecond: Histogram, - #[metrics(buckets = GAS_PER_NANOSECOND_BUCKETS)] - pub failed_tx_gas_limit_per_nanosecond: Histogram, -} - -#[vise::register] -pub(super) static EXECUTOR_METRICS: vise::Global = vise::Global::new(); - #[derive(Debug, Metrics)] #[metrics(prefix = "batch_tip")] pub(crate) struct BatchTipMetrics { - #[metrics(buckets = Buckets::exponential(60000.0..=80000000.0, 2.0))] - gas_used: Histogram, - #[metrics(buckets = Buckets::exponential(1.0..=60000.0, 2.0))] - pubdata_published: Histogram, - #[metrics(buckets = Buckets::exponential(1.0..=4096.0, 2.0))] - circuit_statistic: Histogram, - #[metrics(buckets = Buckets::exponential(1.0..=4096.0, 2.0))] - execution_metrics_size: Histogram, #[metrics(buckets = Buckets::exponential(1.0..=60000.0, 2.0))] block_writes_metrics_positive_size: Histogram, #[metrics(buckets = Buckets::exponential(1.0..=60000.0, 2.0))] @@ -492,17 +439,6 @@ pub(crate) struct BatchTipMetrics { } impl BatchTipMetrics { - pub fn observe(&self, execution_result: &VmExecutionResultAndLogs) { - self.gas_used - .observe(execution_result.statistics.gas_used as usize); - self.pubdata_published - .observe(execution_result.statistics.pubdata_published as usize); - self.circuit_statistic - .observe(execution_result.statistics.circuit_statistic.total()); - self.execution_metrics_size - .observe(execution_result.get_execution_metrics(None).size()); - } - pub fn observe_writes_metrics( &self, initial_writes_metrics: &DeduplicatedWritesMetrics, diff --git a/core/node/state_keeper/src/testonly/mod.rs b/core/node/state_keeper/src/testonly/mod.rs index d17261a3a0f7..0ce8c06be0e7 100644 --- a/core/node/state_keeper/src/testonly/mod.rs +++ b/core/node/state_keeper/src/testonly/mod.rs @@ -1,14 +1,17 @@ //! Test utilities that can be used for testing sequencer that may //! be useful outside of this crate. +use async_trait::async_trait; use once_cell::sync::Lazy; -use tokio::sync::mpsc; use zksync_contracts::BaseSystemContracts; use zksync_dal::{ConnectionPool, Core, CoreDal as _}; use zksync_multivm::interface::{ - storage::StorageViewCache, CurrentExecutionState, ExecutionResult, FinishedL1Batch, L1BatchEnv, - Refunds, SystemEnv, VmExecutionLogs, VmExecutionResultAndLogs, VmExecutionStatistics, + executor::{BatchExecutor, BatchExecutorFactory}, + storage::{InMemoryStorage, StorageView}, + BatchTransactionExecutionResult, ExecutionResult, FinishedL1Batch, L1BatchEnv, L2BlockEnv, + SystemEnv, VmExecutionResultAndLogs, }; +use zksync_state::OwnedStorage; use zksync_test_account::Account; use zksync_types::{ fee::Fee, utils::storage_key_for_standard_token_balance, AccountTreeId, Address, Execute, @@ -17,94 +20,62 @@ use zksync_types::{ }; use zksync_utils::u256_to_h256; -use crate::{ - batch_executor::{BatchExecutor, BatchExecutorHandle, Command, TxExecutionResult}, - types::ExecutionMetricsForCriteria, -}; - pub mod test_batch_executor; pub(super) static BASE_SYSTEM_CONTRACTS: Lazy = Lazy::new(BaseSystemContracts::load_from_disk); -pub(super) fn default_vm_batch_result() -> FinishedL1Batch { - FinishedL1Batch { - block_tip_execution_result: VmExecutionResultAndLogs { - result: ExecutionResult::Success { output: vec![] }, - logs: VmExecutionLogs::default(), - statistics: VmExecutionStatistics::default(), - refunds: Refunds::default(), - }, - final_execution_state: CurrentExecutionState { - events: vec![], - deduplicated_storage_logs: vec![], - used_contract_hashes: vec![], - user_l2_to_l1_logs: vec![], - system_logs: vec![], - storage_refunds: Vec::new(), - pubdata_costs: Vec::new(), - }, - final_bootloader_memory: Some(vec![]), - pubdata_input: Some(vec![]), - state_diffs: Some(vec![]), - } -} - /// Creates a `TxExecutionResult` object denoting a successful tx execution. -pub(crate) fn successful_exec() -> TxExecutionResult { - TxExecutionResult::Success { +pub(crate) fn successful_exec() -> BatchTransactionExecutionResult { + BatchTransactionExecutionResult { tx_result: Box::new(VmExecutionResultAndLogs { result: ExecutionResult::Success { output: vec![] }, logs: Default::default(), statistics: Default::default(), refunds: Default::default(), }), - tx_metrics: Box::new(ExecutionMetricsForCriteria { - l1_gas: Default::default(), - execution_metrics: Default::default(), - }), compressed_bytecodes: vec![], - call_tracer_result: vec![], - gas_remaining: Default::default(), + call_traces: vec![], } } -pub(crate) fn storage_view_cache() -> StorageViewCache { - StorageViewCache::default() -} - /// `BatchExecutor` which doesn't check anything at all. Accepts all transactions. #[derive(Debug)] pub struct MockBatchExecutor; -impl BatchExecutor<()> for MockBatchExecutor { +impl BatchExecutorFactory for MockBatchExecutor { fn init_batch( &mut self, - _storage: (), - _l1batch_params: L1BatchEnv, + _storage: OwnedStorage, + _l1_batch_env: L1BatchEnv, _system_env: SystemEnv, - ) -> BatchExecutorHandle { - let (send, recv) = mpsc::channel(1); - let handle = tokio::task::spawn(async { - let mut recv = recv; - while let Some(cmd) = recv.recv().await { - match cmd { - Command::ExecuteTx(_, resp) => resp.send(successful_exec()).unwrap(), - Command::StartNextL2Block(_, resp) => resp.send(()).unwrap(), - Command::RollbackLastTx(_) => panic!("unexpected rollback"), - Command::FinishBatch(resp) => { - // Blanket result, it doesn't really matter. - resp.send(default_vm_batch_result()).unwrap(); - break; - } - Command::FinishBatchWithCache(resp) => resp - .send((default_vm_batch_result(), storage_view_cache())) - .unwrap(), - } - } - anyhow::Ok(()) - }); - BatchExecutorHandle::from_raw(handle, send) + ) -> Box> { + Box::new(Self) + } +} + +#[async_trait] +impl BatchExecutor for MockBatchExecutor { + async fn execute_tx( + &mut self, + _tx: Transaction, + ) -> anyhow::Result { + Ok(successful_exec()) + } + + async fn rollback_last_tx(&mut self) -> anyhow::Result<()> { + panic!("unexpected rollback"); + } + + async fn start_next_l2_block(&mut self, _env: L2BlockEnv) -> anyhow::Result<()> { + Ok(()) + } + + async fn finish_batch( + self: Box, + ) -> anyhow::Result<(FinishedL1Batch, StorageView)> { + let storage = OwnedStorage::boxed(InMemoryStorage::default()); + Ok((FinishedL1Batch::mock(), StorageView::new(storage))) } } diff --git a/core/node/state_keeper/src/testonly/test_batch_executor.rs b/core/node/state_keeper/src/testonly/test_batch_executor.rs index d8ee36990a1c..ffca8dff8643 100644 --- a/core/node/state_keeper/src/testonly/test_batch_executor.rs +++ b/core/node/state_keeper/src/testonly/test_batch_executor.rs @@ -13,27 +13,28 @@ use std::{ }; use async_trait::async_trait; -use tokio::sync::{mpsc, watch}; +use tokio::sync::watch; use zksync_contracts::BaseSystemContracts; use zksync_multivm::{ - interface::{ExecutionResult, L1BatchEnv, SystemEnv, VmExecutionResultAndLogs}, + interface::{ + executor::{BatchExecutor, BatchExecutorFactory}, + storage::InMemoryStorage, + BatchTransactionExecutionResult, ExecutionResult, FinishedL1Batch, Halt, L1BatchEnv, + L2BlockEnv, SystemEnv, VmExecutionLogs, VmExecutionResultAndLogs, + }, vm_latest::constants::BATCH_COMPUTATIONAL_GAS_LIMIT, }; use zksync_node_test_utils::create_l2_transaction; -use zksync_state::ReadStorageFactory; +use zksync_state::{interface::StorageView, OwnedStorage, ReadStorageFactory}; use zksync_types::{ - fee_model::BatchFeeInput, protocol_upgrade::ProtocolUpgradeTx, Address, L1BatchNumber, - L2BlockNumber, L2ChainId, ProtocolVersionId, Transaction, H256, + fee_model::BatchFeeInput, l2_to_l1_log::UserL2ToL1Log, protocol_upgrade::ProtocolUpgradeTx, + Address, L1BatchNumber, L2BlockNumber, L2ChainId, ProtocolVersionId, Transaction, H256, }; use crate::{ - batch_executor::{BatchExecutor, BatchExecutorHandle, Command, TxExecutionResult}, io::{IoCursor, L1BatchParams, L2BlockParams, PendingBatchData, StateKeeperIO}, seal_criteria::{IoSealCriteria, SequencerSealer, UnexecutableReason}, - testonly::{ - default_vm_batch_result, storage_view_cache, successful_exec, BASE_SYSTEM_CONTRACTS, - }, - types::ExecutionMetricsForCriteria, + testonly::{successful_exec, BASE_SYSTEM_CONTRACTS}, updates::UpdatesManager, OutputHandler, StateKeeperOutputHandler, ZkSyncStateKeeper, }; @@ -110,7 +111,7 @@ impl TestScenario { mut self, description: &'static str, tx: Transaction, - result: TxExecutionResult, + result: BatchTransactionExecutionResult, ) -> Self { self.actions .push_back(ScenarioItem::Tx(description, tx, result)); @@ -198,13 +199,13 @@ impl TestScenario { pub(crate) async fn run(self, sealer: SequencerSealer) { assert!(!self.actions.is_empty(), "Test scenario can't be empty"); - let batch_executor_base = TestBatchExecutorBuilder::new(&self); + let batch_executor = TestBatchExecutorBuilder::new(&self); let (stop_sender, stop_receiver) = watch::channel(false); let (io, output_handler) = TestIO::new(stop_sender, self); let state_keeper = ZkSyncStateKeeper::new( stop_receiver, Box::new(io), - Box::new(batch_executor_base), + Box::new(batch_executor), output_handler, Arc::new(sealer), Arc::new(MockReadStorageFactory), @@ -253,27 +254,33 @@ pub(crate) fn random_upgrade_tx(tx_number: u64) -> ProtocolUpgradeTx { } /// Creates a `TxExecutionResult` object denoting a successful tx execution with the given execution metrics. -pub(crate) fn successful_exec_with_metrics( - tx_metrics: ExecutionMetricsForCriteria, -) -> TxExecutionResult { - TxExecutionResult::Success { +pub(crate) fn successful_exec_with_log() -> BatchTransactionExecutionResult { + BatchTransactionExecutionResult { tx_result: Box::new(VmExecutionResultAndLogs { result: ExecutionResult::Success { output: vec![] }, - logs: Default::default(), + logs: VmExecutionLogs { + user_l2_to_l1_logs: vec![UserL2ToL1Log::default()], + ..VmExecutionLogs::default() + }, statistics: Default::default(), refunds: Default::default(), }), - tx_metrics: Box::new(tx_metrics), compressed_bytecodes: vec![], - call_tracer_result: vec![], - gas_remaining: Default::default(), + call_traces: vec![], } } /// Creates a `TxExecutionResult` object denoting a tx that was rejected. -pub(crate) fn rejected_exec() -> TxExecutionResult { - TxExecutionResult::RejectedByVm { - reason: zksync_multivm::interface::Halt::InnerTxError, +pub(crate) fn rejected_exec(reason: Halt) -> BatchTransactionExecutionResult { + BatchTransactionExecutionResult { + tx_result: Box::new(VmExecutionResultAndLogs { + result: ExecutionResult::Halt { reason }, + logs: Default::default(), + statistics: Default::default(), + refunds: Default::default(), + }), + compressed_bytecodes: vec![], + call_traces: vec![], } } @@ -283,7 +290,7 @@ enum ScenarioItem { NoTxsUntilNextAction(&'static str), /// Increments protocol version in IO state. IncrementProtocolVersion(&'static str), - Tx(&'static str, Transaction, TxExecutionResult), + Tx(&'static str, Transaction, BatchTransactionExecutionResult), Rollback(&'static str, Transaction), Reject(&'static str, Transaction, UnexecutableReason), L2BlockSeal( @@ -332,7 +339,7 @@ impl fmt::Debug for ScenarioItem { } } -type ExpectedTransactions = VecDeque>>; +type ExpectedTransactions = VecDeque>>; #[derive(Debug, Default)] pub struct TestBatchExecutorBuilder { @@ -348,7 +355,7 @@ pub struct TestBatchExecutorBuilder { impl TestBatchExecutorBuilder { pub(crate) fn new(scenario: &TestScenario) -> Self { let mut txs = VecDeque::new(); - let mut batch_txs = HashMap::new(); + let mut batch_txs = HashMap::<_, VecDeque>::new(); let mut rollback_set = HashSet::new(); // Insert data about the pending batch, if it exists. @@ -369,9 +376,7 @@ impl TestBatchExecutorBuilder { ScenarioItem::Tx(_, tx, result) => { batch_txs .entry(tx.hash()) - .and_modify(|txs: &mut VecDeque| { - txs.push_back(result.clone()) - }) + .and_modify(|txs| txs.push_back(result.clone())) .or_insert_with(|| { let mut txs = VecDeque::with_capacity(1); txs.push_back(result.clone()); @@ -410,34 +415,24 @@ impl TestBatchExecutorBuilder { } } -impl BatchExecutor<()> for TestBatchExecutorBuilder { +impl BatchExecutorFactory for TestBatchExecutorBuilder { fn init_batch( &mut self, - _storage: (), - _l1_batch_params: L1BatchEnv, + _storage: OwnedStorage, + _l1_batch_env: L1BatchEnv, _system_env: SystemEnv, - ) -> BatchExecutorHandle { - let (commands_sender, commands_receiver) = mpsc::channel(1); - - let executor = TestBatchExecutor::new( - commands_receiver, - self.txs.pop_front().unwrap(), - self.rollback_set.clone(), - ); - let handle = tokio::task::spawn_blocking(move || { - executor.run(); - Ok(()) - }); - BatchExecutorHandle::from_raw(handle, commands_sender) + ) -> Box> { + let executor = + TestBatchExecutor::new(self.txs.pop_front().unwrap(), self.rollback_set.clone()); + Box::new(executor) } } #[derive(Debug)] pub(super) struct TestBatchExecutor { - commands: mpsc::Receiver, /// Mapping tx -> response. /// The same transaction can be executed several times, so we use a sequence of responses and consume them by one. - txs: HashMap>, + txs: HashMap>, /// Set of transactions that are expected to be rolled back. rollback_set: HashSet, /// Last executed tx hash. @@ -446,64 +441,63 @@ pub(super) struct TestBatchExecutor { impl TestBatchExecutor { pub(super) fn new( - commands: mpsc::Receiver, - txs: HashMap>, + txs: HashMap>, rollback_set: HashSet, ) -> Self { Self { - commands, txs, rollback_set, last_tx: H256::default(), // We don't expect rollbacks until the first tx is executed. } } +} - pub(super) fn run(mut self) { - while let Some(cmd) = self.commands.blocking_recv() { - match cmd { - Command::ExecuteTx(tx, resp) => { - let result = self - .txs - .get_mut(&tx.hash()) - .unwrap() - .pop_front() - .unwrap_or_else(|| { - panic!( - "Received a request to execute an unknown transaction: {:?}", - tx - ) - }); - resp.send(result).unwrap(); - self.last_tx = tx.hash(); - } - Command::StartNextL2Block(_, resp) => { - resp.send(()).unwrap(); - } - Command::RollbackLastTx(resp) => { - // This is an additional safety check: IO would check that every rollback is included in the - // test scenario, but here we want to additionally check that each such request goes to the - // the batch executor as well. - if !self.rollback_set.contains(&self.last_tx) { - // Request to rollback an unexpected tx. - panic!( - "Received a request to rollback an unexpected tx. Last executed tx: {:?}", - self.last_tx - ) - } - resp.send(()).unwrap(); - // It's OK to not update `last_executed_tx`, since state keeper never should rollback more than 1 - // tx in a row, and it's going to cause a panic anyway. - } - Command::FinishBatch(resp) => { - // Blanket result, it doesn't really matter. - resp.send(default_vm_batch_result()).unwrap(); - return; - } - Command::FinishBatchWithCache(resp) => resp - .send((default_vm_batch_result(), storage_view_cache())) - .unwrap(), - } +#[async_trait] +impl BatchExecutor for TestBatchExecutor { + async fn execute_tx( + &mut self, + tx: Transaction, + ) -> anyhow::Result { + let result = self + .txs + .get_mut(&tx.hash()) + .unwrap() + .pop_front() + .unwrap_or_else(|| { + panic!( + "Received a request to execute an unknown transaction: {:?}", + tx + ) + }); + self.last_tx = tx.hash(); + Ok(result) + } + + async fn rollback_last_tx(&mut self) -> anyhow::Result<()> { + // This is an additional safety check: IO would check that every rollback is included in the + // test scenario, but here we want to additionally check that each such request goes to the + // the batch executor as well. + if !self.rollback_set.contains(&self.last_tx) { + // Request to rollback an unexpected tx. + panic!( + "Received a request to rollback an unexpected tx. Last executed tx: {:?}", + self.last_tx + ) } + // It's OK to not update `last_executed_tx`, since state keeper never should rollback more than 1 + // tx in a row, and it's going to cause a panic anyway. + Ok(()) + } + + async fn start_next_l2_block(&mut self, _env: L2BlockEnv) -> anyhow::Result<()> { + Ok(()) + } + + async fn finish_batch( + self: Box, + ) -> anyhow::Result<(FinishedL1Batch, StorageView)> { + let storage = OwnedStorage::boxed(InMemoryStorage::default()); + Ok((FinishedL1Batch::mock(), StorageView::new(storage))) } } @@ -809,12 +803,13 @@ impl StateKeeperIO for TestIO { pub struct MockReadStorageFactory; #[async_trait] -impl ReadStorageFactory<()> for MockReadStorageFactory { +impl ReadStorageFactory for MockReadStorageFactory { async fn access_storage( &self, _stop_receiver: &watch::Receiver, _l1_batch_number: L1BatchNumber, - ) -> anyhow::Result> { - Ok(Some(())) + ) -> anyhow::Result> { + let storage = InMemoryStorage::default(); + Ok(Some(OwnedStorage::boxed(storage))) } } diff --git a/core/node/state_keeper/src/tests/mod.rs b/core/node/state_keeper/src/tests/mod.rs index e9a0a57c6977..80de0f0beff9 100644 --- a/core/node/state_keeper/src/tests/mod.rs +++ b/core/node/state_keeper/src/tests/mod.rs @@ -11,7 +11,7 @@ use zksync_config::configs::chain::StateKeeperConfig; use zksync_multivm::{ interface::{ ExecutionResult, Halt, L1BatchEnv, L2BlockEnv, Refunds, SystemEnv, TxExecutionMode, - VmExecutionLogs, VmExecutionMetrics, VmExecutionResultAndLogs, VmExecutionStatistics, + VmExecutionLogs, VmExecutionResultAndLogs, VmExecutionStatistics, }, vm_latest::constants::BATCH_COMPUTATIONAL_GAS_LIMIT, }; @@ -27,7 +27,6 @@ use zksync_types::{ use zksync_utils::u256_to_h256; use crate::{ - batch_executor::TxExecutionResult, io::PendingBatchData, keeper::POLL_WAIT_DURATION, seal_criteria::{ @@ -37,14 +36,13 @@ use crate::{ testonly::{ successful_exec, test_batch_executor::{ - random_tx, random_upgrade_tx, rejected_exec, successful_exec_with_metrics, + random_tx, random_upgrade_tx, rejected_exec, successful_exec_with_log, MockReadStorageFactory, TestBatchExecutorBuilder, TestIO, TestScenario, FEE_ACCOUNT, }, BASE_SYSTEM_CONTRACTS, }, - types::ExecutionMetricsForCriteria, updates::UpdatesManager, - utils::l1_batch_base_cost, + utils::{gas_count_from_tx_and_metrics, l1_batch_base_cost}, ZkSyncStateKeeper, }; @@ -194,29 +192,28 @@ async fn sealed_by_number_of_txs() { #[tokio::test] async fn sealed_by_gas() { + let first_tx = random_tx(1); + let execution_result = successful_exec_with_log(); + let exec_metrics = execution_result + .tx_result + .get_execution_metrics(Some(&first_tx)); + assert!(exec_metrics.size() > 0); + let l1_gas_per_tx = gas_count_from_tx_and_metrics(&first_tx, &exec_metrics); + assert!(l1_gas_per_tx.commit > 0); + let config = StateKeeperConfig { - max_single_tx_gas: 62_002, + max_single_tx_gas: 62_000 + l1_gas_per_tx.commit * 2, reject_tx_at_gas_percentage: 1.0, close_block_at_gas_percentage: 0.5, ..StateKeeperConfig::default() }; let sealer = SequencerSealer::with_sealers(config, vec![Box::new(GasCriterion)]); - let l1_gas_per_tx = BlockGasCount { - commit: 1, // Both txs together with `block_base_cost` would bring it over the block `31_001` commit bound. - prove: 0, - execute: 0, - }; - let execution_result = successful_exec_with_metrics(ExecutionMetricsForCriteria { - l1_gas: l1_gas_per_tx, - execution_metrics: VmExecutionMetrics::default(), - }); - TestScenario::new() .seal_l2_block_when(|updates| { updates.l2_block.executed_transactions.len() == 1 }) - .next_tx("First tx", random_tx(1), execution_result.clone()) + .next_tx("First tx", first_tx, execution_result.clone()) .l2_block_sealed_with("L2 block with a single tx", move |updates| { assert_eq!( updates.l2_block.l1_gas_count, @@ -226,11 +223,11 @@ async fn sealed_by_gas() { }) .next_tx("Second tx", random_tx(1), execution_result) .l2_block_sealed("L2 block 2") - .batch_sealed_with("Batch sealed with both txs", |updates| { + .batch_sealed_with("Batch sealed with both txs", move |updates| { assert_eq!( updates.l1_batch.l1_gas_count, BlockGasCount { - commit: l1_batch_base_cost(AggregatedActionType::Commit) + 2, + commit: l1_batch_base_cost(AggregatedActionType::Commit) + l1_gas_per_tx.commit * 2, prove: l1_batch_base_cost(AggregatedActionType::PublishProofOnchain), execute: l1_batch_base_cost(AggregatedActionType::Execute), }, @@ -254,14 +251,7 @@ async fn sealed_by_gas_then_by_num_tx() { vec![Box::new(GasCriterion), Box::new(SlotsCriterion)], ); - let execution_result = successful_exec_with_metrics(ExecutionMetricsForCriteria { - l1_gas: BlockGasCount { - commit: 1, - prove: 0, - execute: 0, - }, - execution_metrics: VmExecutionMetrics::default(), - }); + let execution_result = successful_exec_with_log(); // 1st tx is sealed by gas sealer; 2nd, 3rd, & 4th are sealed by slots sealer. TestScenario::new() @@ -316,7 +306,11 @@ async fn rejected_tx() { let rejected_tx = random_tx(1); TestScenario::new() .seal_l2_block_when(|updates| updates.l2_block.executed_transactions.len() == 1) - .next_tx("Rejected tx", rejected_tx.clone(), rejected_exec()) + .next_tx( + "Rejected tx", + rejected_tx.clone(), + rejected_exec(Halt::InnerTxError), + ) .tx_rejected( "Tx got rejected", rejected_tx, @@ -349,7 +343,7 @@ async fn bootloader_tip_out_of_gas_flow() { .next_tx( "Tx -> Bootloader tip out of gas", bootloader_out_of_gas_tx.clone(), - TxExecutionResult::BootloaderOutOfGasForTx, + rejected_exec(Halt::BootloaderOutOfGas), ) .tx_rollback( "Last tx rolled back to seal the block", @@ -424,7 +418,7 @@ async fn pending_batch_is_applied() { async fn load_upgrade_tx() { let sealer = SequencerSealer::default(); let scenario = TestScenario::new(); - let batch_executor_base = TestBatchExecutorBuilder::new(&scenario); + let batch_executor = TestBatchExecutorBuilder::new(&scenario); let (stop_sender, stop_receiver) = watch::channel(false); let (mut io, output_handler) = TestIO::new(stop_sender, scenario); @@ -434,7 +428,7 @@ async fn load_upgrade_tx() { let mut sk = ZkSyncStateKeeper::new( stop_receiver, Box::new(io), - Box::new(batch_executor_base), + Box::new(batch_executor), output_handler, Arc::new(sealer), Arc::new(MockReadStorageFactory), diff --git a/core/node/tee_verifier_input_producer/Cargo.toml b/core/node/tee_verifier_input_producer/Cargo.toml index c975bbcd280a..7a5a4de5d0c9 100644 --- a/core/node/tee_verifier_input_producer/Cargo.toml +++ b/core/node/tee_verifier_input_producer/Cargo.toml @@ -18,7 +18,7 @@ zksync_queued_job_processor.workspace = true zksync_tee_verifier.workspace = true zksync_types.workspace = true zksync_utils.workspace = true -zksync_vm_utils.workspace = true +zksync_vm_executor.workspace = true vise.workspace = true anyhow.workspace = true diff --git a/core/node/tee_verifier_input_producer/src/lib.rs b/core/node/tee_verifier_input_producer/src/lib.rs index abd70542a42f..08382903ad6d 100644 --- a/core/node/tee_verifier_input_producer/src/lib.rs +++ b/core/node/tee_verifier_input_producer/src/lib.rs @@ -21,7 +21,7 @@ use zksync_queued_job_processor::JobProcessor; use zksync_tee_verifier::Verify; use zksync_types::{tee_types::TeeType, L1BatchNumber, L2ChainId}; use zksync_utils::u256_to_h256; -use zksync_vm_utils::storage::L1BatchParamsProvider; +use zksync_vm_executor::storage::L1BatchParamsProvider; use self::metrics::METRICS; diff --git a/core/node/vm_runner/Cargo.toml b/core/node/vm_runner/Cargo.toml index 565b33c0c347..ceb11a982477 100644 --- a/core/node/vm_runner/Cargo.toml +++ b/core/node/vm_runner/Cargo.toml @@ -11,17 +11,16 @@ keywords.workspace = true categories.workspace = true [dependencies] -zksync_multivm.workspace = true +zksync_vm_interface.workspace = true zksync_types.workspace = true zksync_dal.workspace = true zksync_contracts.workspace = true zksync_state.workspace = true zksync_storage.workspace = true -zksync_state_keeper.workspace = true zksync_utils.workspace = true zksync_prover_interface.workspace = true zksync_object_store.workspace = true -zksync_vm_utils.workspace = true +zksync_vm_executor.workspace = true zksync_health_check.workspace = true serde.workspace = true diff --git a/core/node/vm_runner/src/impls/bwip.rs b/core/node/vm_runner/src/impls/bwip.rs index f7f8c099609f..f23f63533ff5 100644 --- a/core/node/vm_runner/src/impls/bwip.rs +++ b/core/node/vm_runner/src/impls/bwip.rs @@ -6,16 +6,18 @@ use tokio::sync::watch; use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; use zksync_object_store::ObjectStore; use zksync_prover_interface::inputs::VMRunWitnessInputData; -use zksync_state_keeper::{BatchExecutor, StateKeeperOutputHandler, UpdatesManager}; +use zksync_state::OwnedStorage; use zksync_types::{ block::StorageOracleInfo, witness_block_state::WitnessStorageState, L1BatchNumber, L2ChainId, H256, }; use zksync_utils::{bytes_to_chunks, h256_to_u256, u256_to_h256}; +use zksync_vm_interface::{executor::BatchExecutorFactory, L1BatchEnv, L2BlockEnv, SystemEnv}; use crate::{ storage::StorageSyncTask, ConcurrentOutputHandlerFactory, ConcurrentOutputHandlerFactoryTask, - OutputHandlerFactory, VmRunner, VmRunnerIo, VmRunnerStorage, + L1BatchOutput, L2BlockOutput, OutputHandler, OutputHandlerFactory, VmRunner, VmRunnerIo, + VmRunnerStorage, }; /// A standalone component that retrieves all needed data for basic witness generation and saves it to the bucket @@ -30,7 +32,7 @@ impl BasicWitnessInputProducer { pub async fn new( pool: ConnectionPool, object_store: Arc, - batch_executor: Box, + batch_executor_factory: Box>, rocksdb_path: String, chain_id: L2ChainId, first_processed_batch: L1BatchNumber, @@ -53,7 +55,7 @@ impl BasicWitnessInputProducer { Box::new(io), Arc::new(loader), Box::new(output_handler_factory), - batch_executor, + batch_executor_factory, ); Ok(( Self { vm_runner }, @@ -145,30 +147,38 @@ impl VmRunnerIo for BasicWitnessInputProducerIo { struct BasicWitnessInputProducerOutputHandler { pool: ConnectionPool, object_store: Arc, + system_env: SystemEnv, + l1_batch_number: L1BatchNumber, } #[async_trait] -impl StateKeeperOutputHandler for BasicWitnessInputProducerOutputHandler { - async fn handle_l2_block(&mut self, _updates_manager: &UpdatesManager) -> anyhow::Result<()> { +impl OutputHandler for BasicWitnessInputProducerOutputHandler { + async fn handle_l2_block( + &mut self, + _env: L2BlockEnv, + _output: &L2BlockOutput, + ) -> anyhow::Result<()> { Ok(()) } #[tracing::instrument( name = "BasicWitnessInputProducerOutputHandler::handle_l1_batch", skip_all, - fields(l1_batch = %updates_manager.l1_batch.number) + fields(l1_batch = %self.l1_batch_number) )] - async fn handle_l1_batch( - &mut self, - updates_manager: Arc, - ) -> anyhow::Result<()> { - let l1_batch_number = updates_manager.l1_batch.number; + async fn handle_l1_batch(self: Box, output: Arc) -> anyhow::Result<()> { + let l1_batch_number = self.l1_batch_number; let mut connection = self.pool.connection().await?; tracing::info!(%l1_batch_number, "Started saving VM run data"); - let result = - get_updates_manager_witness_input_data(&mut connection, updates_manager).await?; + let result = get_updates_manager_witness_input_data( + &mut connection, + &self.system_env, + l1_batch_number, + &output, + ) + .await?; assert_database_witness_input_data(&mut connection, l1_batch_number, &result).await; @@ -193,18 +203,13 @@ impl StateKeeperOutputHandler for BasicWitnessInputProducerOutputHandler { #[tracing::instrument(skip_all)] async fn get_updates_manager_witness_input_data( connection: &mut Connection<'_, Core>, - updates_manager: Arc, + system_env: &SystemEnv, + l1_batch_number: L1BatchNumber, + output: &L1BatchOutput, ) -> anyhow::Result { - let l1_batch_number = updates_manager.l1_batch.number; - let finished_batch = updates_manager - .l1_batch - .finished - .clone() - .ok_or_else(|| anyhow!("L1 batch {l1_batch_number:?} is not finished"))?; - - let initial_heap_content = finished_batch.final_bootloader_memory.unwrap(); // might be just empty - let default_aa = updates_manager.base_system_contract_hashes().default_aa; - let bootloader = updates_manager.base_system_contract_hashes().bootloader; + let initial_heap_content = output.batch.final_bootloader_memory.clone().unwrap(); // might be just empty + let default_aa = system_env.base_system_smart_contracts.hashes().default_aa; + let bootloader = system_env.base_system_smart_contracts.hashes().bootloader; let bootloader_code_bytes = connection .factory_deps_dal() .get_sealed_factory_dep(bootloader) @@ -220,9 +225,8 @@ async fn get_updates_manager_witness_input_data( .ok_or_else(|| anyhow!("Default account bytecode should exist"))?; let account_bytecode = bytes_to_chunks(&account_bytecode_bytes); - let hashes: HashSet = finished_batch - .final_execution_state - .used_contract_hashes + let used_contract_hashes = &output.batch.final_execution_state.used_contract_hashes; + let hashes: HashSet = used_contract_hashes .iter() // SMA-1555: remove this hack once updated to the latest version of `zkevm_test_harness` .filter(|&&hash| hash != h256_to_u256(bootloader)) @@ -232,33 +236,22 @@ async fn get_updates_manager_witness_input_data( .factory_deps_dal() .get_factory_deps(&hashes) .await; - if finished_batch - .final_execution_state - .used_contract_hashes - .contains(&account_code_hash) - { + if used_contract_hashes.contains(&account_code_hash) { used_bytecodes.insert(account_code_hash, account_bytecode); } - let storage_refunds = finished_batch.final_execution_state.storage_refunds; - let pubdata_costs = finished_batch.final_execution_state.pubdata_costs; - - let storage_view_cache = updates_manager - .storage_view_cache() - .expect("Storage view cache was not initialized"); - + let storage_refunds = output.batch.final_execution_state.storage_refunds.clone(); + let pubdata_costs = output.batch.final_execution_state.pubdata_costs.clone(); let witness_block_state = WitnessStorageState { - read_storage_key: storage_view_cache.read_storage_keys(), - is_write_initial: storage_view_cache.initial_writes(), + read_storage_key: output.storage_view_cache.read_storage_keys(), + is_write_initial: output.storage_view_cache.initial_writes(), }; Ok(VMRunWitnessInputData { l1_batch_number, used_bytecodes, initial_heap_content, - - protocol_version: updates_manager.protocol_version(), - + protocol_version: system_env.version, bootloader_code, default_account_code_hash: account_code_hash, storage_refunds, @@ -389,11 +382,14 @@ struct BasicWitnessInputProducerOutputHandlerFactory { impl OutputHandlerFactory for BasicWitnessInputProducerOutputHandlerFactory { async fn create_handler( &mut self, - _l1_batch_number: L1BatchNumber, - ) -> anyhow::Result> { + system_env: SystemEnv, + l1_batch_env: L1BatchEnv, + ) -> anyhow::Result> { Ok(Box::new(BasicWitnessInputProducerOutputHandler { pool: self.pool.clone(), object_store: self.object_store.clone(), + system_env, + l1_batch_number: l1_batch_env.number, })) } } diff --git a/core/node/vm_runner/src/impls/playground.rs b/core/node/vm_runner/src/impls/playground.rs index 461d36116096..091fa15fc953 100644 --- a/core/node/vm_runner/src/impls/playground.rs +++ b/core/node/vm_runner/src/impls/playground.rs @@ -15,13 +15,15 @@ use tokio::{ use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck}; use zksync_state::RocksdbStorage; -use zksync_state_keeper::{MainBatchExecutor, StateKeeperOutputHandler, UpdatesManager}; use zksync_types::{vm::FastVmMode, L1BatchNumber, L2ChainId}; +use zksync_vm_executor::batch::MainBatchExecutorFactory; +use zksync_vm_interface::{L1BatchEnv, L2BlockEnv, SystemEnv}; use crate::{ storage::{PostgresLoader, StorageLoader}, - ConcurrentOutputHandlerFactory, ConcurrentOutputHandlerFactoryTask, OutputHandlerFactory, - StorageSyncTask, VmRunner, VmRunnerIo, VmRunnerStorage, + ConcurrentOutputHandlerFactory, ConcurrentOutputHandlerFactoryTask, L1BatchOutput, + L2BlockOutput, OutputHandler, OutputHandlerFactory, StorageSyncTask, VmRunner, VmRunnerIo, + VmRunnerStorage, }; #[derive(Debug, Serialize)] @@ -80,7 +82,7 @@ enum VmPlaygroundStorage { #[derive(Debug)] pub struct VmPlayground { pool: ConnectionPool, - batch_executor: MainBatchExecutor, + batch_executor_factory: MainBatchExecutorFactory, storage: VmPlaygroundStorage, chain_id: L2ChainId, io: VmPlaygroundIo, @@ -125,8 +127,8 @@ impl VmPlayground { latest_processed_batch.unwrap_or(cursor.first_processed_batch) }; - let mut batch_executor = MainBatchExecutor::new(false, false); - batch_executor.set_fast_vm_mode(vm_mode); + let mut batch_executor_factory = MainBatchExecutorFactory::new(false, false); + batch_executor_factory.set_fast_vm_mode(vm_mode); let io = VmPlaygroundIo { cursor_file_path, @@ -157,7 +159,7 @@ impl VmPlayground { }; let this = Self { pool, - batch_executor, + batch_executor_factory, storage, chain_id, io, @@ -247,7 +249,7 @@ impl VmPlayground { Box::new(self.io), loader, Box::new(self.output_handler_factory), - Box::new(self.batch_executor), + Box::new(self.batch_executor_factory), ); vm_runner.run(&stop_receiver).await } @@ -392,9 +394,17 @@ impl VmRunnerIo for VmPlaygroundIo { struct VmPlaygroundOutputHandler; #[async_trait] -impl StateKeeperOutputHandler for VmPlaygroundOutputHandler { - async fn handle_l2_block(&mut self, updates_manager: &UpdatesManager) -> anyhow::Result<()> { - tracing::trace!("Processed L2 block #{}", updates_manager.l2_block.number); +impl OutputHandler for VmPlaygroundOutputHandler { + async fn handle_l2_block( + &mut self, + env: L2BlockEnv, + _output: &L2BlockOutput, + ) -> anyhow::Result<()> { + tracing::trace!("Processed L2 block #{}", env.number); + Ok(()) + } + + async fn handle_l1_batch(self: Box, _output: Arc) -> anyhow::Result<()> { Ok(()) } } @@ -403,8 +413,9 @@ impl StateKeeperOutputHandler for VmPlaygroundOutputHandler { impl OutputHandlerFactory for VmPlaygroundOutputHandler { async fn create_handler( &mut self, - _l1_batch_number: L1BatchNumber, - ) -> anyhow::Result> { + _system_env: SystemEnv, + _l1_batch_env: L1BatchEnv, + ) -> anyhow::Result> { Ok(Box::new(Self)) } } diff --git a/core/node/vm_runner/src/impls/protective_reads.rs b/core/node/vm_runner/src/impls/protective_reads.rs index dfd5251fd39b..b620675b78e2 100644 --- a/core/node/vm_runner/src/impls/protective_reads.rs +++ b/core/node/vm_runner/src/impls/protective_reads.rs @@ -1,15 +1,16 @@ use std::sync::Arc; -use anyhow::Context; use async_trait::async_trait; use tokio::sync::watch; use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; -use zksync_state_keeper::{MainBatchExecutor, StateKeeperOutputHandler, UpdatesManager}; use zksync_types::{L1BatchNumber, L2ChainId, StorageLog}; +use zksync_vm_executor::batch::MainBatchExecutorFactory; +use zksync_vm_interface::{L1BatchEnv, L2BlockEnv, SystemEnv}; use crate::{ storage::StorageSyncTask, ConcurrentOutputHandlerFactory, ConcurrentOutputHandlerFactoryTask, - OutputHandlerFactory, VmRunner, VmRunnerIo, VmRunnerStorage, + L1BatchOutput, L2BlockOutput, OutputHandler, OutputHandlerFactory, VmRunner, VmRunnerIo, + VmRunnerStorage, }; /// A standalone component that writes protective reads asynchronously to state keeper. @@ -37,7 +38,7 @@ impl ProtectiveReadsWriter { let output_handler_factory = ProtectiveReadsOutputHandlerFactory { pool: pool.clone() }; let (output_handler_factory, output_handler_factory_task) = ConcurrentOutputHandlerFactory::new(pool.clone(), io.clone(), output_handler_factory); - let batch_processor = MainBatchExecutor::new(false, false); + let batch_processor = MainBatchExecutorFactory::new(false, false); let vm_runner = VmRunner::new( pool, Box::new(io), @@ -133,30 +134,29 @@ impl VmRunnerIo for ProtectiveReadsIo { #[derive(Debug)] struct ProtectiveReadsOutputHandler { + l1_batch_number: L1BatchNumber, pool: ConnectionPool, } #[async_trait] -impl StateKeeperOutputHandler for ProtectiveReadsOutputHandler { - async fn handle_l2_block(&mut self, _updates_manager: &UpdatesManager) -> anyhow::Result<()> { +impl OutputHandler for ProtectiveReadsOutputHandler { + async fn handle_l2_block( + &mut self, + _env: L2BlockEnv, + _output: &L2BlockOutput, + ) -> anyhow::Result<()> { Ok(()) } #[tracing::instrument( name = "ProtectiveReadsOutputHandler::handle_l1_batch", skip_all, - fields(l1_batch = %updates_manager.l1_batch.number) + fields(l1_batch = %self.l1_batch_number) )] - async fn handle_l1_batch( - &mut self, - updates_manager: Arc, - ) -> anyhow::Result<()> { - let finished_batch = updates_manager - .l1_batch - .finished - .as_ref() - .context("L1 batch is not actually finished")?; - let (_, computed_protective_reads): (Vec, Vec) = finished_batch + async fn handle_l1_batch(self: Box, output: Arc) -> anyhow::Result<()> { + let l1_batch_number = self.l1_batch_number; + let (_, computed_protective_reads): (Vec, Vec) = output + .batch .final_execution_state .deduplicated_storage_logs .iter() @@ -168,12 +168,12 @@ impl StateKeeperOutputHandler for ProtectiveReadsOutputHandler { .await?; let mut written_protective_reads = connection .storage_logs_dedup_dal() - .get_protective_reads_for_l1_batch(updates_manager.l1_batch.number) + .get_protective_reads_for_l1_batch(l1_batch_number) .await?; if !written_protective_reads.is_empty() { tracing::debug!( - l1_batch_number = %updates_manager.l1_batch.number, + l1_batch_number = %l1_batch_number, "Protective reads have already been written, validating" ); for protective_read in computed_protective_reads { @@ -181,7 +181,7 @@ impl StateKeeperOutputHandler for ProtectiveReadsOutputHandler { let key = protective_read.key.key(); if !written_protective_reads.remove(&protective_read.key) { tracing::error!( - l1_batch_number = %updates_manager.l1_batch.number, + l1_batch_number = %l1_batch_number, address = %address, key = %key, "VM runner produced a protective read that did not happen in state keeper" @@ -190,7 +190,7 @@ impl StateKeeperOutputHandler for ProtectiveReadsOutputHandler { } for remaining_read in written_protective_reads { tracing::error!( - l1_batch_number = %updates_manager.l1_batch.number, + l1_batch_number = %l1_batch_number, address = %remaining_read.address(), key = %remaining_read.key(), "State keeper produced a protective read that did not happen in VM runner" @@ -198,15 +198,12 @@ impl StateKeeperOutputHandler for ProtectiveReadsOutputHandler { } } else { tracing::debug!( - l1_batch_number = %updates_manager.l1_batch.number, + l1_batch_number = %l1_batch_number, "Protective reads have not been written, writing" ); connection .storage_logs_dedup_dal() - .insert_protective_reads( - updates_manager.l1_batch.number, - &computed_protective_reads, - ) + .insert_protective_reads(l1_batch_number, &computed_protective_reads) .await?; } @@ -223,10 +220,12 @@ struct ProtectiveReadsOutputHandlerFactory { impl OutputHandlerFactory for ProtectiveReadsOutputHandlerFactory { async fn create_handler( &mut self, - _l1_batch_number: L1BatchNumber, - ) -> anyhow::Result> { + _system_env: SystemEnv, + l1_batch_env: L1BatchEnv, + ) -> anyhow::Result> { Ok(Box::new(ProtectiveReadsOutputHandler { pool: self.pool.clone(), + l1_batch_number: l1_batch_env.number, })) } } diff --git a/core/node/vm_runner/src/lib.rs b/core/node/vm_runner/src/lib.rs index 03e3f43baedc..63e2b5881aaf 100644 --- a/core/node/vm_runner/src/lib.rs +++ b/core/node/vm_runner/src/lib.rs @@ -16,7 +16,8 @@ mod tests; pub use self::{ io::VmRunnerIo, output_handler::{ - ConcurrentOutputHandlerFactory, ConcurrentOutputHandlerFactoryTask, OutputHandlerFactory, + ConcurrentOutputHandlerFactory, ConcurrentOutputHandlerFactoryTask, L1BatchOutput, + L2BlockOutput, OutputHandler, OutputHandlerFactory, }, process::VmRunner, storage::{BatchExecuteData, StorageSyncTask, VmRunnerStorage}, diff --git a/core/node/vm_runner/src/output_handler.rs b/core/node/vm_runner/src/output_handler.rs index 4052c245a44f..25eae5e36845 100644 --- a/core/node/vm_runner/src/output_handler.rs +++ b/core/node/vm_runner/src/output_handler.rs @@ -1,9 +1,4 @@ -use std::{ - fmt::{Debug, Formatter}, - mem, - sync::Arc, - time::Duration, -}; +use std::{fmt, sync::Arc, time::Duration}; use anyhow::Context; use async_trait::async_trait; @@ -13,13 +8,52 @@ use tokio::{ task::JoinHandle, }; use zksync_dal::{ConnectionPool, Core}; -use zksync_state_keeper::{StateKeeperOutputHandler, UpdatesManager}; -use zksync_types::L1BatchNumber; +use zksync_state::interface::StorageViewCache; +use zksync_types::{L1BatchNumber, Transaction}; +use zksync_vm_interface::{ + BatchTransactionExecutionResult, FinishedL1Batch, L1BatchEnv, L2BlockEnv, SystemEnv, +}; use crate::{metrics::METRICS, VmRunnerIo}; type BatchReceiver = oneshot::Receiver>>; +/// Output from executing a single L2 block. +#[derive(Debug, Default)] +pub struct L2BlockOutput { + /// Executed transactions together with execution results. + pub transactions: Vec<(Transaction, BatchTransactionExecutionResult)>, +} + +impl L2BlockOutput { + pub(crate) fn push(&mut self, tx: Transaction, exec_result: BatchTransactionExecutionResult) { + self.transactions.push((tx, exec_result)); + } +} + +/// Output from executing L1 batch tip. +#[derive(Debug)] +pub struct L1BatchOutput { + /// Finished L1 batch. + pub batch: FinishedL1Batch, + /// Information about storage accesses for the batch. + pub storage_view_cache: StorageViewCache, +} + +/// Handler of batch execution. +#[async_trait] +pub trait OutputHandler: fmt::Debug + Send { + /// Handles an L2 block processed by the VM. + async fn handle_l2_block( + &mut self, + env: L2BlockEnv, + output: &L2BlockOutput, + ) -> anyhow::Result<()>; + + /// Handles an L1 batch processed by the VM. + async fn handle_l1_batch(self: Box, output: Arc) -> anyhow::Result<()>; +} + /// Functionality to produce a [`StateKeeperOutputHandler`] implementation for a specific L1 batch. /// /// The idea behind this trait is that often handling output data is independent of the order of the @@ -27,7 +61,7 @@ type BatchReceiver = oneshot::Receiver>>; /// simultaneously. Implementing this trait signifies that this property is held for the data the /// implementation is responsible for. #[async_trait] -pub trait OutputHandlerFactory: Debug + Send { +pub trait OutputHandlerFactory: fmt::Debug + Send { /// Creates a [`StateKeeperOutputHandler`] implementation for the provided L1 batch. Only /// supposed to be used for the L1 batch data it was created against. Using it for anything else /// will lead to errors. @@ -37,8 +71,9 @@ pub trait OutputHandlerFactory: Debug + Send { /// Propagates DB errors. async fn create_handler( &mut self, - l1_batch_number: L1BatchNumber, - ) -> anyhow::Result>; + system_env: SystemEnv, + l1_batch_env: L1BatchEnv, + ) -> anyhow::Result>; } /// A delegator factory that requires an underlying factory `F` that does the actual work, however @@ -57,8 +92,12 @@ pub struct ConcurrentOutputHandlerFactory Debug for ConcurrentOutputHandlerFactory { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { +impl fmt::Debug for ConcurrentOutputHandlerFactory +where + Io: VmRunnerIo, + F: OutputHandlerFactory, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ConcurrentOutputHandlerFactory") .field("pool", &self.pool) .field("io", &self.io) @@ -101,8 +140,10 @@ impl OutputHandlerFactory { async fn create_handler( &mut self, - l1_batch_number: L1BatchNumber, - ) -> anyhow::Result> { + system_env: SystemEnv, + l1_batch_env: L1BatchEnv, + ) -> anyhow::Result> { + let l1_batch_number = l1_batch_env.number; let mut conn = self.pool.connection_tagged(self.io.name()).await?; let latest_processed_batch = self.io.latest_processed_batch(&mut conn).await?; let last_processable_batch = self.io.last_ready_to_be_loaded_batch(&mut conn).await?; @@ -121,70 +162,50 @@ impl OutputHandlerFactory last_processable_batch ); - let handler = self.factory.create_handler(l1_batch_number).await?; + let handler = self + .factory + .create_handler(system_env, l1_batch_env) + .await?; let (sender, receiver) = oneshot::channel(); self.state.insert(l1_batch_number, receiver); - Ok(Box::new(AsyncOutputHandler::Running { handler, sender })) + Ok(Box::new(AsyncOutputHandler { handler, sender })) } } -enum AsyncOutputHandler { - Running { - handler: Box, - sender: oneshot::Sender>>, - }, - Finished, +struct AsyncOutputHandler { + handler: Box, + sender: oneshot::Sender>>, } -impl Debug for AsyncOutputHandler { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - AsyncOutputHandler::Running { handler, .. } => f - .debug_struct("AsyncOutputHandler::Running") - .field("handler", handler) - .finish(), - AsyncOutputHandler::Finished => f.debug_struct("AsyncOutputHandler::Finished").finish(), - } +impl fmt::Debug for AsyncOutputHandler { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("AsyncOutputHandler::Running") + .field("handler", &self.handler) + .finish() } } #[async_trait] -impl StateKeeperOutputHandler for AsyncOutputHandler { - async fn handle_l2_block(&mut self, updates_manager: &UpdatesManager) -> anyhow::Result<()> { - match self { - AsyncOutputHandler::Running { handler, .. } => { - handler.handle_l2_block(updates_manager).await - } - AsyncOutputHandler::Finished => { - Err(anyhow::anyhow!("Cannot handle any more L2 blocks")) - } - } - } - - async fn handle_l1_batch( +impl OutputHandler for AsyncOutputHandler { + async fn handle_l2_block( &mut self, - updates_manager: Arc, + env: L2BlockEnv, + output: &L2BlockOutput, ) -> anyhow::Result<()> { - let state = mem::replace(self, AsyncOutputHandler::Finished); - match state { - AsyncOutputHandler::Running { - mut handler, - sender, - } => { - sender - .send(tokio::task::spawn(async move { - let latency = METRICS.output_handle_time.start(); - let result = handler.handle_l1_batch(updates_manager).await; - latency.observe(); - result - })) - .ok(); - Ok(()) - } - AsyncOutputHandler::Finished => { - Err(anyhow::anyhow!("Cannot handle any more L1 batches")) - } - } + self.handler.handle_l2_block(env, output).await + } + + async fn handle_l1_batch(self: Box, output: Arc) -> anyhow::Result<()> { + let handler = self.handler; + self.sender + .send(tokio::task::spawn(async move { + let latency = METRICS.output_handle_time.start(); + let result = handler.handle_l1_batch(output).await; + latency.observe(); + result + })) + .ok(); + Ok(()) } } @@ -196,8 +217,8 @@ pub struct ConcurrentOutputHandlerFactoryTask { state: Arc>, } -impl Debug for ConcurrentOutputHandlerFactoryTask { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { +impl fmt::Debug for ConcurrentOutputHandlerFactoryTask { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ConcurrentOutputHandlerFactoryTask") .field("pool", &self.pool) .field("io", &self.io) diff --git a/core/node/vm_runner/src/process.rs b/core/node/vm_runner/src/process.rs index 3c5a00e074c0..e2a678ccdce4 100644 --- a/core/node/vm_runner/src/process.rs +++ b/core/node/vm_runner/src/process.rs @@ -3,14 +3,17 @@ use std::{sync::Arc, time::Duration}; use anyhow::Context; use tokio::{sync::watch, task::JoinHandle}; use zksync_dal::{ConnectionPool, Core}; -use zksync_multivm::interface::L2BlockEnv; -use zksync_state_keeper::{ - BatchExecutor, BatchExecutorHandle, ExecutionMetricsForCriteria, L2BlockParams, - StateKeeperOutputHandler, TxExecutionResult, UpdatesManager, -}; +use zksync_state::OwnedStorage; use zksync_types::{block::L2BlockExecutionData, L1BatchNumber}; +use zksync_vm_interface::{ + executor::{BatchExecutor, BatchExecutorFactory}, + L2BlockEnv, +}; -use crate::{metrics::METRICS, storage::StorageLoader, OutputHandlerFactory, VmRunnerIo}; +use crate::{ + metrics::METRICS, output_handler::OutputHandler, storage::StorageLoader, L1BatchOutput, + L2BlockOutput, OutputHandlerFactory, VmRunnerIo, +}; /// VM runner represents a logic layer of L1 batch / L2 block processing flow akin to that of state /// keeper. The difference is that VM runner is designed to be run on batches/blocks that have @@ -29,7 +32,7 @@ pub struct VmRunner { io: Box, loader: Arc, output_handler_factory: Box, - batch_processor: Box, + batch_executor_factory: Box>, } impl VmRunner { @@ -44,32 +47,27 @@ impl VmRunner { io: Box, loader: Arc, output_handler_factory: Box, - batch_processor: Box, + batch_executor_factory: Box>, ) -> Self { Self { pool, io, loader, output_handler_factory, - batch_processor, + batch_executor_factory, } } async fn process_batch( - mut batch_executor: BatchExecutorHandle, + mut batch_executor: Box>, l2_blocks: Vec, - mut updates_manager: UpdatesManager, - mut output_handler: Box, + mut output_handler: Box, ) -> anyhow::Result<()> { let latency = METRICS.run_vm_time.start(); for (i, l2_block) in l2_blocks.into_iter().enumerate() { + let block_env = L2BlockEnv::from_l2_block_data(&l2_block); if i > 0 { // First L2 block in every batch is already preloaded - updates_manager.push_l2_block(L2BlockParams { - timestamp: l2_block.timestamp, - virtual_blocks: l2_block.virtual_blocks, - }); - let block_env = L2BlockEnv::from_l2_block_data(&l2_block); batch_executor .start_next_l2_block(block_env) .await @@ -77,51 +75,36 @@ impl VmRunner { format!("failed starting L2 block with {block_env:?} in batch executor") })?; } + + let mut block_output = L2BlockOutput::default(); for tx in l2_block.txs { let exec_result = batch_executor .execute_tx(tx.clone()) .await .with_context(|| format!("failed executing transaction {:?}", tx.hash()))?; - let TxExecutionResult::Success { - tx_result, - tx_metrics, - call_tracer_result, - compressed_bytecodes, - .. - } = exec_result - else { - anyhow::bail!("Unexpected non-successful transaction"); - }; - let ExecutionMetricsForCriteria { - l1_gas: tx_l1_gas_this_tx, - execution_metrics: tx_execution_metrics, - } = *tx_metrics; - updates_manager.extend_from_executed_transaction( - tx, - *tx_result, - compressed_bytecodes, - tx_l1_gas_this_tx, - tx_execution_metrics, - call_tracer_result, + anyhow::ensure!( + !exec_result.was_halted(), + "Unexpected non-successful transaction" ); + block_output.push(tx, exec_result); } output_handler - .handle_l2_block(&updates_manager) + .handle_l2_block(block_env, &block_output) .await .context("VM runner failed to handle L2 block")?; } - let (finished_batch, storage_view_cache) = batch_executor - .finish_batch_with_cache() + let (batch, storage_view) = batch_executor + .finish_batch() .await - .context("Failed getting storage view cache")?; - updates_manager.finish_batch(finished_batch); - // this is needed for Basic Witness Input Producer to use in memory reads, but not database queries - updates_manager.update_storage_view_cache(storage_view_cache); - + .context("VM runner failed to execute batch tip")?; + let output = L1BatchOutput { + batch, + storage_view_cache: storage_view.cache(), + }; latency.observe(); output_handler - .handle_l1_batch(Arc::new(updates_manager)) + .handle_l1_batch(Arc::new(output)) .await .context("VM runner failed to handle L1 batch")?; Ok(()) @@ -178,16 +161,14 @@ impl VmRunner { tokio::time::sleep(SLEEP_INTERVAL).await; continue; }; - let updates_manager = - UpdatesManager::new(&batch_data.l1_batch_env, &batch_data.system_env); - let batch_executor = self.batch_processor.init_batch( + let batch_executor = self.batch_executor_factory.init_batch( storage, - batch_data.l1_batch_env, - batch_data.system_env, + batch_data.l1_batch_env.clone(), + batch_data.system_env.clone(), ); let output_handler = self .output_handler_factory - .create_handler(next_batch) + .create_handler(batch_data.system_env, batch_data.l1_batch_env) .await?; self.io @@ -196,7 +177,6 @@ impl VmRunner { let handle = tokio::task::spawn(Self::process_batch( batch_executor, batch_data.l2_blocks, - updates_manager, output_handler, )); task_handles.push((next_batch, handle)); diff --git a/core/node/vm_runner/src/storage.rs b/core/node/vm_runner/src/storage.rs index d08ef2830f3f..baee426007c5 100644 --- a/core/node/vm_runner/src/storage.rs +++ b/core/node/vm_runner/src/storage.rs @@ -9,13 +9,13 @@ use anyhow::Context as _; use async_trait::async_trait; use tokio::sync::{watch, RwLock}; use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; -use zksync_multivm::interface::{L1BatchEnv, SystemEnv}; use zksync_state::{ AsyncCatchupTask, BatchDiff, OwnedStorage, RocksdbCell, RocksdbStorage, RocksdbStorageBuilder, RocksdbWithMemory, }; use zksync_types::{block::L2BlockExecutionData, L1BatchNumber, L2ChainId}; -use zksync_vm_utils::storage::L1BatchParamsProvider; +use zksync_vm_executor::storage::L1BatchParamsProvider; +use zksync_vm_interface::{L1BatchEnv, SystemEnv}; use crate::{metrics::METRICS, VmRunnerIo}; diff --git a/core/node/vm_runner/src/tests/mod.rs b/core/node/vm_runner/src/tests/mod.rs index 525a306eabf5..530016408140 100644 --- a/core/node/vm_runner/src/tests/mod.rs +++ b/core/node/vm_runner/src/tests/mod.rs @@ -4,13 +4,11 @@ use async_trait::async_trait; use rand::{prelude::SliceRandom, Rng}; use tokio::sync::RwLock; use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; -use zksync_multivm::interface::TransactionExecutionMetrics; use zksync_node_genesis::GenesisParams; use zksync_node_test_utils::{ create_l1_batch_metadata, create_l2_block, execute_l2_transaction, l1_batch_metadata_to_commitment_artifacts, }; -use zksync_state_keeper::{StateKeeperOutputHandler, UpdatesManager}; use zksync_test_account::Account; use zksync_types::{ block::{L1BatchHeader, L2BlockHasher}, @@ -22,8 +20,9 @@ use zksync_types::{ StorageLog, StorageLogKind, StorageValue, H160, H256, L2_BASE_TOKEN_ADDRESS, U256, }; use zksync_utils::{bytecode::hash_bytecode, h256_to_u256, u256_to_h256}; +use zksync_vm_interface::{L1BatchEnv, L2BlockEnv, SystemEnv, TransactionExecutionMetrics}; -use super::{OutputHandlerFactory, VmRunnerIo}; +use super::*; mod output_handler; mod playground; @@ -155,25 +154,27 @@ struct TestOutputFactory { impl OutputHandlerFactory for TestOutputFactory { async fn create_handler( &mut self, - l1_batch_number: L1BatchNumber, - ) -> anyhow::Result> { + _system_env: SystemEnv, + l1_batch_env: L1BatchEnv, + ) -> anyhow::Result> { #[derive(Debug)] struct TestOutputHandler { delay: Option, } #[async_trait] - impl StateKeeperOutputHandler for TestOutputHandler { + impl OutputHandler for TestOutputHandler { async fn handle_l2_block( &mut self, - _updates_manager: &UpdatesManager, + _env: L2BlockEnv, + _output: &L2BlockOutput, ) -> anyhow::Result<()> { Ok(()) } async fn handle_l1_batch( - &mut self, - _updates_manager: Arc, + self: Box, + _output: Arc, ) -> anyhow::Result<()> { if let Some(delay) = self.delay { tokio::time::sleep(delay).await @@ -182,7 +183,7 @@ impl OutputHandlerFactory for TestOutputFactory { } } - let delay = self.delays.get(&l1_batch_number).copied(); + let delay = self.delays.get(&l1_batch_env.number).copied(); Ok(Box::new(TestOutputHandler { delay })) } } diff --git a/core/node/vm_runner/src/tests/output_handler.rs b/core/node/vm_runner/src/tests/output_handler.rs index 453507328c4f..1bf30effdbe5 100644 --- a/core/node/vm_runner/src/tests/output_handler.rs +++ b/core/node/vm_runner/src/tests/output_handler.rs @@ -6,13 +6,13 @@ use tokio::{ }; use zksync_contracts::{BaseSystemContracts, SystemContractCode}; use zksync_dal::{ConnectionPool, Core}; -use zksync_multivm::interface::{L1BatchEnv, L2BlockEnv, SystemEnv, TxExecutionMode}; -use zksync_state_keeper::UpdatesManager; +use zksync_state::interface::StorageViewCache; use zksync_types::L1BatchNumber; +use zksync_vm_interface::{FinishedL1Batch, L1BatchEnv, L2BlockEnv, SystemEnv, TxExecutionMode}; use crate::{ tests::{wait, IoMock, TestOutputFactory}, - ConcurrentOutputHandlerFactory, OutputHandlerFactory, + ConcurrentOutputHandlerFactory, L1BatchOutput, L2BlockOutput, OutputHandlerFactory, }; struct OutputHandlerTester { @@ -40,47 +40,53 @@ impl OutputHandlerTester { } async fn spawn_test_task(&mut self, l1_batch_number: L1BatchNumber) -> anyhow::Result<()> { - let mut output_handler = self.output_factory.create_handler(l1_batch_number).await?; - let join_handle = tokio::task::spawn(async move { - let l1_batch_env = L1BatchEnv { - previous_batch_hash: None, - number: Default::default(), + let l1_batch_env = L1BatchEnv { + previous_batch_hash: None, + number: l1_batch_number, + timestamp: 0, + fee_input: Default::default(), + fee_account: Default::default(), + enforced_base_fee: None, + first_l2_block: L2BlockEnv { + number: 0, timestamp: 0, - fee_input: Default::default(), - fee_account: Default::default(), - enforced_base_fee: None, - first_l2_block: L2BlockEnv { - number: 0, - timestamp: 0, - prev_block_hash: Default::default(), - max_virtual_blocks_to_create: 0, + prev_block_hash: Default::default(), + max_virtual_blocks_to_create: 0, + }, + }; + let system_env = SystemEnv { + zk_porter_available: false, + version: Default::default(), + base_system_smart_contracts: BaseSystemContracts { + bootloader: SystemContractCode { + code: vec![], + hash: Default::default(), }, - }; - let system_env = SystemEnv { - zk_porter_available: false, - version: Default::default(), - base_system_smart_contracts: BaseSystemContracts { - bootloader: SystemContractCode { - code: vec![], - hash: Default::default(), - }, - default_aa: SystemContractCode { - code: vec![], - hash: Default::default(), - }, + default_aa: SystemContractCode { + code: vec![], + hash: Default::default(), }, - bootloader_gas_limit: 0, - execution_mode: TxExecutionMode::VerifyExecute, - default_validation_computational_gas_limit: 0, - chain_id: Default::default(), - }; - let updates_manager = UpdatesManager::new(&l1_batch_env, &system_env); + }, + bootloader_gas_limit: 0, + execution_mode: TxExecutionMode::VerifyExecute, + default_validation_computational_gas_limit: 0, + chain_id: Default::default(), + }; + + let mut output_handler = self + .output_factory + .create_handler(system_env, l1_batch_env.clone()) + .await?; + let join_handle = tokio::task::spawn(async move { output_handler - .handle_l2_block(&updates_manager) + .handle_l2_block(l1_batch_env.first_l2_block, &L2BlockOutput::default()) .await .unwrap(); output_handler - .handle_l1_batch(Arc::new(updates_manager)) + .handle_l1_batch(Arc::new(L1BatchOutput { + batch: FinishedL1Batch::mock(), + storage_view_cache: StorageViewCache::default(), + })) .await .unwrap(); }); diff --git a/core/node/vm_runner/src/tests/process.rs b/core/node/vm_runner/src/tests/process.rs index 2ac976021e0b..fec3fd2ba60a 100644 --- a/core/node/vm_runner/src/tests/process.rs +++ b/core/node/vm_runner/src/tests/process.rs @@ -5,9 +5,9 @@ use test_casing::test_casing; use tokio::sync::{watch, RwLock}; use zksync_dal::{ConnectionPool, Core}; use zksync_node_genesis::{insert_genesis_batch, GenesisParams}; -use zksync_state_keeper::MainBatchExecutor; use zksync_test_account::Account; use zksync_types::{L1BatchNumber, L2ChainId}; +use zksync_vm_executor::batch::MainBatchExecutorFactory; use super::*; use crate::{ConcurrentOutputHandlerFactory, VmRunner, VmRunnerStorage}; @@ -54,7 +54,7 @@ async fn process_batches((batch_count, window): (u32, u32)) -> anyhow::Result<() tokio::task::spawn(async move { task.run(output_stop_receiver).await.unwrap() }); let storage = Arc::new(storage); - let batch_executor = MainBatchExecutor::new(false, false); + let batch_executor = MainBatchExecutorFactory::new(false, false); let vm_runner = VmRunner::new( connection_pool, Box::new(io.clone()), diff --git a/core/node/vm_runner/src/tests/storage_writer.rs b/core/node/vm_runner/src/tests/storage_writer.rs index 6cad2da6974a..76d0867125a8 100644 --- a/core/node/vm_runner/src/tests/storage_writer.rs +++ b/core/node/vm_runner/src/tests/storage_writer.rs @@ -3,17 +3,18 @@ use test_casing::test_casing; use tokio::sync::watch; use zksync_node_genesis::{insert_genesis_batch, GenesisParams}; use zksync_state::OwnedStorage; -use zksync_state_keeper::MainBatchExecutor; -use zksync_types::L2ChainId; +use zksync_types::{L2ChainId, StorageLogWithPreviousValue}; +use zksync_vm_executor::batch::MainBatchExecutorFactory; use super::*; use crate::{ storage::{PostgresLoader, StorageLoader}, - ConcurrentOutputHandlerFactory, VmRunner, + ConcurrentOutputHandlerFactory, L1BatchOutput, L2BlockOutput, OutputHandler, VmRunner, }; #[derive(Debug, Clone)] struct StorageWriterIo { + last_processed_block: L2BlockNumber, last_processed_batch: Arc>, pool: ConnectionPool, insert_protective_reads: bool, @@ -72,43 +73,43 @@ impl VmRunnerIo for StorageWriterIo { impl StorageWriterIo { async fn write_storage_logs( conn: &mut Connection<'_, Core>, - updates_manager: &UpdatesManager, + block_number: L2BlockNumber, + storage_logs: impl Iterator, ) -> anyhow::Result<()> { - let storage_logs = updates_manager - .l2_block - .storage_logs - .iter() - .filter_map(|log| log.log.is_write().then_some(log.log)); + let storage_logs = storage_logs.filter_map(|log| log.log.is_write().then_some(log.log)); let storage_logs: Vec<_> = storage_logs.collect(); conn.storage_logs_dal() - .append_storage_logs(updates_manager.l2_block.number, &storage_logs) + .append_storage_logs(block_number, &storage_logs) .await?; Ok(()) } } #[async_trait] -impl StateKeeperOutputHandler for StorageWriterIo { - async fn handle_l2_block(&mut self, updates_manager: &UpdatesManager) -> anyhow::Result<()> { +impl OutputHandler for StorageWriterIo { + async fn handle_l2_block( + &mut self, + env: L2BlockEnv, + output: &L2BlockOutput, + ) -> anyhow::Result<()> { let mut conn = self.pool.connection().await?; - Self::write_storage_logs(&mut conn, updates_manager).await?; + let storage_logs = output + .transactions + .iter() + .flat_map(|(_, exec_result)| &exec_result.tx_result.logs.storage_logs); + let block_number = L2BlockNumber(env.number); + Self::write_storage_logs(&mut conn, block_number, storage_logs).await?; + self.last_processed_block = block_number; Ok(()) } - async fn handle_l1_batch( - &mut self, - updates_manager: Arc, - ) -> anyhow::Result<()> { + async fn handle_l1_batch(self: Box, output: Arc) -> anyhow::Result<()> { let mut conn = self.pool.connection().await?; // Storage logs are added to the fictive block *after* `handle_l2_block()` is called for it, so we need to call it again here. - Self::write_storage_logs(&mut conn, &updates_manager).await?; - - let finished_batch = updates_manager - .l1_batch - .finished - .as_ref() - .expect("L1 batch is not finished"); - let state_diffs = finished_batch.state_diffs.as_ref().expect("no state diffs"); + let storage_logs = &output.batch.block_tip_execution_result.logs.storage_logs; + Self::write_storage_logs(&mut conn, self.last_processed_block, storage_logs.iter()).await?; + + let state_diffs = output.batch.state_diffs.as_ref().expect("no state diffs"); let initial_writes: Vec<_> = state_diffs .iter() .filter(|diff| diff.is_write_initial()) @@ -119,12 +120,14 @@ impl StateKeeperOutputHandler for StorageWriterIo { )) }) .collect(); + let l1_batch_number = *self.last_processed_batch.borrow() + 1; conn.storage_logs_dedup_dal() - .insert_initial_writes(updates_manager.l1_batch.number, &initial_writes) + .insert_initial_writes(l1_batch_number, &initial_writes) .await?; if self.insert_protective_reads { - let protective_reads: Vec<_> = finished_batch + let protective_reads: Vec<_> = output + .batch .final_execution_state .deduplicated_storage_logs .iter() @@ -132,12 +135,11 @@ impl StateKeeperOutputHandler for StorageWriterIo { .copied() .collect(); conn.storage_logs_dedup_dal() - .insert_protective_reads(updates_manager.l1_batch.number, &protective_reads) + .insert_protective_reads(l1_batch_number, &protective_reads) .await?; } - self.last_processed_batch - .send_replace(updates_manager.l1_batch.number); + self.last_processed_batch.send_replace(l1_batch_number); Ok(()) } } @@ -146,9 +148,10 @@ impl StateKeeperOutputHandler for StorageWriterIo { impl OutputHandlerFactory for StorageWriterIo { async fn create_handler( &mut self, - l1_batch_number: L1BatchNumber, - ) -> anyhow::Result> { - assert_eq!(l1_batch_number, self.batch() + 1); + _system_env: SystemEnv, + l1_batch_env: L1BatchEnv, + ) -> anyhow::Result> { + assert_eq!(l1_batch_env.number, self.batch() + 1); Ok(Box::new(self.clone())) } } @@ -166,6 +169,7 @@ pub(super) async fn write_storage_logs(pool: ConnectionPool, insert_protec drop(conn); let io = Box::new(StorageWriterIo { last_processed_batch: Arc::new(watch::channel(L1BatchNumber(0)).0), + last_processed_block: L2BlockNumber(0), pool: pool.clone(), insert_protective_reads, }); @@ -175,8 +179,8 @@ pub(super) async fn write_storage_logs(pool: ConnectionPool, insert_protec .await .unwrap(); let loader = Arc::new(loader); - let batch_executor = Box::new(MainBatchExecutor::new(false, false)); - let vm_runner = VmRunner::new(pool, io.clone(), loader, io, batch_executor); + let batch_executor = MainBatchExecutorFactory::new(false, false); + let vm_runner = VmRunner::new(pool, io.clone(), loader, io, Box::new(batch_executor)); let (stop_sender, stop_receiver) = watch::channel(false); let vm_runner_handle = tokio::spawn(async move { vm_runner.run(&stop_receiver).await }); @@ -233,13 +237,13 @@ async fn storage_writer_works(insert_protective_reads: bool) { let (output_factory, output_factory_task) = ConcurrentOutputHandlerFactory::new(pool.clone(), io.clone(), TestOutputFactory::default()); let output_factory_handle = tokio::spawn(output_factory_task.run(stop_receiver.clone())); - let batch_executor = Box::new(MainBatchExecutor::new(false, false)); + let batch_executor = MainBatchExecutorFactory::new(false, false); let vm_runner = VmRunner::new( pool, Box::new(io.clone()), loader, Box::new(output_factory), - batch_executor, + Box::new(batch_executor), ); let vm_runner_handle = tokio::spawn(async move { vm_runner.run(&stop_receiver).await }); diff --git a/prover/Cargo.lock b/prover/Cargo.lock index 8fe3b6f36f67..09b13a80e397 100644 --- a/prover/Cargo.lock +++ b/prover/Cargo.lock @@ -8356,6 +8356,8 @@ dependencies = [ name = "zksync_vm_interface" version = "0.1.0" dependencies = [ + "anyhow", + "async-trait", "hex", "serde", "thiserror",