Skip to content

Commit

Permalink
feat(vm-runner): implement VM runner main body (#1955)
Browse files Browse the repository at this point in the history
## What ❔

Main body of VM runner that combines all previously implemented
components into a reusable framework that re-executes blocks in VM.

## Why ❔

<!-- Why are these changes done? What goal do they contribute to? What
are the principles behind them? -->
<!-- Example: PR templates ensure PR reviewers, observers, and future
iterators are in context about the evolution of repos. -->

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
  • Loading branch information
itegulov authored May 23, 2024
1 parent a6232c5 commit bf5b6c2
Show file tree
Hide file tree
Showing 27 changed files with 1,172 additions and 746 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions checks-config/era.dic
Original file line number Diff line number Diff line change
Expand Up @@ -964,3 +964,6 @@ delegator
Bbellman
Sbellman
DCMAKE
preloaded
e2e
upcasting
8 changes: 3 additions & 5 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,8 @@ async fn build_state_keeper(
stop_receiver_clone.changed().await?;
result
}));
let batch_executor_base: Box<dyn BatchExecutor> = Box::new(MainBatchExecutor::new(
Arc::new(storage_factory),
save_call_traces,
true,
));
let batch_executor_base: Box<dyn BatchExecutor> =
Box::new(MainBatchExecutor::new(save_call_traces, true));

let io = ExternalIO::new(
connection_pool,
Expand All @@ -117,6 +114,7 @@ async fn build_state_keeper(
batch_executor_base,
output_handler,
Arc::new(NoopSealer),
Arc::new(storage_factory),
))
}

Expand Down
3 changes: 2 additions & 1 deletion core/node/consensus/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use zksync_node_test_utils::{create_l1_batch_metadata, create_l2_transaction};
use zksync_state_keeper::{
io::{IoCursor, L1BatchParams, L2BlockParams},
seal_criteria::NoopSealer,
testonly::MockBatchExecutor,
testonly::{test_batch_executor::MockReadStorageFactory, MockBatchExecutor},
OutputHandler, StateKeeperPersistence, ZkSyncStateKeeper,
};
use zksync_types::{Address, L1BatchNumber, L2BlockNumber, L2ChainId, ProtocolVersionId};
Expand Down Expand Up @@ -344,6 +344,7 @@ impl StateKeeperRunner {
OutputHandler::new(Box::new(persistence.with_tx_insertion()))
.with_handler(Box::new(self.sync_state.clone())),
Arc::new(NoopSealer),
Arc::new(MockReadStorageFactory),
)
.run()
.await
Expand Down
4 changes: 2 additions & 2 deletions core/node/node_framework/examples/main_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ impl MainNodeBuilder {
wallets.state_keeper.context("State keeper wallets")?,
);
let main_node_batch_executor_builder_layer =
MainBatchExecutorLayer::new(DBConfig::from_env()?, StateKeeperConfig::from_env()?);
let state_keeper_layer = StateKeeperLayer;
MainBatchExecutorLayer::new(StateKeeperConfig::from_env()?);
let state_keeper_layer = StateKeeperLayer::new(DBConfig::from_env()?);
self.node
.add_layer(mempool_io_layer)
.add_layer(main_node_batch_executor_builder_layer)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,21 @@
use std::sync::Arc;

use zksync_config::{configs::chain::StateKeeperConfig, DBConfig};
use zksync_state::{AsyncCatchupTask, RocksdbStorageOptions};
use zksync_state_keeper::{AsyncRocksdbCache, MainBatchExecutor};
use zksync_config::configs::chain::StateKeeperConfig;
use zksync_state_keeper::MainBatchExecutor;

use crate::{
implementations::resources::{
pools::{MasterPool, PoolResource},
state_keeper::BatchExecutorResource,
},
implementations::resources::state_keeper::BatchExecutorResource,
resource::Unique,
service::{ServiceContext, StopReceiver},
task::Task,
service::ServiceContext,
wiring_layer::{WiringError, WiringLayer},
};

