feat(en): Make reorg detector work with pruned data (#712)
## What ❔

Modifies the reorg detector so that it works with pruned node data during
snapshot recovery.
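
The detector's own changes live in files that are collapsed further down; the visible hunks add the supporting DAL queries and make the metadata calculator tolerant of an empty or partially pruned database. As a rough, purely illustrative sketch of the idea (not this PR's actual code), a pruned-aware detector restricts its search for the last batch that matches the main node to batches that are still present locally:

```rust
/// Illustrative sketch only: binary-search for the last L1 batch whose locally
/// stored state matches the main node, considering only batches that survived
/// pruning. In the real node, `matches_main_node` would compare root hashes
/// fetched from local storage and from the main node's API.
fn last_correct_batch(
    earliest_local_batch: u32, // first batch still present after pruning
    latest_local_batch: u32,
    matches_main_node: impl Fn(u32) -> bool,
) -> Option<u32> {
    let (mut lo, mut hi) = (earliest_local_batch, latest_local_batch);
    let mut last_match = None;
    while lo <= hi {
        let mid = lo + (hi - lo) / 2;
        if matches_main_node(mid) {
            last_match = Some(mid);
            lo = mid + 1;
        } else {
            if mid == 0 {
                break;
            }
            hi = mid - 1;
        }
    }
    last_match
}

fn main() {
    // Batches 0..=4 were pruned; local data starts at batch 5 and diverges at batch 8.
    assert_eq!(last_correct_batch(5, 10, |n| n < 8), Some(7));
}
```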

## Why ❔

Part of preparing the EN code to support snapshot recovery.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `cargo spellcheck
--cfg=./spellcheck/era.cfg --code 1`.
slowli authored Dec 21, 2023
1 parent 3db25cb commit c4185d5
Showing 13 changed files with 988 additions and 189 deletions.
33 changes: 20 additions & 13 deletions core/bin/external_node/src/main.rs
@@ -1,8 +1,8 @@
use std::{sync::Arc, time::Duration};

use anyhow::Context;
use anyhow::Context as _;
use clap::Parser;
use futures::{future::FusedFuture, FutureExt};
use futures::{future::FusedFuture, FutureExt as _};
use metrics::EN_METRICS;
use prometheus_exporter::PrometheusExporterConfig;
use tokio::{sync::watch, task, time::sleep};
@@ -443,23 +443,20 @@ async fn main() -> anyhow::Result<()> {

let reorg_detector = ReorgDetector::new(&main_node_url, connection_pool.clone(), stop_receiver);
let mut reorg_detector_handle = tokio::spawn(reorg_detector.run()).fuse();
let mut reorg_detector_result = None;

let particular_crypto_alerts = None;
let graceful_shutdown = None::<futures::future::Ready<()>>;
let tasks_allowed_to_finish = false;
let mut reorg_detector_last_correct_batch = None;

tokio::select! {
_ = wait_for_tasks(task_handles, particular_crypto_alerts, graceful_shutdown, tasks_allowed_to_finish) => {},
_ = sigint_receiver => {
tracing::info!("Stop signal received, shutting down");
},
last_correct_batch = &mut reorg_detector_handle => {
if let Ok(last_correct_batch) = last_correct_batch {
reorg_detector_last_correct_batch = last_correct_batch;
} else {
tracing::error!("Reorg detector actor failed");
}
result = &mut reorg_detector_handle => {
tracing::info!("Reorg detector terminated, shutting down");
reorg_detector_result = Some(result);
}
};

@@ -468,13 +465,23 @@ async fn main() -> anyhow::Result<()> {
shutdown_components(stop_sender, health_check_handle).await;

if !reorg_detector_handle.is_terminated() {
if let Ok(Some(last_correct_batch)) = reorg_detector_handle.await {
reorg_detector_last_correct_batch = Some(last_correct_batch);
}
reorg_detector_result = Some(reorg_detector_handle.await);
}
let reorg_detector_last_correct_batch = reorg_detector_result.and_then(|result| match result {
Ok(Ok(last_correct_batch)) => last_correct_batch,
Ok(Err(err)) => {
tracing::error!("Reorg detector failed: {err}");
None
}
Err(err) => {
tracing::error!("Reorg detector panicked: {err}");
None
}
});

if let Some(last_correct_batch) = reorg_detector_last_correct_batch {
tracing::info!("Performing rollback to block {}", last_correct_batch);
tracing::info!("Performing rollback to L1 batch #{last_correct_batch}");

let reverter = BlockReverter::new(
config.required.state_cache_path,
config.required.merkle_tree_path,
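
The net effect of the `main.rs` changes above is that the outcome of the spawned reorg detector task is recorded as-is and unpacked in a single place, distinguishing a clean exit, a detector error, and a task panic. A simplified, self-contained sketch of that unpacking (with `String` standing in for `tokio::task::JoinError` / `anyhow::Error`, and `u32` for `L1BatchNumber`):

```rust
// Outer Result: did the spawned task complete or panic/cancel?
// Inner Result: did the detector itself succeed or fail?
// Innermost Option: did it find a last correct batch?
type JoinOutcome = Result<Result<Option<u32>, String>, String>;

fn last_correct_batch(outcome: Option<JoinOutcome>) -> Option<u32> {
    outcome.and_then(|result| match result {
        // Task finished and the detector returned normally.
        Ok(Ok(batch)) => batch,
        // Task finished but the detector reported an error.
        Ok(Err(err)) => {
            eprintln!("Reorg detector failed: {err}");
            None
        }
        // Task panicked or was cancelled.
        Err(err) => {
            eprintln!("Reorg detector panicked: {err}");
            None
        }
    })
}

fn main() {
    assert_eq!(last_correct_batch(None), None);
    assert_eq!(last_correct_batch(Some(Ok(Ok(Some(42))))), Some(42));
    assert_eq!(last_correct_batch(Some(Err("cancelled".into()))), None);
}
```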
36 changes: 36 additions & 0 deletions core/lib/dal/sqlx-data.json
@@ -4072,6 +4072,24 @@
},
"query": "\n SELECT\n eth_txs_history.id,\n eth_txs_history.eth_tx_id,\n eth_txs_history.tx_hash,\n eth_txs_history.base_fee_per_gas,\n eth_txs_history.priority_fee_per_gas,\n eth_txs_history.signed_raw_tx,\n eth_txs.nonce\n FROM\n eth_txs_history\n JOIN eth_txs ON eth_txs.id = eth_txs_history.eth_tx_id\n WHERE\n eth_txs_history.sent_at_block IS NULL\n AND eth_txs.confirmed_eth_tx_history_id IS NULL\n ORDER BY\n eth_txs_history.id DESC\n "
},
"43c7e352d09f69de1a182196aea4de79b67833f17d252b5b0e8e00cd6e75b5c1": {
"describe": {
"columns": [
{
"name": "number",
"ordinal": 0,
"type_info": "Int8"
}
],
"nullable": [
null
],
"parameters": {
"Left": []
}
},
"query": "\n SELECT\n MIN(number) AS \"number\"\n FROM\n l1_batches\n "
},
"45b5825c82d33c9494ceef0fdc77675b89128d56559b8c89465844a914f5245e": {
"describe": {
"columns": [
@@ -11986,6 +12004,24 @@
},
"query": "\n SELECT\n number,\n l1_tx_count,\n l2_tx_count,\n timestamp,\n is_finished,\n fee_account_address,\n l2_to_l1_logs,\n l2_to_l1_messages,\n bloom,\n priority_ops_onchain_data,\n used_contract_hashes,\n base_fee_per_gas,\n l1_gas_price,\n l2_fair_gas_price,\n bootloader_code_hash,\n default_aa_code_hash,\n protocol_version,\n system_logs,\n compressed_state_diffs\n FROM\n l1_batches\n WHERE\n eth_commit_tx_id = $1\n OR eth_prove_tx_id = $1\n OR eth_execute_tx_id = $1\n "
},
"f91790ae5cc4b087bf942ba52dd63a1e89945f8d5e0f4da42ecf6313c4f5967e": {
"describe": {
"columns": [
{
"name": "number",
"ordinal": 0,
"type_info": "Int8"
}
],
"nullable": [
null
],
"parameters": {
"Left": []
}
},
"query": "\n SELECT\n MIN(number) AS \"number\"\n FROM\n l1_batches\n WHERE\n hash IS NOT NULL\n "
},
"f922c0718c9dda2f285f09cbabad425bac8ed3d2780c60c9b63afbcea131f9a0": {
"describe": {
"columns": [],
52 changes: 46 additions & 6 deletions core/lib/dal/src/blocks_dal.rs
Expand Up @@ -82,10 +82,28 @@ impl BlocksDal<'_, '_> {
Ok(MiniblockNumber(number as u32))
}

/// Returns the number of the earliest L1 batch present in the DB, or `None` if there are no L1 batches.
pub async fn get_earliest_l1_batch_number(&mut self) -> sqlx::Result<Option<L1BatchNumber>> {
let row = sqlx::query!(
r#"
SELECT
MIN(number) AS "number"
FROM
l1_batches
"#
)
.instrument("get_earliest_l1_batch_number")
.report_latency()
.fetch_one(self.storage.conn())
.await?;

Ok(row.number.map(|num| L1BatchNumber(num as u32)))
}

pub async fn get_last_l1_batch_number_with_metadata(
&mut self,
) -> anyhow::Result<L1BatchNumber> {
let number: i64 = sqlx::query!(
) -> sqlx::Result<Option<L1BatchNumber>> {
let row = sqlx::query!(
r#"
SELECT
MAX(number) AS "number"
@@ -98,10 +116,32 @@
.instrument("get_last_block_number_with_metadata")
.report_latency()
.fetch_one(self.storage.conn())
.await?
.number
.context("DAL invocation before genesis")?;
Ok(L1BatchNumber(number as u32))
.await?;

Ok(row.number.map(|num| L1BatchNumber(num as u32)))
}

/// Returns the number of the earliest L1 batch with metadata (= state hash) present in the DB,
/// or `None` if there are no such L1 batches.
pub async fn get_earliest_l1_batch_number_with_metadata(
&mut self,
) -> sqlx::Result<Option<L1BatchNumber>> {
let row = sqlx::query!(
r#"
SELECT
MIN(number) AS "number"
FROM
l1_batches
WHERE
hash IS NOT NULL
"#
)
.instrument("get_earliest_l1_batch_number_with_metadata")
.report_latency()
.fetch_one(self.storage.conn())
.await?;

Ok(row.number.map(|num| L1BatchNumber(num as u32)))
}

pub async fn get_l1_batches_for_eth_tx_id(
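
With these changes, `blocks_dal` can report that the database has no L1 batches at all (before genesis, or mid-recovery) or has batches without metadata yet, instead of failing. A self-contained sketch of how a caller can branch on the two new `Option` results (`u32` stands in for `L1BatchNumber`; the connection plumbing from the diff above is omitted):

```rust
/// Illustrative only: the three states exposed by `get_earliest_l1_batch_number()`
/// and `get_earliest_l1_batch_number_with_metadata()` after this change.
fn describe_available_range(earliest: Option<u32>, earliest_with_metadata: Option<u32>) -> String {
    match (earliest, earliest_with_metadata) {
        // No batches at all: pre-genesis, or snapshot recovery has not inserted any yet.
        (None, _) => "no L1 batches in storage".to_owned(),
        // Batches exist, but none has a computed state hash (metadata) yet.
        (Some(first), None) => format!("batches start at #{first}; no metadata yet"),
        // Normal case: both numbers are available.
        (Some(first), Some(with_meta)) => {
            format!("batches start at #{first}; metadata available from #{with_meta}")
        }
    }
}

fn main() {
    assert_eq!(
        describe_available_range(Some(5), None),
        "batches start at #5; no metadata yet"
    );
}
```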
32 changes: 16 additions & 16 deletions core/lib/zksync_core/src/house_keeper/blocks_state_reporter.rs
Expand Up @@ -21,22 +21,22 @@ impl L1BatchMetricsReporter {

async fn report_metrics(&self) {
let mut conn = self.connection_pool.access_storage().await.unwrap();
let mut block_metrics = vec![
(
conn.blocks_dal()
.get_sealed_l1_batch_number()
.await
.unwrap(),
BlockStage::Sealed,
),
(
conn.blocks_dal()
.get_last_l1_batch_number_with_metadata()
.await
.unwrap(),
BlockStage::MetadataCalculated,
),
];
let mut block_metrics = vec![(
conn.blocks_dal()
.get_sealed_l1_batch_number()
.await
.unwrap(),
BlockStage::Sealed,
)];

let last_l1_batch_with_metadata = conn
.blocks_dal()
.get_last_l1_batch_number_with_metadata()
.await
.unwrap();
if let Some(number) = last_l1_batch_with_metadata {
block_metrics.push((number, BlockStage::MetadataCalculated));
}

let eth_stats = conn.eth_sender_dal().get_eth_l1_batches().await.unwrap();
for (tx_type, l1_batch) in eth_stats.saved {
1 change: 1 addition & 0 deletions core/lib/zksync_core/src/lib.rs
Expand Up @@ -91,6 +91,7 @@ pub mod reorg_detector;
pub mod state_keeper;
pub mod sync_layer;
pub mod temp_config_store;
mod utils;

/// Inserts the initial information about zkSync tokens into the database.
pub async fn genesis_init(
4 changes: 4 additions & 0 deletions core/lib/zksync_core/src/metadata_calculator/helpers.rs
Expand Up @@ -333,6 +333,10 @@ impl Delayer {
}
}

pub fn delay_interval(&self) -> Duration {
self.delay_interval
}

#[cfg_attr(not(test), allow(unused))] // `tree` is only used in test mode
pub fn wait(&self, tree: &AsyncTree) -> impl Future<Output = ()> {
#[cfg(test)]
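
The new `delay_interval()` accessor is what lets the tree updater (see the `updater.rs` hunk below) reuse the delayer's polling interval when waiting for the first L1 batch via `wait_for_l1_batch`.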
2 changes: 1 addition & 1 deletion core/lib/zksync_core/src/metadata_calculator/recovery.rs
Expand Up @@ -422,7 +422,7 @@ impl AsyncTreeRecovery {
}

async fn snapshot_l1_batch(_pool: &ConnectionPool) -> anyhow::Result<Option<L1BatchNumber>> {
Ok(None) // FIXME: implement real logic
Ok(None) // FIXME (PLA-708): implement real logic
}

#[cfg(test)]
4 changes: 4 additions & 0 deletions core/lib/zksync_core/src/metadata_calculator/tests.rs
@@ -1,3 +1,7 @@
//! Tests for the metadata calculator component life cycle.
// TODO (PLA-708): test full recovery life cycle

use std::{future::Future, ops, panic, path::Path, time::Duration};

use assert_matches::assert_matches;
81 changes: 43 additions & 38 deletions core/lib/zksync_core/src/metadata_calculator/updater.rs
Expand Up @@ -18,6 +18,7 @@ use super::{
metrics::{TreeUpdateStage, METRICS},
MetadataCalculator,
};
use crate::utils::wait_for_l1_batch;

#[derive(Debug)]
pub(super) struct TreeUpdater {
@@ -267,77 +268,81 @@ impl TreeUpdater {
mut stop_receiver: watch::Receiver<bool>,
health_updater: HealthUpdater,
) -> anyhow::Result<()> {
let mut storage = pool
.access_storage_tagged("metadata_calculator")
.await
.unwrap();
let Some(earliest_l1_batch) =
wait_for_l1_batch(pool, delayer.delay_interval(), &mut stop_receiver).await?
else {
return Ok(()); // Stop signal received
};
let mut storage = pool.access_storage_tagged("metadata_calculator").await?;

// Ensure genesis creation
let tree = &mut self.tree;
if tree.is_empty() {
let logs = L1BatchWithLogs::new(&mut storage, L1BatchNumber(0))
assert_eq!(
earliest_l1_batch,
L1BatchNumber(0),
"Non-zero earliest L1 batch is not supported without previous tree recovery"
);
let logs = L1BatchWithLogs::new(&mut storage, earliest_l1_batch)
.await
.context("Missing storage logs for the genesis L1 batch")?;
tree.process_l1_batch(logs.storage_logs).await;
tree.save().await;
}
let mut next_l1_batch_to_seal = tree.next_l1_batch_number();

let current_db_batch = storage
.blocks_dal()
.get_sealed_l1_batch_number()
.await
.unwrap();
let current_db_batch = storage.blocks_dal().get_sealed_l1_batch_number().await?;
let last_l1_batch_with_metadata = storage
.blocks_dal()
.get_last_l1_batch_number_with_metadata()
.await
.unwrap();
.await?;
drop(storage);

tracing::info!(
"Initialized metadata calculator with {max_batches_per_iter} max L1 batches per iteration. \
Next L1 batch for Merkle tree: {next_l1_batch_to_seal}, current Postgres L1 batch: {current_db_batch}, \
last L1 batch with metadata: {last_l1_batch_with_metadata}",
last L1 batch with metadata: {last_l1_batch_with_metadata:?}",
max_batches_per_iter = self.max_l1_batches_per_iter
);
let backup_lag =
(last_l1_batch_with_metadata.0 + 1).saturating_sub(next_l1_batch_to_seal.0);
METRICS.backup_lag.set(backup_lag.into());

let tree_info = tree.reader().info().await;
health_updater.update(tree_info.into());

if next_l1_batch_to_seal > last_l1_batch_with_metadata + 1 {
// Check stop signal before proceeding with a potentially time-consuming operation.
if *stop_receiver.borrow_and_update() {
tracing::info!("Stop signal received, metadata_calculator is shutting down");
return Ok(());
}
// It may be the case that we don't have any L1 batches with metadata in Postgres, e.g. after
// recovering from a snapshot. We cannot wait for such a batch to appear (*this* is the component
// responsible for their appearance!), but fortunately most of the updater doesn't depend on it.
if let Some(last_l1_batch_with_metadata) = last_l1_batch_with_metadata {
let backup_lag =
(last_l1_batch_with_metadata.0 + 1).saturating_sub(next_l1_batch_to_seal.0);
METRICS.backup_lag.set(backup_lag.into());

if next_l1_batch_to_seal > last_l1_batch_with_metadata + 1 {
// Check stop signal before proceeding with a potentially time-consuming operation.
if *stop_receiver.borrow_and_update() {
tracing::info!("Stop signal received, metadata_calculator is shutting down");
return Ok(());
}

tracing::warn!(
"Next L1 batch of the tree ({next_l1_batch_to_seal}) is greater than last L1 batch with metadata in Postgres \
({last_l1_batch_with_metadata}); this may be a result of restoring Postgres from a snapshot. \
Truncating Merkle tree versions so that this mismatch is fixed..."
);
tree.revert_logs(last_l1_batch_with_metadata);
tree.save().await;
next_l1_batch_to_seal = tree.next_l1_batch_number();
tracing::info!("Truncated Merkle tree to L1 batch #{next_l1_batch_to_seal}");
tracing::warn!(
"Next L1 batch of the tree ({next_l1_batch_to_seal}) is greater than last L1 batch with metadata in Postgres \
({last_l1_batch_with_metadata}); this may be a result of restoring Postgres from a snapshot. \
Truncating Merkle tree versions so that this mismatch is fixed..."
);
tree.revert_logs(last_l1_batch_with_metadata);
tree.save().await;
next_l1_batch_to_seal = tree.next_l1_batch_number();
tracing::info!("Truncated Merkle tree to L1 batch #{next_l1_batch_to_seal}");

let tree_info = tree.reader().info().await;
health_updater.update(tree_info.into());
let tree_info = tree.reader().info().await;
health_updater.update(tree_info.into());
}
}

loop {
if *stop_receiver.borrow_and_update() {
tracing::info!("Stop signal received, metadata_calculator is shutting down");
break;
}
let storage = pool
.access_storage_tagged("metadata_calculator")
.await
.unwrap();
let storage = pool.access_storage_tagged("metadata_calculator").await?;

let snapshot = *next_l1_batch_to_seal;
self.step(storage, &mut next_l1_batch_to_seal).await;
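
The remaining changed files (including the reorg detector itself and the new `utils` module) are not expanded in this view. Based on the call site above, where `wait_for_l1_batch(pool, delayer.delay_interval(), &mut stop_receiver)` yields `Ok(None)` on a stop signal and `Ok(Some(batch))` otherwise, the helper is presumably a polling loop along these lines; this is a hedged sketch with a generic probe instead of the actual DAL call:

```rust
use std::{future::Future, time::Duration};

use tokio::sync::watch;

/// Hypothetical sketch of a `wait_for_l1_batch`-style helper: keep polling `probe`
/// (in the real node, something like `blocks_dal().get_earliest_l1_batch_number()`)
/// until it yields a value, sleeping `poll_interval` between attempts, and return
/// `Ok(None)` as soon as the stop signal flips to `true`.
async fn wait_for_value<T, F, Fut>(
    mut probe: F,
    poll_interval: Duration,
    stop_receiver: &mut watch::Receiver<bool>,
) -> anyhow::Result<Option<T>>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = anyhow::Result<Option<T>>>,
{
    loop {
        if *stop_receiver.borrow_and_update() {
            return Ok(None); // Stop signal received before a value appeared.
        }
        if let Some(value) = probe().await? {
            return Ok(Some(value));
        }
        tokio::time::sleep(poll_interval).await;
    }
}
```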