Skip to content

Commit

Permalink
Simplify BatchExecutor and its factory
Browse files Browse the repository at this point in the history
  • Loading branch information
slowli committed Aug 30, 2024
1 parent 52b0871 commit 3731102
Show file tree
Hide file tree
Showing 21 changed files with 110 additions and 331 deletions.
4 changes: 2 additions & 2 deletions core/lib/vm_executor/src/batch/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tokio::{
task::JoinHandle,
};
use zksync_multivm::interface::{
executor::{BatchExecutor, StandardOutputs},
executor::BatchExecutor,
storage::{ReadStorage, StorageView},
BatchTransactionExecutionResult, FinishedL1Batch, L2BlockEnv,
};
Expand Down Expand Up @@ -68,7 +68,7 @@ impl<S: ReadStorage> MainBatchExecutor<S> {
}

#[async_trait]
impl<S> BatchExecutor<StandardOutputs<S>> for MainBatchExecutor<S>
impl<S> BatchExecutor<S> for MainBatchExecutor<S>
where
S: ReadStorage + Send + 'static,
{
Expand Down
7 changes: 2 additions & 5 deletions core/lib/vm_executor/src/batch/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use once_cell::sync::OnceCell;
use tokio::sync::mpsc;
use zksync_multivm::{
interface::{
executor::{BatchExecutorFactory, StandardOutputs},
executor::{BatchExecutor, BatchExecutorFactory},
storage::{ReadStorage, StorageView},
BatchTransactionExecutionResult, ExecutionResult, FinishedL1Batch, Halt, L1BatchEnv,
L2BlockEnv, SystemEnv, VmInterface, VmInterfaceHistoryEnabled,
Expand Down Expand Up @@ -57,15 +57,12 @@ impl MainBatchExecutorFactory {
}

impl<S: ReadStorage + Send + 'static> BatchExecutorFactory<S> for MainBatchExecutorFactory {
type Outputs = StandardOutputs<S>;
type Executor = MainBatchExecutor<S>;

fn init_batch(
&mut self,
storage: S,
l1_batch_params: L1BatchEnv,
system_env: SystemEnv,
) -> Box<Self::Executor> {
) -> Box<dyn BatchExecutor<S>> {
// Since we process `BatchExecutor` commands one-by-one (the next command is never enqueued
// until a previous command is processed), capacity 1 is enough for the commands channel.
let (commands_sender, commands_receiver) = mpsc::channel(1);
Expand Down
96 changes: 8 additions & 88 deletions core/lib/vm_interface/src/executor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! High-level executor traits.
use std::{fmt, marker::PhantomData};
use std::fmt;

use async_trait::async_trait;
use zksync_types::Transaction;
Expand All @@ -10,65 +10,28 @@ use crate::{
SystemEnv,
};

/// [`BatchExecutor`] outputs.
pub trait BatchExecutorOutputs {
/// Output from processing a single transaction in a batch.
type Tx: 'static + Send;
/// Output from finalizing a batch.
type Batch: 'static + Send;
}

/// Marker type for "standard" batch executor outputs.
#[derive(Debug)]
pub struct StandardOutputs<S>(PhantomData<S>);

impl<S: Send + 'static> BatchExecutorOutputs for StandardOutputs<S> {
type Tx = BatchTransactionExecutionResult;
type Batch = (FinishedL1Batch, StorageView<S>);
}

/// Factory of [`BatchExecutor`]s.
pub trait BatchExecutorFactory<S: Send + 'static>: 'static + Send + fmt::Debug {
/// Outputs produced by executors instantiated by this factory.
type Outputs: BatchExecutorOutputs;
/// Executor instantiated by this factory.
type Executor: BatchExecutor<Self::Outputs> + ?Sized;

/// Initializes an executor for a batch with the specified params and using the provided storage.
fn init_batch(
&mut self,
storage: S,
l1_batch_params: L1BatchEnv,
system_env: SystemEnv,
) -> Box<Self::Executor>;
}

impl<S, T> BatchExecutorFactory<S> for Box<T>
where
S: Send + 'static,
T: BatchExecutorFactory<S> + ?Sized,
{
type Outputs = T::Outputs;
type Executor = T::Executor;

fn init_batch(
&mut self,
storage: S,
l1_batch_params: L1BatchEnv,
system_env: SystemEnv,
) -> Box<Self::Executor> {
(**self).init_batch(storage, l1_batch_params, system_env)
}
) -> Box<dyn BatchExecutor<S>>;
}

/// Handle for executing a single L1 batch.
///
/// The handle is parametric by the transaction execution output in order to be able to represent different
/// levels of abstraction.
#[async_trait]
pub trait BatchExecutor<Out: BatchExecutorOutputs>: 'static + Send + fmt::Debug {
pub trait BatchExecutor<S>: 'static + Send + fmt::Debug {
/// Executes a transaction.
async fn execute_tx(&mut self, tx: Transaction) -> anyhow::Result<Out::Tx>;
async fn execute_tx(
&mut self,
tx: Transaction,
) -> anyhow::Result<BatchTransactionExecutionResult>;

/// Rolls back the last executed transaction.
async fn rollback_last_tx(&mut self) -> anyhow::Result<()>;
Expand All @@ -77,48 +40,5 @@ pub trait BatchExecutor<Out: BatchExecutorOutputs>: 'static + Send + fmt::Debug
async fn start_next_l2_block(&mut self, env: L2BlockEnv) -> anyhow::Result<()>;

/// Finished the current L1 batch.
async fn finish_batch(self: Box<Self>) -> anyhow::Result<Out::Batch>;
}

/// Boxed [`BatchExecutorFactory`]. Can be constructed from any executor using [`box_batch_executor_factory()`].
pub type BoxBatchExecutorFactory<S, O = StandardOutputs<S>> =
Box<dyn BatchExecutorFactory<S, Outputs = O, Executor = dyn BatchExecutor<O>>>;

/// Trait object for [`BatchExecutor`] with [`StandardOutputs`].
pub type DynBatchExecutor<S> = dyn BatchExecutor<StandardOutputs<S>>;

/// Wrapper for a [`BatchExecutorFactory`] erasing returned executors.
struct ErasedFactory<T>(T);

impl<T: fmt::Debug> fmt::Debug for ErasedFactory<T> {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&self.0, formatter)
}
}

