Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(vm-runner): implement VM runner main body #1955

Merged
merged 24 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions checks-config/era.dic
Original file line number Diff line number Diff line change
Expand Up @@ -961,3 +961,6 @@ vec
zksync_merkle_tree
TreeMetadata
delegator
preloaded
e2e
upcasting
8 changes: 3 additions & 5 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,8 @@ async fn build_state_keeper(
stop_receiver_clone.changed().await?;
result
}));
let batch_executor_base: Box<dyn BatchExecutor> = Box::new(MainBatchExecutor::new(
Arc::new(storage_factory),
save_call_traces,
true,
));
let batch_executor_base: Box<dyn BatchExecutor> =
Box::new(MainBatchExecutor::new(save_call_traces, true));

let io = ExternalIO::new(
connection_pool,
Expand All @@ -118,6 +115,7 @@ async fn build_state_keeper(
batch_executor_base,
output_handler,
Arc::new(NoopSealer),
Arc::new(storage_factory),
))
}

Expand Down
3 changes: 2 additions & 1 deletion core/node/consensus/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use zksync_node_test_utils::{create_l1_batch_metadata, create_l2_transaction};
use zksync_state_keeper::{
io::{IoCursor, L1BatchParams, L2BlockParams},
seal_criteria::NoopSealer,
testonly::MockBatchExecutor,
testonly::{test_batch_executor::MockReadStorageFactory, MockBatchExecutor},
OutputHandler, StateKeeperPersistence, ZkSyncStateKeeper,
};
use zksync_types::{Address, L1BatchNumber, L2BlockNumber, L2ChainId, ProtocolVersionId};
Expand Down Expand Up @@ -344,6 +344,7 @@ impl StateKeeperRunner {
OutputHandler::new(Box::new(persistence.with_tx_insertion()))
.with_handler(Box::new(self.sync_state.clone())),
Arc::new(NoopSealer),
Arc::new(MockReadStorageFactory),
)
.run()
.await
Expand Down
4 changes: 2 additions & 2 deletions core/node/node_framework/examples/main_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ impl MainNodeBuilder {
wallets.state_keeper.context("State keeper wallets")?,
);
let main_node_batch_executor_builder_layer =
MainBatchExecutorLayer::new(DBConfig::from_env()?, StateKeeperConfig::from_env()?);
let state_keeper_layer = StateKeeperLayer;
MainBatchExecutorLayer::new(StateKeeperConfig::from_env()?);
let state_keeper_layer = StateKeeperLayer::new(DBConfig::from_env()?);
self.node
.add_layer(mempool_io_layer)
.add_layer(main_node_batch_executor_builder_layer)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,21 @@
use std::sync::Arc;

use zksync_config::{configs::chain::StateKeeperConfig, DBConfig};
use zksync_state::{AsyncCatchupTask, RocksdbStorageOptions};
use zksync_state_keeper::{AsyncRocksdbCache, MainBatchExecutor};
use zksync_config::configs::chain::StateKeeperConfig;
use zksync_state_keeper::MainBatchExecutor;

use crate::{
implementations::resources::{
pools::{MasterPool, PoolResource},
state_keeper::BatchExecutorResource,
},
implementations::resources::state_keeper::BatchExecutorResource,
resource::Unique,
service::{ServiceContext, StopReceiver},
task::Task,
service::ServiceContext,
wiring_layer::{WiringError, WiringLayer},
};

#[derive(Debug)]
pub struct MainBatchExecutorLayer {
db_config: DBConfig,
state_keeper_config: StateKeeperConfig,
}

