diff --git a/core/lib/config/src/configs/experimental.rs b/core/lib/config/src/configs/experimental.rs index bb00554ead1c..8309b36e7f22 100644 --- a/core/lib/config/src/configs/experimental.rs +++ b/core/lib/config/src/configs/experimental.rs @@ -74,6 +74,9 @@ pub struct ExperimentalVmPlaygroundConfig { /// First L1 batch to consider processed. Will not be used if the processing cursor is persisted, unless the `reset` flag is set. #[serde(default)] pub first_processed_batch: L1BatchNumber, + /// Maximum number of L1 batches to process in parallel. + #[serde(default = "ExperimentalVmPlaygroundConfig::default_window_size")] + pub window_size: NonZeroU32, /// If set to true, processing cursor will reset `first_processed_batch` regardless of the current progress. Beware that this will likely /// require to drop the RocksDB cache. #[serde(default)] @@ -86,6 +89,7 @@ impl Default for ExperimentalVmPlaygroundConfig { fast_vm_mode: FastVmMode::default(), db_path: Self::default_db_path(), first_processed_batch: L1BatchNumber(0), + window_size: Self::default_window_size(), reset: false, } } @@ -95,6 +99,10 @@ impl ExperimentalVmPlaygroundConfig { pub fn default_db_path() -> String { "./db/vm_playground".to_owned() } + + pub fn default_window_size() -> NonZeroU32 { + NonZeroU32::new(1).unwrap() + } } /// Experimental VM configuration options. 
diff --git a/core/lib/config/src/testonly.rs b/core/lib/config/src/testonly.rs index 36ed650bdef0..71d02e3bec0b 100644 --- a/core/lib/config/src/testonly.rs +++ b/core/lib/config/src/testonly.rs @@ -300,6 +300,7 @@ impl Distribution for EncodeDist { fast_vm_mode: gen_fast_vm_mode(rng), db_path: self.sample(rng), first_processed_batch: L1BatchNumber(rng.gen()), + window_size: rng.gen(), reset: self.sample(rng), } } diff --git a/core/lib/protobuf_config/src/experimental.rs b/core/lib/protobuf_config/src/experimental.rs index cb959e229047..7b71dec80344 100644 --- a/core/lib/protobuf_config/src/experimental.rs +++ b/core/lib/protobuf_config/src/experimental.rs @@ -85,6 +85,8 @@ impl ProtoRepr for proto::VmPlayground { .clone() .unwrap_or_else(Self::Type::default_db_path), first_processed_batch: L1BatchNumber(self.first_processed_batch.unwrap_or(0)), + window_size: NonZeroU32::new(self.window_size.unwrap_or(1)) + .context("window_size cannot be 0")?, reset: self.reset.unwrap_or(false), }) } @@ -94,6 +96,7 @@ impl ProtoRepr for proto::VmPlayground { fast_vm_mode: Some(proto::FastVmMode::new(this.fast_vm_mode).into()), db_path: Some(this.db_path.clone()), first_processed_batch: Some(this.first_processed_batch.0), + window_size: Some(this.window_size.get()), reset: Some(this.reset), } } diff --git a/core/lib/protobuf_config/src/proto/config/experimental.proto b/core/lib/protobuf_config/src/proto/config/experimental.proto index 1682b2c9a834..55fb81b56325 100644 --- a/core/lib/protobuf_config/src/proto/config/experimental.proto +++ b/core/lib/protobuf_config/src/proto/config/experimental.proto @@ -31,6 +31,7 @@ message VmPlayground { optional string db_path = 2; // optional; defaults to `./db/vm_playground` optional uint32 first_processed_batch = 3; // optional; defaults to 0 optional bool reset = 4; // optional; defaults to false + optional uint32 window_size = 5; // optional; non-zero; defaults to 1 } message Vm { diff --git 
a/core/node/node_framework/src/implementations/layers/vm_runner/playground.rs b/core/node/node_framework/src/implementations/layers/vm_runner/playground.rs index 810d538ba978..eedde16074f5 100644 --- a/core/node/node_framework/src/implementations/layers/vm_runner/playground.rs +++ b/core/node/node_framework/src/implementations/layers/vm_runner/playground.rs @@ -3,14 +3,14 @@ use zksync_config::configs::ExperimentalVmPlaygroundConfig; use zksync_node_framework_derive::{FromContext, IntoContext}; use zksync_types::L2ChainId; use zksync_vm_runner::{ - impls::{VmPlayground, VmPlaygroundIo, VmPlaygroundLoaderTask}, + impls::{VmPlayground, VmPlaygroundCursorOptions, VmPlaygroundIo, VmPlaygroundLoaderTask}, ConcurrentOutputHandlerFactoryTask, }; use crate::{ implementations::resources::{ healthcheck::AppHealthCheckResource, - pools::{MasterPool, PoolResource}, + pools::{PoolResource, ReplicaPool}, }, StopReceiver, Task, TaskId, WiringError, WiringLayer, }; @@ -33,7 +33,8 @@ impl VmPlaygroundLayer { #[derive(Debug, FromContext)] #[context(crate = crate)] pub struct Input { - pub master_pool: PoolResource, + // We use a replica pool because VM playground doesn't write anything to the DB by design. + pub replica_pool: PoolResource, #[context(default)] pub app_health: AppHealthCheckResource, } @@ -60,7 +61,7 @@ impl WiringLayer for VmPlaygroundLayer { async fn wire(self, input: Self::Input) -> Result { let Input { - master_pool, + replica_pool, app_health, } = input; @@ -68,16 +69,22 @@ impl WiringLayer for VmPlaygroundLayer { // catch up cache. // - 1 connection for `ConcurrentOutputHandlerFactoryTask` / `VmRunner` as they need occasional access // to DB for querying last processed batch and last ready to be loaded batch. - // - 1 connection for the only running VM instance. - let connection_pool = master_pool.get_custom(3).await?; - + // - `window_size` connections for running VM instances. 
+ let connection_pool = replica_pool + .get_custom(2 + self.config.window_size.get()) + .await?; + + let cursor = VmPlaygroundCursorOptions { + first_processed_batch: self.config.first_processed_batch, + window_size: self.config.window_size, + reset_state: self.config.reset, + }; let (playground, tasks) = VmPlayground::new( connection_pool, self.config.fast_vm_mode, self.config.db_path, self.zksync_network_id, - self.config.first_processed_batch, - self.config.reset, + cursor, ) .await?; diff --git a/core/node/vm_runner/src/impls/mod.rs b/core/node/vm_runner/src/impls/mod.rs index 7f9869531c65..0911aec0561d 100644 --- a/core/node/vm_runner/src/impls/mod.rs +++ b/core/node/vm_runner/src/impls/mod.rs @@ -8,6 +8,9 @@ pub use self::{ bwip::{ BasicWitnessInputProducer, BasicWitnessInputProducerIo, BasicWitnessInputProducerTasks, }, - playground::{VmPlayground, VmPlaygroundIo, VmPlaygroundLoaderTask, VmPlaygroundTasks}, + playground::{ + VmPlayground, VmPlaygroundCursorOptions, VmPlaygroundIo, VmPlaygroundLoaderTask, + VmPlaygroundTasks, + }, protective_reads::{ProtectiveReadsIo, ProtectiveReadsWriter, ProtectiveReadsWriterTasks}, }; diff --git a/core/node/vm_runner/src/impls/playground.rs b/core/node/vm_runner/src/impls/playground.rs index 4fb140431df6..ad5623a1329d 100644 --- a/core/node/vm_runner/src/impls/playground.rs +++ b/core/node/vm_runner/src/impls/playground.rs @@ -1,5 +1,6 @@ use std::{ io, + num::NonZeroU32, path::{Path, PathBuf}, sync::Arc, }; @@ -34,6 +35,17 @@ impl From for Health { } } +/// Options related to the VM playground cursor. +#[derive(Debug)] +pub struct VmPlaygroundCursorOptions { + /// First batch to be processed by the playground. Only used if there are no processed batches, or if [`Self::reset_state`] is set. + pub first_processed_batch: L1BatchNumber, + /// Maximum number of L1 batches to process in parallel. + pub window_size: NonZeroU32, + /// If set, reset processing to [`Self::first_processed_batch`].
+ pub reset_state: bool, +} + /// Virtual machine playground. Does not persist anything in Postgres; instead, keeps an L1 batch cursor as a plain text file in the RocksDB directory /// (so that the playground doesn't repeatedly process same batches after a restart). #[derive(Debug)] @@ -56,21 +68,17 @@ impl VmPlayground { vm_mode: FastVmMode, rocksdb_path: String, chain_id: L2ChainId, - first_processed_batch: L1BatchNumber, - reset_state: bool, + cursor: VmPlaygroundCursorOptions, ) -> anyhow::Result<(Self, VmPlaygroundTasks)> { - tracing::info!( - "Starting VM playground with mode {vm_mode:?}, first processed batch is #{first_processed_batch} \ - (reset processing: {reset_state:?})" - ); + tracing::info!("Starting VM playground with mode {vm_mode:?}, cursor options: {cursor:?}"); let cursor_file_path = Path::new(&rocksdb_path).join("__vm_playground_cursor"); let latest_processed_batch = VmPlaygroundIo::read_cursor(&cursor_file_path).await?; tracing::info!("Latest processed batch: {latest_processed_batch:?}"); - let latest_processed_batch = if reset_state { - first_processed_batch + let latest_processed_batch = if cursor.reset_state { + cursor.first_processed_batch } else { - latest_processed_batch.unwrap_or(first_processed_batch) + latest_processed_batch.unwrap_or(cursor.first_processed_batch) }; let mut batch_executor = MainBatchExecutor::new(false, false); @@ -79,6 +87,7 @@ impl VmPlayground { let io = VmPlaygroundIo { cursor_file_path, vm_mode, + window_size: cursor.window_size.get(), latest_processed_batch: Arc::new(watch::channel(latest_processed_batch).0), health_updater: Arc::new(ReactiveHealthCheck::new("vm_playground").1), }; @@ -98,7 +107,7 @@ impl VmPlayground { io, loader_task_sender, output_handler_factory, - reset_to_batch: reset_state.then_some(first_processed_batch), + reset_to_batch: cursor.reset_state.then_some(cursor.first_processed_batch), }; Ok(( this, @@ -213,6 +222,7 @@ pub struct VmPlaygroundTasks { pub struct VmPlaygroundIo { 
cursor_file_path: PathBuf, vm_mode: FastVmMode, + window_size: u32, // We don't read this value from the cursor file in the `VmRunnerIo` implementation because reads / writes // aren't guaranteed to be atomic. latest_processed_batch: Arc>, @@ -285,7 +295,7 @@ impl VmRunnerIo for VmPlaygroundIo { .await? .context("no L1 batches in Postgres")?; let last_processed_l1_batch = self.latest_processed_batch(conn).await?; - Ok(sealed_l1_batch.min(last_processed_l1_batch + 1)) + Ok(sealed_l1_batch.min(last_processed_l1_batch + self.window_size)) } async fn mark_l1_batch_as_processing( diff --git a/core/node/vm_runner/src/tests/playground.rs b/core/node/vm_runner/src/tests/playground.rs index c4111f737418..2f3caf1f85c7 100644 --- a/core/node/vm_runner/src/tests/playground.rs +++ b/core/node/vm_runner/src/tests/playground.rs @@ -1,3 +1,5 @@ +use std::num::NonZeroU32; + use test_casing::test_casing; use tokio::sync::watch; use zksync_health_check::HealthStatus; @@ -6,61 +8,87 @@ use zksync_state::RocksdbStorage; use zksync_types::vm::FastVmMode; use super::*; -use crate::impls::VmPlayground; +use crate::impls::{VmPlayground, VmPlaygroundCursorOptions, VmPlaygroundTasks}; -async fn run_playground( - pool: ConnectionPool, - rocksdb_dir: &tempfile::TempDir, - reset_state: bool, -) { +async fn setup_storage(pool: &ConnectionPool, batch_count: u32) -> GenesisParams { let mut conn = pool.connection().await.unwrap(); let genesis_params = GenesisParams::mock(); - if conn.blocks_dal().is_genesis_needed().await.unwrap() { - insert_genesis_batch(&mut conn, &genesis_params) - .await - .unwrap(); - - // Generate some batches and persist them in Postgres - let mut accounts = [Account::random()]; - fund(&mut conn, &accounts).await; - store_l1_batches( - &mut conn, - 1..=1, // TODO: test on >1 batch - genesis_params.base_system_contracts().hashes(), - &mut accounts, - ) + if !conn.blocks_dal().is_genesis_needed().await.unwrap() { + return genesis_params; + } + + insert_genesis_batch(&mut conn, 
&genesis_params) .await .unwrap(); - } + // Generate some batches and persist them in Postgres + let mut accounts = [Account::random()]; + fund(&mut conn, &accounts).await; + store_l1_batches( + &mut conn, + 1..=batch_count, + genesis_params.base_system_contracts().hashes(), + &mut accounts, + ) + .await + .unwrap(); + + // Fill in missing storage logs for all batches so that running VM for all of them works correctly. + storage_writer::write_storage_logs(pool.clone()).await; + genesis_params +} + +async fn run_playground( + pool: ConnectionPool, + rocksdb_dir: &tempfile::TempDir, + reset_to: Option, +) { + let genesis_params = setup_storage(&pool, 5).await; + let cursor = VmPlaygroundCursorOptions { + first_processed_batch: reset_to.unwrap_or(L1BatchNumber(0)), + window_size: NonZeroU32::new(1).unwrap(), + reset_state: reset_to.is_some(), + }; let (playground, playground_tasks) = VmPlayground::new( pool.clone(), FastVmMode::Shadow, rocksdb_dir.path().to_str().unwrap().to_owned(), genesis_params.config().l2_chain_id, - L1BatchNumber(0), - reset_state, + cursor, ) .await .unwrap(); - let (stop_sender, stop_receiver) = watch::channel(false); let playground_io = playground.io().clone(); - assert_eq!( - playground_io - .latest_processed_batch(&mut conn) - .await - .unwrap(), - L1BatchNumber(0) - ); - assert_eq!( - playground_io - .last_ready_to_be_loaded_batch(&mut conn) - .await - .unwrap(), - L1BatchNumber(1) - ); + let mut conn = pool.connection().await.unwrap(); + if reset_to.is_none() { + assert_eq!( + playground_io + .latest_processed_batch(&mut conn) + .await + .unwrap(), + L1BatchNumber(0) + ); + assert_eq!( + playground_io + .last_ready_to_be_loaded_batch(&mut conn) + .await + .unwrap(), + L1BatchNumber(1) + ); + } + + wait_for_all_batches(playground, playground_tasks, &mut conn).await; +} + +async fn wait_for_all_batches( + playground: VmPlayground, + playground_tasks: VmPlaygroundTasks, + conn: &mut Connection<'_, Core>, +) { + let (stop_sender, 
stop_receiver) = watch::channel(false); let mut health_check = playground.health_check(); + let playground_io = playground.io().clone(); let mut completed_batches = playground_io.subscribe_to_completed_batches(); let task_handles = [ @@ -72,9 +100,17 @@ async fn run_playground( ), tokio::spawn(async move { playground.run(&stop_receiver).await }), ]; + // Wait until all batches are processed. + let last_batch_number = conn + .blocks_dal() + .get_sealed_l1_batch_number() + .await + .unwrap() + .expect("No batches in storage"); + completed_batches - .wait_for(|&number| number == L1BatchNumber(1)) + .wait_for(|&number| number == last_batch_number) .await .unwrap(); health_check @@ -84,25 +120,22 @@ async fn run_playground( } let health_details = health.details().unwrap(); assert_eq!(health_details["vm_mode"], "shadow"); - health_details["last_processed_batch"] == 1_u64 + health_details["last_processed_batch"] == u64::from(last_batch_number.0) }) .await; // Check that playground I/O works correctly. 
assert_eq!( - playground_io - .latest_processed_batch(&mut conn) - .await - .unwrap(), - L1BatchNumber(1) + playground_io.latest_processed_batch(conn).await.unwrap(), + last_batch_number ); - // There's no batch #2 in storage + // There's no next batch assert_eq!( playground_io - .last_ready_to_be_loaded_batch(&mut conn) + .last_ready_to_be_loaded_batch(conn) .await .unwrap(), - L1BatchNumber(1) + last_batch_number ); stop_sender.send_replace(true); @@ -116,14 +149,22 @@ async fn run_playground( async fn vm_playground_basics(reset_state: bool) { let pool = ConnectionPool::test_pool().await; let rocksdb_dir = tempfile::TempDir::new().unwrap(); - run_playground(pool, &rocksdb_dir, reset_state).await; + run_playground(pool, &rocksdb_dir, reset_state.then_some(L1BatchNumber(0))).await; +} + +#[tokio::test] +async fn starting_from_non_zero_batch() { + let pool = ConnectionPool::test_pool().await; + let rocksdb_dir = tempfile::TempDir::new().unwrap(); + run_playground(pool, &rocksdb_dir, Some(L1BatchNumber(3))).await; } +#[test_casing(2, [L1BatchNumber(0), L1BatchNumber(2)])] #[tokio::test] -async fn resetting_playground_state() { +async fn resetting_playground_state(reset_to: L1BatchNumber) { let pool = ConnectionPool::test_pool().await; let rocksdb_dir = tempfile::TempDir::new().unwrap(); - run_playground(pool.clone(), &rocksdb_dir, false).await; + run_playground(pool.clone(), &rocksdb_dir, None).await; // Manually catch up RocksDB to Postgres to ensure that resetting it is not trivial. 
let (_stop_sender, stop_receiver) = watch::channel(false); @@ -135,5 +176,32 @@ async fn resetting_playground_state() { .await .unwrap(); - run_playground(pool.clone(), &rocksdb_dir, true).await; + run_playground(pool.clone(), &rocksdb_dir, Some(reset_to)).await; +} + +#[test_casing(2, [2, 3])] +#[tokio::test] +async fn using_larger_window_size(window_size: u32) { + assert!(window_size > 1); + let pool = ConnectionPool::test_pool().await; + let rocksdb_dir = tempfile::TempDir::new().unwrap(); + + let genesis_params = setup_storage(&pool, 5).await; + let cursor = VmPlaygroundCursorOptions { + first_processed_batch: L1BatchNumber(0), + window_size: NonZeroU32::new(window_size).unwrap(), + reset_state: false, + }; + let (playground, playground_tasks) = VmPlayground::new( + pool.clone(), + FastVmMode::Shadow, + rocksdb_dir.path().to_str().unwrap().to_owned(), + genesis_params.config().l2_chain_id, + cursor, + ) + .await + .unwrap(); + + let mut conn = pool.connection().await.unwrap(); + wait_for_all_batches(playground, playground_tasks, &mut conn).await; } diff --git a/etc/env/file_based/general.yaml b/etc/env/file_based/general.yaml index 9df7358c08cd..8e7e6eca4280 100644 --- a/etc/env/file_based/general.yaml +++ b/etc/env/file_based/general.yaml @@ -350,6 +350,8 @@ experimental_vm: playground: db_path: "./db/main/vm_playground" fast_vm_mode: SHADOW + first_processed_batch: 0 + window_size: 1 snapshot_recovery: enabled: false