Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(state-keeper): Propagate I/O errors in state keeper #1080

Merged
merged 9 commits into from
Feb 22, 2024
33 changes: 19 additions & 14 deletions core/lib/dal/src/factory_deps_dal.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::{HashMap, HashSet};

use anyhow::Context as _;
use zksync_contracts::{BaseSystemContracts, SystemContractCode};
use zksync_types::{MiniblockNumber, H256, U256};
use zksync_utils::{bytes_to_be_words, bytes_to_chunks};
Expand All @@ -22,7 +23,7 @@ impl FactoryDepsDal<'_, '_> {
) -> sqlx::Result<()> {
let (bytecode_hashes, bytecodes): (Vec<_>, Vec<_>) = factory_deps
.iter()
.map(|dep| (dep.0.as_bytes(), dep.1.as_slice()))
.map(|(hash, bytecode)| (hash.as_bytes(), bytecode.as_slice()))
.unzip();

// Copy from stdin can't be used here because of `ON CONFLICT`.
Expand Down Expand Up @@ -51,8 +52,8 @@ impl FactoryDepsDal<'_, '_> {
}

/// Returns bytecode for a factory dependency with the specified bytecode `hash`.
pub async fn get_factory_dep(&mut self, hash: H256) -> Option<Vec<u8>> {
sqlx::query!(
pub async fn get_factory_dep(&mut self, hash: H256) -> sqlx::Result<Option<Vec<u8>>> {
Ok(sqlx::query!(
r#"
SELECT
bytecode
Expand All @@ -64,20 +65,20 @@ impl FactoryDepsDal<'_, '_> {
hash.as_bytes(),
)
.fetch_optional(self.storage.conn())
.await
.unwrap()
.map(|row| row.bytecode)
.await?
.map(|row| row.bytecode))
}

pub async fn get_base_system_contracts(
&mut self,
bootloader_hash: H256,
default_aa_hash: H256,
) -> BaseSystemContracts {
) -> anyhow::Result<BaseSystemContracts> {
let bootloader_bytecode = self
.get_factory_dep(bootloader_hash)
.await
.expect("Bootloader code should be present in the database");
.context("failed loading bootloader code")?
.with_context(|| format!("bootloader code with hash {bootloader_hash:?} should be present in the database"))?;
let bootloader_code = SystemContractCode {
code: bytes_to_be_words(bootloader_bytecode),
hash: bootloader_hash,
Expand All @@ -86,16 +87,17 @@ impl FactoryDepsDal<'_, '_> {
let default_aa_bytecode = self
.get_factory_dep(default_aa_hash)
.await
.expect("Default account code should be present in the database");
.context("failed loading default account code")?
.with_context(|| format!("default account code with hash {default_aa_hash:?} should be present in the database"))?;

let default_aa_code = SystemContractCode {
code: bytes_to_be_words(default_aa_bytecode),
hash: default_aa_hash,
};
BaseSystemContracts {
Ok(BaseSystemContracts {
bootloader: bootloader_code,
default_aa: default_aa_code,
}
})
}

/// Returns bytecodes for factory deps with the specified `hashes`.
Expand Down Expand Up @@ -155,7 +157,10 @@ impl FactoryDepsDal<'_, '_> {
}

/// Removes all factory deps with a miniblock number strictly greater than the specified `block_number`.
pub async fn rollback_factory_deps(&mut self, block_number: MiniblockNumber) {
pub async fn rollback_factory_deps(
&mut self,
block_number: MiniblockNumber,
) -> sqlx::Result<()> {
sqlx::query!(
r#"
DELETE FROM factory_deps
Expand All @@ -165,7 +170,7 @@ impl FactoryDepsDal<'_, '_> {
block_number.0 as i64
)
.execute(self.storage.conn())
.await
.unwrap();
.await?;
Ok(())
}
}
40 changes: 23 additions & 17 deletions core/lib/dal/src/protocol_versions_dal.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::convert::TryInto;

use anyhow::Context as _;
use zksync_contracts::{BaseSystemContracts, BaseSystemContractsHashes};
use zksync_types::{
protocol_version::{L1VerifierConfig, ProtocolUpgradeTx, ProtocolVersion, VerifierParams},
Expand Down Expand Up @@ -142,7 +143,7 @@ impl ProtocolVersionsDal<'_, '_> {
pub async fn base_system_contracts_by_timestamp(
&mut self,
current_timestamp: u64,
) -> (BaseSystemContracts, ProtocolVersionId) {
) -> anyhow::Result<(BaseSystemContracts, ProtocolVersionId)> {
let row = sqlx::query!(
r#"
SELECT
Expand All @@ -162,22 +163,26 @@ impl ProtocolVersionsDal<'_, '_> {
)
.fetch_one(self.storage.conn())
.await
.unwrap();
.context("cannot fetch system contract hashes")?;

let protocol_version = (row.id as u16)
.try_into()
.context("bogus protocol version ID")?;
let contracts = self
.storage
.factory_deps_dal()
.get_base_system_contracts(
H256::from_slice(&row.bootloader_code_hash),
H256::from_slice(&row.default_account_code_hash),
)
.await;
(contracts, (row.id as u16).try_into().unwrap())
.await?;
Ok((contracts, protocol_version))
}

pub async fn load_base_system_contracts_by_version_id(
&mut self,
version_id: u16,
) -> Option<BaseSystemContracts> {
) -> anyhow::Result<Option<BaseSystemContracts>> {
let row = sqlx::query!(
r#"
SELECT
Expand All @@ -192,20 +197,21 @@ impl ProtocolVersionsDal<'_, '_> {
)
.fetch_optional(self.storage.conn())
.await
.unwrap();
if let Some(row) = row {
Some(
self.storage
.factory_deps_dal()
.get_base_system_contracts(
H256::from_slice(&row.bootloader_code_hash),
H256::from_slice(&row.default_account_code_hash),
)
.await,
)
.context("cannot fetch system contract hashes")?;

Ok(if let Some(row) = row {
let contracts = self
.storage
.factory_deps_dal()
.get_base_system_contracts(
H256::from_slice(&row.bootloader_code_hash),
H256::from_slice(&row.default_account_code_hash),
)
.await?;
Some(contracts)
} else {
None
}
})
}

pub async fn load_previous_version(
Expand Down
3 changes: 2 additions & 1 deletion core/lib/vm_utils/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ impl L1BatchParamsProvider {
let base_system_contracts = storage
.factory_deps_dal()
.get_base_system_contracts(contract_hashes.bootloader, contract_hashes.default_aa)
.await;
.await
.context("failed getting base system contracts")?;

Ok(l1_batch_params(
first_miniblock_in_batch.l1_batch_number,
Expand Down
7 changes: 5 additions & 2 deletions core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,8 +525,11 @@ impl ZksNamespace {

let method_latency = API_METRICS.start_call(METHOD_NAME);
let mut storage = self.access_storage(METHOD_NAME).await?;
let bytecode = storage.factory_deps_dal().get_factory_dep(hash).await;

let bytecode = storage
.factory_deps_dal()
.get_factory_dep(hash)
.await
.map_err(|err| internal_error(METHOD_NAME, err))?;
method_latency.observe();
Ok(bytecode)
}
Expand Down
5 changes: 2 additions & 3 deletions core/lib/zksync_core/src/block_reverter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,16 +261,15 @@ impl BlockReverter {
transaction
.factory_deps_dal()
.rollback_factory_deps(last_miniblock_to_keep)
.await;

.await
.expect("Failed rolling back factory dependencies");
tracing::info!("rolling back storage...");
#[allow(deprecated)]
transaction
.storage_logs_dal()
.rollback_storage(last_miniblock_to_keep)
.await
.expect("failed rolling back storage");

tracing::info!("rolling back storage logs...");
transaction
.storage_logs_dal()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use zksync_dal::{ConnectionPool, StorageProcessor};
use zksync_types::MiniblockNumber;

/// Runs the migration for pending miniblocks.
pub(crate) async fn migrate_pending_miniblocks(storage: &mut StorageProcessor<'_>) {
pub(crate) async fn migrate_pending_miniblocks(
storage: &mut StorageProcessor<'_>,
) -> anyhow::Result<()> {
let started_at = Instant::now();
tracing::info!("Started migrating `fee_account_address` for pending miniblocks");

Expand All @@ -19,20 +21,21 @@ pub(crate) async fn migrate_pending_miniblocks(storage: &mut StorageProcessor<'_
.blocks_dal()
.check_l1_batches_have_fee_account_address()
.await
.expect("Failed getting metadata for l1_batches table");
.context("failed getting metadata for l1_batches table")?;
if !l1_batches_have_fee_account_address {
tracing::info!("`l1_batches.fee_account_address` column is removed; assuming that the migration is complete");
return;
return Ok(());
}

#[allow(deprecated)]
let rows_affected = storage
.blocks_dal()
.copy_fee_account_address_for_pending_miniblocks()
.await
.expect("Failed migrating `fee_account_address` for pending miniblocks");
.context("failed migrating `fee_account_address` for pending miniblocks")?;
let elapsed = started_at.elapsed();
tracing::info!("Migrated `fee_account_address` for {rows_affected} miniblocks in {elapsed:?}");
Ok(())
}

/// Runs the migration for non-pending miniblocks. Should be run as a background task.
Expand Down
Loading
Loading