impl MainBatchExecutorLayer {
pub fn new(db_config: DBConfig, state_keeper_config: StateKeeperConfig) -> Self {
pub fn new(state_keeper_config: StateKeeperConfig) -> Self {
Self {
db_config,
state_keeper_config,
}
}
Expand All @@ -37,44 +28,9 @@ impl WiringLayer for MainBatchExecutorLayer {
}

async fn wire(self: Box<Self>, mut context: ServiceContext<'_>) -> Result<(), WiringError> {
let master_pool = context.get_resource::<PoolResource<MasterPool>>().await?;

let cache_options = RocksdbStorageOptions {
block_cache_capacity: self
.db_config
.experimental
.state_keeper_db_block_cache_capacity(),
max_open_files: self.db_config.experimental.state_keeper_db_max_open_files,
};
let (storage_factory, task) = AsyncRocksdbCache::new(
master_pool.get_singleton().await?,
self.db_config.state_keeper_db_path,
cache_options,
);
let builder = MainBatchExecutor::new(
Arc::new(storage_factory),
self.state_keeper_config.save_call_traces,
false,
);
let builder = MainBatchExecutor::new(self.state_keeper_config.save_call_traces, false);

context.insert_resource(BatchExecutorResource(Unique::new(Box::new(builder))))?;
context.add_task(Box::new(RocksdbCatchupTask(task)));
Ok(())
}
}

#[derive(Debug)]
struct RocksdbCatchupTask(AsyncCatchupTask);

#[async_trait::async_trait]
impl Task for RocksdbCatchupTask {
fn name(&self) -> &'static str {
"state_keeper/rocksdb_catchup_task"
}

async fn run(self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
self.0.run(stop_receiver.0.clone()).await?;
stop_receiver.0.changed().await?;
Ok(())
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
use std::sync::Arc;

use anyhow::Context;
use zksync_config::DBConfig;
use zksync_state::{AsyncCatchupTask, ReadStorageFactory, RocksdbStorageOptions};
use zksync_state_keeper::{
seal_criteria::ConditionalSealer, BatchExecutor, OutputHandler, StateKeeperIO,
ZkSyncStateKeeper,
seal_criteria::ConditionalSealer, AsyncRocksdbCache, BatchExecutor, OutputHandler,
StateKeeperIO, ZkSyncStateKeeper,
};
use zksync_storage::RocksDB;

pub mod main_batch_executor;
pub mod mempool_io;

use crate::{
implementations::resources::state_keeper::{
BatchExecutorResource, ConditionalSealerResource, OutputHandlerResource,
StateKeeperIOResource,
implementations::resources::{
pools::{MasterPool, PoolResource},
state_keeper::{
BatchExecutorResource, ConditionalSealerResource, OutputHandlerResource,
StateKeeperIOResource,
},
},
service::{ServiceContext, StopReceiver},
task::Task,
Expand All @@ -26,7 +31,15 @@ use crate::{
/// - `ConditionalSealerResource`
///
#[derive(Debug)]
pub struct StateKeeperLayer;
pub struct StateKeeperLayer {
db_config: DBConfig,
}

impl StateKeeperLayer {
pub fn new(db_config: DBConfig) -> Self {
Self { db_config }
}
}

#[async_trait::async_trait]
impl WiringLayer for StateKeeperLayer {
Expand Down Expand Up @@ -54,12 +67,28 @@ impl WiringLayer for StateKeeperLayer {
.take()
.context("HandleStateKeeperOutput was provided but taken by another task")?;
let sealer = context.get_resource::<ConditionalSealerResource>().await?.0;
let master_pool = context.get_resource::<PoolResource<MasterPool>>().await?;

let cache_options = RocksdbStorageOptions {
block_cache_capacity: self
.db_config
.experimental
.state_keeper_db_block_cache_capacity(),
max_open_files: self.db_config.experimental.state_keeper_db_max_open_files,
};
let (storage_factory, task) = AsyncRocksdbCache::new(
master_pool.get_singleton().await?,
self.db_config.state_keeper_db_path,
cache_options,
);
context.add_task(Box::new(RocksdbCatchupTask(task)));

context.add_task(Box::new(StateKeeperTask {
io,
batch_executor_base,
output_handler,
sealer,
storage_factory: Arc::new(storage_factory),
}));
Ok(())
}
Expand All @@ -71,6 +100,7 @@ struct StateKeeperTask {
batch_executor_base: Box<dyn BatchExecutor>,
output_handler: OutputHandler,
sealer: Arc<dyn ConditionalSealer>,
storage_factory: Arc<dyn ReadStorageFactory>,
}

#[async_trait::async_trait]
Expand All @@ -86,6 +116,7 @@ impl Task for StateKeeperTask {
self.batch_executor_base,
self.output_handler,
self.sealer,
self.storage_factory,
);
let result = state_keeper.run().await;

Expand All @@ -97,3 +128,19 @@ impl Task for StateKeeperTask {
result
}
}

#[derive(Debug)]
struct RocksdbCatchupTask(AsyncCatchupTask);

#[async_trait::async_trait]
impl Task for RocksdbCatchupTask {
fn name(&self) -> &'static str {
"state_keeper/rocksdb_catchup_task"
}

async fn run(self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
self.0.run(stop_receiver.0.clone()).await?;
stop_receiver.0.changed().await?;
Ok(())
}
}
3 changes: 2 additions & 1 deletion core/node/node_sync/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use zksync_node_test_utils::{
use zksync_state_keeper::{
io::{L1BatchParams, L2BlockParams},
seal_criteria::NoopSealer,
testonly::test_batch_executor::TestBatchExecutorBuilder,
testonly::test_batch_executor::{MockReadStorageFactory, TestBatchExecutorBuilder},
OutputHandler, StateKeeperPersistence, ZkSyncStateKeeper,
};
use zksync_types::{
Expand Down Expand Up @@ -130,6 +130,7 @@ impl StateKeeperHandles {
Box::new(batch_executor_base),
output_handler,
Arc::new(NoopSealer),
Arc::new(MockReadStorageFactory),
);

Self {
Expand Down
10 changes: 2 additions & 8 deletions core/node/state_keeper/src/batch_executor/main_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,13 @@ use crate::{
/// Creates a "real" batch executor which maintains the VM (as opposed to the test builder which doesn't use the VM).
#[derive(Debug, Clone)]
pub struct MainBatchExecutor {
storage_factory: Arc<dyn ReadStorageFactory>,
save_call_traces: bool,
optional_bytecode_compression: bool,
}

impl MainBatchExecutor {
pub fn new(
storage_factory: Arc<dyn ReadStorageFactory>,
save_call_traces: bool,
optional_bytecode_compression: bool,
) -> Self {
pub fn new(save_call_traces: bool, optional_bytecode_compression: bool) -> Self {
Self {
storage_factory,
save_call_traces,
optional_bytecode_compression,
}
Expand All @@ -53,6 +47,7 @@ impl MainBatchExecutor {
impl BatchExecutor for MainBatchExecutor {
async fn init_batch(
&mut self,
storage_factory: Arc<dyn ReadStorageFactory>,
l1_batch_params: L1BatchEnv,
system_env: SystemEnv,
stop_receiver: &watch::Receiver<bool>,
Expand All @@ -66,7 +61,6 @@ impl BatchExecutor for MainBatchExecutor {
commands: commands_receiver,
};

let storage_factory = self.storage_factory.clone();
let stop_receiver = stop_receiver.clone();
let handle = tokio::task::spawn_blocking(move || {
if let Some(storage) = Handle::current()
Expand Down
14 changes: 8 additions & 6 deletions core/node/state_keeper/src/batch_executor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fmt;
use std::{fmt, sync::Arc};

use async_trait::async_trait;
use multivm::interface::{
Expand All @@ -8,6 +8,7 @@ use tokio::{
sync::{mpsc, oneshot, watch},
task::JoinHandle,
};
use zksync_state::ReadStorageFactory;
use zksync_types::{vm_trace::Call, Transaction};
use zksync_utils::bytecode::CompressedBytecodeInfo;

Expand All @@ -23,7 +24,7 @@ pub mod main_executor;

/// Representation of a transaction executed in the virtual machine.
#[derive(Debug, Clone)]
pub(crate) enum TxExecutionResult {
pub enum TxExecutionResult {
/// Successful execution of the tx and the block tip dry run.
Success {
tx_result: Box<VmExecutionResultAndLogs>,
Expand Down Expand Up @@ -58,6 +59,7 @@ impl TxExecutionResult {
pub trait BatchExecutor: 'static + Send + Sync + fmt::Debug {
async fn init_batch(
&mut self,
storage_factory: Arc<dyn ReadStorageFactory>,
l1_batch_params: L1BatchEnv,
system_env: SystemEnv,
stop_receiver: &watch::Receiver<bool>,
Expand All @@ -81,7 +83,7 @@ impl BatchExecutorHandle {
Self { handle, commands }
}

pub(super) async fn execute_tx(&self, tx: Transaction) -> TxExecutionResult {
pub async fn execute_tx(&self, tx: Transaction) -> TxExecutionResult {
let tx_gas_limit = tx.gas_limit().as_u64();

let (response_sender, response_receiver) = oneshot::channel();
Expand Down Expand Up @@ -113,7 +115,7 @@ impl BatchExecutorHandle {
res
}

pub(super) async fn start_next_l2_block(&self, env: L2BlockEnv) {
pub async fn start_next_l2_block(&self, env: L2BlockEnv) {
// While we don't get anything from the channel, it's useful to have it as a confirmation that the operation
// indeed has been processed.
let (response_sender, response_receiver) = oneshot::channel();
Expand All @@ -128,7 +130,7 @@ impl BatchExecutorHandle {
latency.observe();
}

pub(super) async fn rollback_last_tx(&self) {
pub async fn rollback_last_tx(&self) {
// While we don't get anything from the channel, it's useful to have it as a confirmation that the operation
// indeed has been processed.
let (response_sender, response_receiver) = oneshot::channel();
Expand All @@ -143,7 +145,7 @@ impl BatchExecutorHandle {
latency.observe();
}

pub(super) async fn finish_batch(self) -> FinishedL1Batch {
pub async fn finish_batch(self) -> FinishedL1Batch {
let (response_sender, response_receiver) = oneshot::channel();
self.commands
.send(Command::FinishBatch(response_sender))
Expand Down
5 changes: 2 additions & 3 deletions core/node/state_keeper/src/batch_executor/tests/tester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,10 @@ impl Tester {
l1_batch_env: L1BatchEnv,
system_env: SystemEnv,
) -> BatchExecutorHandle {
let mut batch_executor =
MainBatchExecutor::new(storage_factory, self.config.save_call_traces, false);
let mut batch_executor = MainBatchExecutor::new(self.config.save_call_traces, false);
let (_stop_sender, stop_receiver) = watch::channel(false);
batch_executor
.init_batch(l1_batch_env, system_env, &stop_receiver)
.init_batch(storage_factory, l1_batch_env, system_env, &stop_receiver)
.await
.expect("Batch executor was interrupted")
}
Expand Down
Loading
Loading