impl<S, T> BatchExecutorFactory<S> for ErasedFactory<T>
where
S: Send + 'static,
T: BatchExecutorFactory<S, Executor: Sized>,
{
type Outputs = T::Outputs;
type Executor = dyn BatchExecutor<T::Outputs>;

fn init_batch(
&mut self,
storage: S,
l1_batch_params: L1BatchEnv,
system_env: SystemEnv,
) -> Box<Self::Executor> {
self.0.init_batch(storage, l1_batch_params, system_env)
}
}

/// Boxes the provided executor factory so that it doesn't have an ambiguous associated type.
pub fn box_batch_executor_factory<S, T>(executor: T) -> BoxBatchExecutorFactory<S, T::Outputs>
where
S: Send + 'static,
T: BatchExecutorFactory<S, Executor: Sized>,
{
Box::new(ErasedFactory(executor))
async fn finish_batch(self: Box<Self>) -> anyhow::Result<(FinishedL1Batch, StorageView<S>)>;
}
11 changes: 7 additions & 4 deletions core/node/consensus/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@ use zksync_node_sync::{
};
use zksync_node_test_utils::{create_l1_batch_metadata, l1_batch_metadata_to_commitment_artifacts};
use zksync_state_keeper::{
executor::{MainBatchExecutorFactory, MainStateKeeperExecutorFactory},
executor::MainBatchExecutorFactory,
io::{IoCursor, L1BatchParams, L2BlockParams},
seal_criteria::NoopSealer,
testonly::{fund, l1_transaction, l2_transaction, MockBatchExecutor},
testonly::{
fund, l1_transaction, l2_transaction, test_batch_executor::MockReadStorageFactory,
MockBatchExecutor,
},
AsyncRocksdbCache, OutputHandler, StateKeeperPersistence, TreeWritesPersistence,
ZkSyncStateKeeper,
};
Expand Down Expand Up @@ -591,8 +594,6 @@ impl StateKeeperRunner {

s.spawn_bg({
let executor_factory = MainBatchExecutorFactory::new(false, false);
let executor_factory =
MainStateKeeperExecutorFactory::new(executor_factory, Arc::new(async_cache));
let stop_recv = stop_recv.clone();
async {
ZkSyncStateKeeper::new(
Expand All @@ -602,6 +603,7 @@ impl StateKeeperRunner {
OutputHandler::new(Box::new(persistence.with_tx_insertion()))
.with_handler(Box::new(self.sync_state.clone())),
Arc::new(NoopSealer),
Arc::new(async_cache),
)
.run()
.await
Expand Down Expand Up @@ -683,6 +685,7 @@ impl StateKeeperRunner {
.with_handler(Box::new(tree_writes_persistence))
.with_handler(Box::new(self.sync_state.clone())),
Arc::new(NoopSealer),
Arc::new(MockReadStorageFactory),
)
.run()
.await
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,14 @@
use std::sync::Arc;

use anyhow::Context;
pub use zksync_state::RocksdbStorageOptions;
use zksync_state::{AsyncCatchupTask, OwnedStorage, ReadStorageFactory};
use zksync_state_keeper::{
executor::MainStateKeeperExecutorFactory, seal_criteria::ConditionalSealer, AsyncRocksdbCache,
OutputHandler, StateKeeperIO, ZkSyncStateKeeper,
seal_criteria::ConditionalSealer, AsyncRocksdbCache, OutputHandler, StateKeeperIO,
ZkSyncStateKeeper,
};
use zksync_storage::RocksDB;

pub mod external_io;
pub mod main_batch_executor;
pub mod mempool_io;
pub mod output_handler;

// Public re-export to not require the user to directly depend on `zksync_state`.
pub use zksync_state::RocksdbStorageOptions;
use zksync_vm_executor::interface::BoxBatchExecutorFactory;
use zksync_vm_executor::interface::BatchExecutorFactory;

use crate::{
implementations::resources::{
Expand All @@ -31,6 +24,11 @@ use crate::{
FromContext, IntoContext,
};

pub mod external_io;
pub mod main_batch_executor;
pub mod mempool_io;
pub mod output_handler;

/// Wiring layer for the state keeper.
#[derive(Debug)]
pub struct StateKeeperLayer {
Expand Down Expand Up @@ -126,7 +124,7 @@ impl WiringLayer for StateKeeperLayer {
#[derive(Debug)]
pub struct StateKeeperTask {
io: Box<dyn StateKeeperIO>,
executor_factory: BoxBatchExecutorFactory<OwnedStorage>,
executor_factory: Box<dyn BatchExecutorFactory<OwnedStorage>>,
output_handler: OutputHandler,
sealer: Arc<dyn ConditionalSealer>,
storage_factory: Arc<dyn ReadStorageFactory>,
Expand All @@ -139,14 +137,13 @@ impl Task for StateKeeperTask {
}

async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
let executor_factory =
MainStateKeeperExecutorFactory::new(self.executor_factory, self.storage_factory);
let state_keeper = ZkSyncStateKeeper::new(
stop_receiver.0,
self.io,
Box::new(executor_factory),
self.executor_factory,
self.output_handler,
self.sealer,
self.storage_factory,
);
state_keeper.run().await
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use zksync_config::configs::vm_runner::BasicWitnessInputProducerConfig;
use zksync_types::L2ChainId;
use zksync_vm_executor::{batch::MainBatchExecutorFactory, interface::box_batch_executor_factory};
use zksync_vm_executor::batch::MainBatchExecutorFactory;
use zksync_vm_runner::{
impls::{BasicWitnessInputProducer, BasicWitnessInputProducerIo},
ConcurrentOutputHandlerFactoryTask, StorageSyncTask,
Expand Down Expand Up @@ -81,7 +81,7 @@ impl WiringLayer for BasicWitnessInputProducerLayer {
let (basic_witness_input_producer, tasks) = BasicWitnessInputProducer::new(
connection_pool,
object_store.0,
box_batch_executor_factory(batch_executor),
Box::new(batch_executor),
self.config.db_path,
self.zksync_network_id,
self.config.first_processed_batch,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ use std::sync::Arc;

use zksync_state::OwnedStorage;
use zksync_state_keeper::{seal_criteria::ConditionalSealer, OutputHandler, StateKeeperIO};
use zksync_vm_executor::interface::{
box_batch_executor_factory, BatchExecutorFactory, BoxBatchExecutorFactory, StandardOutputs,
};
use zksync_vm_executor::interface::BatchExecutorFactory;

use crate::resource::{Resource, Unique};

Expand All @@ -28,7 +26,7 @@ impl<T: StateKeeperIO> From<T> for StateKeeperIOResource {
/// A resource that provides [`BatchExecutorFactory`] implementation to the service.
/// This resource is unique, e.g. it's expected to be consumed by a single service.
#[derive(Debug, Clone)]
pub struct BatchExecutorResource(pub Unique<BoxBatchExecutorFactory<OwnedStorage>>);
pub struct BatchExecutorResource(pub Unique<Box<dyn BatchExecutorFactory<OwnedStorage>>>);

impl Resource for BatchExecutorResource {
fn name() -> String {
Expand All @@ -38,10 +36,10 @@ impl Resource for BatchExecutorResource {

impl<T> From<T> for BatchExecutorResource
where
T: BatchExecutorFactory<OwnedStorage, Outputs = StandardOutputs<OwnedStorage>, Executor: Sized>,
T: BatchExecutorFactory<OwnedStorage>,
{
fn from(executor: T) -> Self {
Self(Unique::new(box_batch_executor_factory(executor)))
Self(Unique::new(Box::new(executor)))
}
}

Expand Down
11 changes: 4 additions & 7 deletions core/node/node_sync/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use zksync_node_test_utils::{
create_l1_batch_metadata, create_l2_transaction, prepare_recovery_snapshot,
};
use zksync_state_keeper::{
executor::MainStateKeeperExecutorFactory,
io::{L1BatchParams, L2BlockParams},
seal_criteria::NoopSealer,
testonly::test_batch_executor::{MockReadStorageFactory, TestBatchExecutorBuilder},
Expand Down Expand Up @@ -122,20 +121,18 @@ impl StateKeeperHandles {
.unwrap();

let (stop_sender, stop_receiver) = watch::channel(false);
let mut batch_executor_base = TestBatchExecutorBuilder::default();
let mut batch_executor = TestBatchExecutorBuilder::default();
for &tx_hashes_in_l1_batch in tx_hashes {
batch_executor_base.push_successful_transactions(tx_hashes_in_l1_batch);
batch_executor.push_successful_transactions(tx_hashes_in_l1_batch);
}

let state_keeper = ZkSyncStateKeeper::new(
stop_receiver,
Box::new(io),
Box::new(MainStateKeeperExecutorFactory::new(
batch_executor_base,
Arc::new(MockReadStorageFactory),
)),
Box::new(batch_executor),
output_handler,
Arc::new(NoopSealer),
Arc::new(MockReadStorageFactory),
);

Self {
Expand Down
Loading

0 comments on commit 3731102

Please sign in to comment.