feat(vm-runner): implement VM runner storage layer #1651

Merged on Apr 23, 2024 (49 commits)
49 commits
0bbb574
move `ReadStorageFactory` to state crate
itegulov Apr 5, 2024
d02082e
add scaffolding for VM runner + basic storage struct
itegulov Apr 8, 2024
b2a21e5
implement storage layer
itegulov Apr 8, 2024
0922889
Add docs + some cleanup
itegulov Apr 9, 2024
4a8ecc6
load L1BatchEnv and SystemEnv via L1BatchParamsProvider
itegulov Apr 9, 2024
bb8558f
move vm_runner inside zksync_core and write tests
itegulov Apr 11, 2024
e4186cd
Merge branch 'main' into daniyar/vm-runner-storage
itegulov Apr 11, 2024
7861e70
fmt + dal errors
itegulov Apr 11, 2024
ae1dcce
Merge remote-tracking branch 'origin/main' into daniyar/pla-865-vm-ru…
itegulov Apr 12, 2024
34f6dfa
lint + spellcheck
itegulov Apr 12, 2024
af02ffa
revert contracts
itegulov Apr 12, 2024
8548fc7
get rid of `validation_computational_gas_limit`
itegulov Apr 12, 2024
5dfec56
make `get_miniblocks_to_execute_for_l1_batch` support fictive miniblocks
itegulov Apr 15, 2024
e67e140
continuously sync VmRunnerStorage in a separate task
itegulov Apr 16, 2024
fda4841
access storage and load blocks from continuous state
itegulov Apr 16, 2024
95e46ad
fmt
itegulov Apr 16, 2024
35b1152
presume a single fictive miniblock
itegulov Apr 17, 2024
80d8cd2
reverse enumeration index lookup order
itegulov Apr 17, 2024
2f463e4
get rid of `first_unprocessed_batch`
itegulov Apr 17, 2024
e2c941a
block until data is available for `access_storage`
itegulov Apr 17, 2024
a99e8e2
synchronize RocksDB up to last processed batch
itegulov Apr 17, 2024
7e0d9ae
replace `max_batches_to_load` with `last_ready_to_be_loaded_batch`
itegulov Apr 18, 2024
538e1e8
refactor batch diff layers
itegulov Apr 18, 2024
26c5375
lint + spellcheck
itegulov Apr 18, 2024
1057ff4
Merge remote-tracking branch 'origin/main' into daniyar/pla-865-vm-ru…
itegulov Apr 18, 2024
99e4ea7
adjust for rocksdb returning the next batch number
itegulov Apr 18, 2024
44dd8ba
improve invariant by including `RocksdbStorage` in state
itegulov Apr 18, 2024
7f97413
make VmRunnerStorage non-blocking
itegulov Apr 18, 2024
69ca6ac
lint
itegulov Apr 18, 2024
cf83db2
ensure we don't try to catch up RocksDB to unobtainable batch
itegulov Apr 18, 2024
0f95436
short-circuit on missing l1 batch number in rocksdb
itegulov Apr 18, 2024
4de8c94
cleanup `RocksdbWithMemory`
itegulov Apr 18, 2024
33b8631
simplify `enum_index_diff` computation
itegulov Apr 18, 2024
1054118
relax lock requirements
itegulov Apr 18, 2024
21cf8e3
sleep `StorageSyncTask` on noop
itegulov Apr 18, 2024
87bd03d
move State usability criterion to a method
itegulov Apr 18, 2024
2df318e
insert fictive miniblock at the end of a batch in tests
itegulov Apr 18, 2024
3badae0
fmt
itegulov Apr 18, 2024
793ed21
Merge remote-tracking branch 'origin/main' into daniyar/pla-865-vm-ru…
itegulov Apr 19, 2024
b333f9c
Merge remote-tracking branch 'origin/main' into daniyar/pla-865-vm-ru…
itegulov Apr 19, 2024
e469611
Merge remote-tracking branch 'origin/main' into daniyar/pla-865-vm-ru…
itegulov Apr 22, 2024
5ef75e8
randomly insert storage logs, factory deps and enum indices for tests
itegulov Apr 23, 2024
5f68559
account for rocksdb reporting a higher l1 batch number
itegulov Apr 23, 2024
73b408a
fmt
itegulov Apr 23, 2024
bb44039
further relax state lock requirements
itegulov Apr 23, 2024
20fdf20
Merge remote-tracking branch 'origin/main' into daniyar/pla-865-vm-ru…
itegulov Apr 23, 2024
3574949
lint
itegulov Apr 23, 2024
05bad1c
Merge remote-tracking branch 'origin/main' into daniyar/pla-865-vm-ru…
itegulov Apr 23, 2024
21cac86
spellcheck + lint
itegulov Apr 23, 2024
16 changes: 16 additions & 0 deletions Cargo.lock

5 changes: 3 additions & 2 deletions Cargo.toml
@@ -78,6 +78,7 @@ anyhow = "1"
assert_matches = "1.5"
async-trait = "0.1"
axum = "0.6.19"
backon = "0.4.4"
bigdecimal = "0.3.0"
bincode = "1"
bitflags = "1.3.2"
@@ -226,6 +227,6 @@ zksync_crypto_primitives = { path = "core/lib/crypto_primitives" }
zksync_node_framework = { path = "core/node/node_framework" }
zksync_eth_watch = { path = "core/node/eth_watch" }
zksync_shared_metrics = { path = "core/node/shared_metrics" }
-zksync_block_reverter = { path = "core/node/block_reverter"}
+zksync_block_reverter = { path = "core/node/block_reverter" }
 zksync_commitment_generator = { path = "core/node/commitment_generator" }
-zksync_house_keeper = { path = "core/node/house_keeper" }
+zksync_house_keeper = { path = "core/node/house_keeper" }
1 change: 1 addition & 0 deletions checks-config/era.dic
@@ -943,3 +943,4 @@ superset
80M
780kb
hyperchain
storages

19 changes: 19 additions & 0 deletions core/lib/dal/src/factory_deps_dal.rs
@@ -180,4 +180,23 @@ impl FactoryDepsDal<'_, '_> {
.await?;
Ok(())
}

/// Retrieves all factory deps entries for testing purposes.
pub async fn dump_all_factory_deps_for_tests(&mut self) -> HashMap<H256, Vec<u8>> {
sqlx::query!(
r#"
SELECT
bytecode,
bytecode_hash
FROM
factory_deps
"#
)
.fetch_all(self.storage.conn())
.await
.unwrap()
.into_iter()
.map(|row| (H256::from_slice(&row.bytecode_hash), row.bytecode))
.collect()
}
}
26 changes: 26 additions & 0 deletions core/lib/dal/src/storage_logs_dedup_dal.rs
@@ -221,6 +221,32 @@ impl StorageLogsDedupDal<'_, '_> {
.map(|row| row.index as u64))
}

pub async fn get_enumeration_index_in_l1_batch(
&mut self,
hashed_key: H256,
l1_batch_number: L1BatchNumber,
) -> DalResult<Option<u64>> {
Ok(sqlx::query!(
r#"
SELECT
INDEX
FROM
initial_writes
WHERE
hashed_key = $1
AND l1_batch_number <= $2
"#,
hashed_key.as_bytes(),
l1_batch_number.0 as i32,
)
.instrument("get_enumeration_index_in_l1_batch")
.with_arg("hashed_key", &hashed_key)
.with_arg("l1_batch_number", &l1_batch_number)
.fetch_optional(self.storage)
.await?
.map(|row| row.index as u64))
}

/// Returns `hashed_keys` that are both present in the input and in `initial_writes` table.
pub async fn filter_written_slots(&mut self, hashed_keys: &[H256]) -> DalResult<HashSet<H256>> {
let hashed_keys: Vec<_> = hashed_keys.iter().map(H256::as_bytes).collect();
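
A minimal sketch of what the new `get_enumeration_index_in_l1_batch` query appears to express: a key's enumeration index is visible only from the L1 batch of its initial write onward. The `InitialWrite` struct, the in-memory map, and the free function below are hypothetical stand-ins for the `initial_writes` table and are not part of this PR. The sketch also assumes each hashed key has at most one row.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for one row of the `initial_writes` table.
#[derive(Debug, Clone, Copy)]
struct InitialWrite {
    l1_batch_number: u32,
    index: u64,
}

/// Returns the enumeration index of `hashed_key` only if its initial write
/// happened in `l1_batch_number` or earlier (mirrors `l1_batch_number <= $2`).
fn enumeration_index_in_l1_batch(
    initial_writes: &HashMap<[u8; 32], InitialWrite>,
    hashed_key: &[u8; 32],
    l1_batch_number: u32,
) -> Option<u64> {
    initial_writes
        .get(hashed_key)
        .filter(|write| write.l1_batch_number <= l1_batch_number)
        .map(|write| write.index)
}

fn main() {
    let key = [1u8; 32];
    let mut initial_writes = HashMap::new();
    initial_writes.insert(
        key,
        InitialWrite {
            l1_batch_number: 5,
            index: 42,
        },
    );

    // Viewed from batch 4, the key has not been written yet.
    assert_eq!(enumeration_index_in_l1_batch(&initial_writes, &key, 4), None);
    // From batch 5 onward, the index is visible.
    assert_eq!(
        enumeration_index_in_l1_batch(&initial_writes, &key, 5),
        Some(42)
    );
}
```

Bounding the lookup by batch number lets a reader pinned to an older batch ignore writes that happened later, which the unbounded `get_enumeration_index_for_key` lookup cannot do.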
34 changes: 31 additions & 3 deletions core/lib/dal/src/transactions_dal.rs
@@ -1252,7 +1252,8 @@ impl TransactionsDal<'_, '_> {
.fetch_all(self.storage)
.await?;

-self.map_transactions_to_execution_data(transactions).await
+self.map_transactions_to_execution_data(transactions, None)
+.await
}

/// Returns L2 blocks with their transactions to be used in VM execution.
@@ -1282,14 +1283,37 @@ impl TransactionsDal<'_, '_> {
.fetch_all(self.storage)
.await?;

-self.map_transactions_to_execution_data(transactions).await
+let fictive_l2_block = sqlx::query!(
+r#"
+SELECT
+number
+FROM
+miniblocks
+WHERE
+miniblocks.l1_batch_number = $1
+AND l1_tx_count = 0
+AND l2_tx_count = 0
+ORDER BY
+number
+"#,
+i64::from(l1_batch_number.0)
+)
+.instrument("get_l2_blocks_to_execute_for_l1_batch#fictive_l2_block")
+.with_arg("l1_batch_number", &l1_batch_number)
+.fetch_optional(self.storage)
+.await?
+.map(|row| L2BlockNumber(row.number as u32));
+
+self.map_transactions_to_execution_data(transactions, fictive_l2_block)
+.await
}

async fn map_transactions_to_execution_data(
&mut self,
transactions: Vec<StorageTransaction>,
fictive_l2_block: Option<L2BlockNumber>,
) -> DalResult<Vec<L2BlockExecutionData>> {
-let transactions_by_l2_block: Vec<(L2BlockNumber, Vec<Transaction>)> = transactions
+let mut transactions_by_l2_block: Vec<(L2BlockNumber, Vec<Transaction>)> = transactions
.into_iter()
.group_by(|tx| tx.miniblock_number.unwrap())
.into_iter()
@@ -1300,6 +1324,10 @@ impl TransactionsDal<'_, '_> {
)
})
.collect();
// Fictive L2 block is always at the end of a batch so it is safe to append it
if let Some(fictive_l2_block) = fictive_l2_block {
transactions_by_l2_block.push((fictive_l2_block, Vec::new()));
}
if transactions_by_l2_block.is_empty() {
return Ok(Vec::new());
}
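
The change to `map_transactions_to_execution_data` can be sketched roughly as follows: transactions are grouped by L2 block, and a fictive (transaction-free) block, if one exists, is appended as the last entry. The `Tx` type and `group_by_l2_block` function are hypothetical simplifications, not code from this PR. The sketch assumes the input is already ordered by block number and uses a plain loop in place of the `group_by` call in the diff.

```rust
/// Hypothetical simplified transaction: only the L2 block number and a label.
#[derive(Debug, Clone, PartialEq)]
struct Tx {
    l2_block: u32,
    name: &'static str,
}

/// Groups transactions (assumed sorted by L2 block) into per-block vectors and
/// appends the fictive block, which carries no transactions, at the end.
fn group_by_l2_block(txs: Vec<Tx>, fictive_l2_block: Option<u32>) -> Vec<(u32, Vec<Tx>)> {
    let mut blocks: Vec<(u32, Vec<Tx>)> = Vec::new();
    for tx in txs {
        // Start a new group whenever the block number changes.
        let starts_new_block = blocks
            .last()
            .map_or(true, |(number, _)| *number != tx.l2_block);
        if starts_new_block {
            blocks.push((tx.l2_block, Vec::new()));
        }
        blocks
            .last_mut()
            .expect("a group exists after the push above")
            .1
            .push(tx);
    }
    // The fictive block closes the batch, so appending keeps the order intact.
    if let Some(number) = fictive_l2_block {
        blocks.push((number, Vec::new()));
    }
    blocks
}

fn main() {
    let txs = vec![
        Tx { l2_block: 1, name: "a" },
        Tx { l2_block: 1, name: "b" },
        Tx { l2_block: 2, name: "c" },
    ];
    let blocks = group_by_l2_block(txs, Some(3));
    assert_eq!(blocks.len(), 3);
    assert!(blocks[2].1.is_empty());
}
```

Appending is safe only under the invariant stated in the diff comment, namely that the fictive block is always the last block of its batch.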
3 changes: 3 additions & 0 deletions core/lib/state/Cargo.toml
@@ -14,14 +14,17 @@ vise.workspace = true
zksync_dal.workspace = true
zksync_types.workspace = true
zksync_utils.workspace = true
zksync_shared_metrics.workspace = true
zksync_storage.workspace = true

anyhow.workspace = true
async-trait.workspace = true
mini-moka.workspace = true
tokio = { workspace = true, features = ["rt"] }
tracing.workspace = true
itertools.workspace = true
chrono.workspace = true
once_cell.workspace = true

[dev-dependencies]
assert_matches.workspace = true
80 changes: 80 additions & 0 deletions core/lib/state/src/catchup.rs
@@ -0,0 +1,80 @@
use std::{sync::Arc, time::Instant};

use anyhow::Context;
use once_cell::sync::OnceCell;
use tokio::sync::watch;
use zksync_dal::{ConnectionPool, Core};
use zksync_shared_metrics::{SnapshotRecoveryStage, APP_METRICS};
use zksync_storage::RocksDB;
use zksync_types::L1BatchNumber;

use crate::{RocksdbStorage, StateKeeperColumnFamily};

/// A runnable task that blocks until the provided RocksDB cache instance is caught up with
/// Postgres.
///
/// See [`ReadStorageFactory`] for more context.
#[derive(Debug)]
pub struct AsyncCatchupTask {
pool: ConnectionPool<Core>,
state_keeper_db_path: String,
rocksdb_cell: Arc<OnceCell<RocksDB<StateKeeperColumnFamily>>>,
to_l1_batch_number: Option<L1BatchNumber>,
}

impl AsyncCatchupTask {
/// Create a new catch-up task with the provided Postgres and RocksDB instances. Optionally
/// accepts the last L1 batch number to catch up to (defaults to latest if not specified).
pub fn new(
pool: ConnectionPool<Core>,
state_keeper_db_path: String,
rocksdb_cell: Arc<OnceCell<RocksDB<StateKeeperColumnFamily>>>,
to_l1_batch_number: Option<L1BatchNumber>,
) -> Self {
Self {
pool,
state_keeper_db_path,
rocksdb_cell,
to_l1_batch_number,
}
}

/// Block until RocksDB cache instance is caught up with Postgres.
///
/// # Errors
///
/// Propagates RocksDB and Postgres errors.
pub async fn run(self, stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
let started_at = Instant::now();
tracing::debug!("Catching up RocksDB asynchronously");

let mut rocksdb_builder = RocksdbStorage::builder(self.state_keeper_db_path.as_ref())
.await
.context("Failed creating RocksDB storage builder")?;
let mut connection = self.pool.connection().await?;
let was_recovered_from_snapshot = rocksdb_builder
.ensure_ready(&mut connection, &stop_receiver)
.await
.context("failed initializing state keeper RocksDB from snapshot or scratch")?;
if was_recovered_from_snapshot {
let elapsed = started_at.elapsed();
APP_METRICS.snapshot_recovery_latency[&SnapshotRecoveryStage::StateKeeperCache]
.set(elapsed);
tracing::info!("Recovered state keeper RocksDB from snapshot in {elapsed:?}");
}

let rocksdb = rocksdb_builder
.synchronize(&mut connection, &stop_receiver, self.to_l1_batch_number)
.await
.context("Failed to catch up RocksDB to Postgres")?;
drop(connection);
if let Some(rocksdb) = rocksdb {
self.rocksdb_cell
.set(rocksdb.into_rocksdb())
.map_err(|_| anyhow::anyhow!("Async RocksDB cache was initialized twice"))?;
} else {
tracing::info!("Synchronizing RocksDB interrupted");
}
Ok(())
}
}
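
The hand-off at the end of `AsyncCatchupTask::run` relies on a write-once cell shared behind an `Arc`: the task publishes the synchronized database exactly once, and a second `set` is reported as an error. Below is a minimal sketch of that pattern. It swaps in `std::sync::OnceLock` (Rust 1.70+) for the `once_cell::sync::OnceCell` used in the diff so that it needs no external crates, and `Cache` and `publish` are hypothetical names standing in for the RocksDB handle and the `set` call.

```rust
use std::sync::{Arc, OnceLock};

/// Hypothetical stand-in for the caught-up RocksDB handle.
#[derive(Debug, PartialEq)]
struct Cache {
    synced_up_to: u32,
}

/// Publishes the cache into the shared cell; fails if it was already set.
fn publish(cell: &OnceLock<Cache>, cache: Cache) -> Result<(), String> {
    cell.set(cache)
        .map_err(|_| "cache was initialized twice".to_string())
}

fn main() {
    let cell = Arc::new(OnceLock::new());
    let reader = Arc::clone(&cell);

    // Before catch-up finishes, readers see an empty cell.
    assert!(reader.get().is_none());

    publish(&cell, Cache { synced_up_to: 10 }).unwrap();
    assert_eq!(reader.get(), Some(&Cache { synced_up_to: 10 }));

    // A second initialization attempt is rejected and the first value stays.
    assert!(publish(&cell, Cache { synced_up_to: 11 }).is_err());
    assert_eq!(reader.get(), Some(&Cache { synced_up_to: 10 }));
}
```

Readers holding a clone of the `Arc` can poll the cell and fall back to another storage source until it is populated.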
4 changes: 4 additions & 0 deletions core/lib/state/src/lib.rs
@@ -18,21 +18,25 @@ use zksync_types::{
};

mod cache;
mod catchup;
mod in_memory;
mod postgres;
mod rocksdb;
mod shadow_storage;
mod storage_factory;
mod storage_view;
#[cfg(test)]
mod test_utils;
mod witness;

pub use self::{
cache::sequential_cache::SequentialCache,
catchup::AsyncCatchupTask,
in_memory::InMemoryStorage,
postgres::{PostgresStorage, PostgresStorageCaches, PostgresStorageCachesTask},
rocksdb::{RocksdbStorage, RocksdbStorageBuilder, StateKeeperColumnFamily},
shadow_storage::ShadowStorage,
storage_factory::{BatchDiff, PgOrRocksdbStorage, ReadStorageFactory, RocksdbWithMemory},
storage_view::{StorageView, StorageViewMetrics},
witness::WitnessStorage,
};
5 changes: 4 additions & 1 deletion core/lib/state/src/postgres/mod.rs
@@ -592,7 +592,10 @@ impl ReadStorage for PostgresStorage<'_> {
let mut dal = self.connection.storage_logs_dedup_dal();
let value = self
.rt_handle
-.block_on(dal.get_enumeration_index_for_key(key.hashed_key()));
+.block_on(dal.get_enumeration_index_in_l1_batch(
+key.hashed_key(),
+self.l1_batch_number_for_l2_block,
+));
value.expect("failed getting enumeration index for key")
}
}