#[derive(Debug)]
pub struct MainBatchExecutorLayer {
db_config: DBConfig,
state_keeper_config: StateKeeperConfig,
}

impl MainBatchExecutorLayer {
pub fn new(db_config: DBConfig, state_keeper_config: StateKeeperConfig) -> Self {
pub fn new(state_keeper_config: StateKeeperConfig) -> Self {
Self {
db_config,
state_keeper_config,
}
}
Expand All @@ -37,44 +28,9 @@ impl WiringLayer for MainBatchExecutorLayer {
}

async fn wire(self: Box<Self>, mut context: ServiceContext<'_>) -> Result<(), WiringError> {
let master_pool = context.get_resource::<PoolResource<MasterPool>>().await?;

let cache_options = RocksdbStorageOptions {
block_cache_capacity: self
.db_config
.experimental
.state_keeper_db_block_cache_capacity(),
max_open_files: self.db_config.experimental.state_keeper_db_max_open_files,
};
let (storage_factory, task) = AsyncRocksdbCache::new(
master_pool.get_singleton().await?,
self.db_config.state_keeper_db_path,
cache_options,
);
let builder = MainBatchExecutor::new(
Arc::new(storage_factory),
self.state_keeper_config.save_call_traces,
false,
);
let builder = MainBatchExecutor::new(self.state_keeper_config.save_call_traces, false);

context.insert_resource(BatchExecutorResource(Unique::new(Box::new(builder))))?;
context.add_task(Box::new(RocksdbCatchupTask(task)));
Ok(())
}
}

#[derive(Debug)]
struct RocksdbCatchupTask(AsyncCatchupTask);

#[async_trait::async_trait]
impl Task for RocksdbCatchupTask {
fn name(&self) -> &'static str {
"state_keeper/rocksdb_catchup_task"
}

async fn run(self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
self.0.run(stop_receiver.0.clone()).await?;
stop_receiver.0.changed().await?;
Ok(())
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
use std::sync::Arc;

use anyhow::Context;
use zksync_config::DBConfig;
use zksync_state::{AsyncCatchupTask, ReadStorageFactory, RocksdbStorageOptions};
use zksync_state_keeper::{
seal_criteria::ConditionalSealer, BatchExecutor, OutputHandler, StateKeeperIO,
ZkSyncStateKeeper,
seal_criteria::ConditionalSealer, AsyncRocksdbCache, BatchExecutor, OutputHandler,
StateKeeperIO, ZkSyncStateKeeper,
};
use zksync_storage::RocksDB;

pub mod main_batch_executor;
pub mod mempool_io;

use crate::{
implementations::resources::state_keeper::{
BatchExecutorResource, ConditionalSealerResource, OutputHandlerResource,
StateKeeperIOResource,
implementations::resources::{
pools::{MasterPool, PoolResource},
state_keeper::{
BatchExecutorResource, ConditionalSealerResource, OutputHandlerResource,
StateKeeperIOResource,
},
},
service::{ServiceContext, StopReceiver},
task::Task,
Expand All @@ -26,7 +31,15 @@ use crate::{
/// - `ConditionalSealerResource`
///
#[derive(Debug)]
pub struct StateKeeperLayer;
pub struct StateKeeperLayer {
db_config: DBConfig,
}

impl StateKeeperLayer {
pub fn new(db_config: DBConfig) -> Self {
Self { db_config }
}
}

#[async_trait::async_trait]
impl WiringLayer for StateKeeperLayer {
Expand Down Expand Up @@ -54,12 +67,28 @@ impl WiringLayer for StateKeeperLayer {
.take()
.context("HandleStateKeeperOutput was provided but taken by another task")?;
let sealer = context.get_resource::<ConditionalSealerResource>().await?.0;
let master_pool = context.get_resource::<PoolResource<MasterPool>>().await?;

let cache_options = RocksdbStorageOptions {
block_cache_capacity: self
.db_config
.experimental
.state_keeper_db_block_cache_capacity(),
max_open_files: self.db_config.experimental.state_keeper_db_max_open_files,
};
let (storage_factory, task) = AsyncRocksdbCache::new(
master_pool.get_singleton().await?,
self.db_config.state_keeper_db_path,
cache_options,
);
context.add_task(Box::new(RocksdbCatchupTask(task)));

context.add_task(Box::new(StateKeeperTask {
io,
batch_executor_base,
output_handler,
sealer,
storage_factory: Arc::new(storage_factory),
}));
Ok(())
}
Expand All @@ -71,6 +100,7 @@ struct StateKeeperTask {
batch_executor_base: Box<dyn BatchExecutor>,
output_handler: OutputHandler,
sealer: Arc<dyn ConditionalSealer>,
storage_factory: Arc<dyn ReadStorageFactory>,
}

#[async_trait::async_trait]
Expand All @@ -86,6 +116,7 @@ impl Task for StateKeeperTask {
self.batch_executor_base,
self.output_handler,
self.sealer,
self.storage_factory,
);
let result = state_keeper.run().await;

Expand All @@ -97,3 +128,19 @@ impl Task for StateKeeperTask {
result
}
}

#[derive(Debug)]
struct RocksdbCatchupTask(AsyncCatchupTask);

#[async_trait::async_trait]
impl Task for RocksdbCatchupTask {
fn name(&self) -> &'static str {
"state_keeper/rocksdb_catchup_task"
}

async fn run(self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
self.0.run(stop_receiver.0.clone()).await?;
stop_receiver.0.changed().await?;
Ok(())
}
}
3 changes: 2 additions & 1 deletion core/node/node_sync/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use zksync_node_test_utils::{
use zksync_state_keeper::{
io::{L1BatchParams, L2BlockParams},
seal_criteria::NoopSealer,
testonly::test_batch_executor::TestBatchExecutorBuilder,
testonly::test_batch_executor::{MockReadStorageFactory, TestBatchExecutorBuilder},
OutputHandler, StateKeeperPersistence, ZkSyncStateKeeper,
};
use zksync_types::{
Expand Down Expand Up @@ -130,6 +130,7 @@ impl StateKeeperHandles {
Box::new(batch_executor_base),
output_handler,
Arc::new(NoopSealer),
Arc::new(MockReadStorageFactory),
);

Self {
Expand Down
10 changes: 2 additions & 8 deletions core/node/state_keeper/src/batch_executor/main_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,13 @@ use crate::{
/// Creates a "real" batch executor which maintains the VM (as opposed to the test builder which doesn't use the VM).
#[derive(Debug, Clone)]
pub struct MainBatchExecutor {
storage_factory: Arc<dyn ReadStorageFactory>,
save_call_traces: bool,
optional_bytecode_compression: bool,
}

impl MainBatchExecutor {
pub fn new(
storage_factory: Arc<dyn ReadStorageFactory>,
save_call_traces: bool,
optional_bytecode_compression: bool,
) -> Self {
pub fn new(save_call_traces: bool, optional_bytecode_compression: bool) -> Self {
Self {
storage_factory,
save_call_traces,
optional_bytecode_compression,
}
Expand All @@ -53,6 +47,7 @@ impl MainBatchExecutor {
impl BatchExecutor for MainBatchExecutor {
async fn init_batch(
&mut self,
storage_factory: Arc<dyn ReadStorageFactory>,
l1_batch_params: L1BatchEnv,
system_env: SystemEnv,
stop_receiver: &watch::Receiver<bool>,
Expand All @@ -66,7 +61,6 @@ impl BatchExecutor for MainBatchExecutor {
commands: commands_receiver,
};

let storage_factory = self.storage_factory.clone();
let stop_receiver = stop_receiver.clone();
let handle = tokio::task::spawn_blocking(move || {
if let Some(storage) = Handle::current()
Expand Down
14 changes: 8 additions & 6 deletions core/node/state_keeper/src/batch_executor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fmt;
use std::{fmt, sync::Arc};

use async_trait::async_trait;
use multivm::interface::{
Expand All @@ -8,6 +8,7 @@ use tokio::{
sync::{mpsc, oneshot, watch},
task::JoinHandle,
};
use zksync_state::ReadStorageFactory;
use zksync_types::{vm_trace::Call, Transaction};
use zksync_utils::bytecode::CompressedBytecodeInfo;

Expand All @@ -23,7 +24,7 @@ pub mod main_executor;

/// Representation of a transaction executed in the virtual machine.
#[derive(Debug, Clone)]
pub(crate) enum TxExecutionResult {
pub enum TxExecutionResult {
/// Successful execution of the tx and the block tip dry run.
Success {
tx_result: Box<VmExecutionResultAndLogs>,
Expand Down Expand Up @@ -58,6 +59,7 @@ impl TxExecutionResult {
pub trait BatchExecutor: 'static + Send + Sync + fmt::Debug {
async fn init_batch(
&mut self,
storage_factory: Arc<dyn ReadStorageFactory>,
l1_batch_params: L1BatchEnv,
system_env: SystemEnv,
stop_receiver: &watch::Receiver<bool>,
Expand All @@ -81,7 +83,7 @@ impl BatchExecutorHandle {
Self { handle, commands }
}

pub(super) async fn execute_tx(&self, tx: Transaction) -> TxExecutionResult {
pub async fn execute_tx(&self, tx: Transaction) -> TxExecutionResult {
let tx_gas_limit = tx.gas_limit().as_u64();

let (response_sender, response_receiver) = oneshot::channel();
Expand Down Expand Up @@ -113,7 +115,7 @@ impl BatchExecutorHandle {
res
}

pub(super) async fn start_next_l2_block(&self, env: L2BlockEnv) {
pub async fn start_next_l2_block(&self, env: L2BlockEnv) {
// While we don't get anything from the channel, it's useful to have it as a confirmation that the operation
// indeed has been processed.
let (response_sender, response_receiver) = oneshot::channel();
Expand All @@ -128,7 +130,7 @@ impl BatchExecutorHandle {
latency.observe();
}

pub(super) async fn rollback_last_tx(&self) {
pub async fn rollback_last_tx(&self) {
// While we don't get anything from the channel, it's useful to have it as a confirmation that the operation
// indeed has been processed.
let (response_sender, response_receiver) = oneshot::channel();
Expand All @@ -143,7 +145,7 @@ impl BatchExecutorHandle {
latency.observe();
}

pub(super) async fn finish_batch(self) -> FinishedL1Batch {
pub async fn finish_batch(self) -> FinishedL1Batch {
let (response_sender, response_receiver) = oneshot::channel();
self.commands
.send(Command::FinishBatch(response_sender))
Expand Down
5 changes: 2 additions & 3 deletions core/node/state_keeper/src/batch_executor/tests/tester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,10 @@ impl Tester {
l1_batch_env: L1BatchEnv,
system_env: SystemEnv,
) -> BatchExecutorHandle {
let mut batch_executor =
MainBatchExecutor::new(storage_factory, self.config.save_call_traces, false);
let mut batch_executor = MainBatchExecutor::new(self.config.save_call_traces, false);
let (_stop_sender, stop_receiver) = watch::channel(false);
batch_executor
.init_batch(l1_batch_env, system_env, &stop_receiver)
.init_batch(storage_factory, l1_batch_env, system_env, &stop_receiver)
.await
.expect("Batch executor was interrupted")
}
Expand Down
Loading

0 comments on commit bf5b6c2

Please sign in to comment.