feat(en): Make reorg detector work with pruned data #712

33 changes: 20 additions & 13 deletions core/bin/external_node/src/main.rs
@@ -1,8 +1,8 @@
use std::{sync::Arc, time::Duration};

use anyhow::Context;
use anyhow::Context as _;
use clap::Parser;
use futures::{future::FusedFuture, FutureExt};
use futures::{future::FusedFuture, FutureExt as _};
use metrics::EN_METRICS;
use prometheus_exporter::PrometheusExporterConfig;
use tokio::{sync::watch, task, time::sleep};
@@ -443,23 +443,20 @@ async fn main() -> anyhow::Result<()> {

let reorg_detector = ReorgDetector::new(&main_node_url, connection_pool.clone(), stop_receiver);
let mut reorg_detector_handle = tokio::spawn(reorg_detector.run()).fuse();
let mut reorg_detector_result = None;

let particular_crypto_alerts = None;
let graceful_shutdown = None::<futures::future::Ready<()>>;
let tasks_allowed_to_finish = false;
let mut reorg_detector_last_correct_batch = None;

tokio::select! {
_ = wait_for_tasks(task_handles, particular_crypto_alerts, graceful_shutdown, tasks_allowed_to_finish) => {},
_ = sigint_receiver => {
tracing::info!("Stop signal received, shutting down");
},
last_correct_batch = &mut reorg_detector_handle => {
if let Ok(last_correct_batch) = last_correct_batch {
reorg_detector_last_correct_batch = last_correct_batch;
} else {
tracing::error!("Reorg detector actor failed");
}
result = &mut reorg_detector_handle => {
tracing::info!("Reorg detector terminated, shutting down");
reorg_detector_result = Some(result);
}
};

@@ -468,13 +465,23 @@ async fn main() -> anyhow::Result<()> {
shutdown_components(stop_sender, health_check_handle).await;

if !reorg_detector_handle.is_terminated() {
if let Ok(Some(last_correct_batch)) = reorg_detector_handle.await {
reorg_detector_last_correct_batch = Some(last_correct_batch);
}
reorg_detector_result = Some(reorg_detector_handle.await);
}
let reorg_detector_last_correct_batch = reorg_detector_result.and_then(|result| match result {
Ok(Ok(last_correct_batch)) => last_correct_batch,
Ok(Err(err)) => {
tracing::error!("Reorg detector failed: {err}");
None
}
Err(err) => {
tracing::error!("Reorg detector panicked: {err}");
None
}
});

if let Some(last_correct_batch) = reorg_detector_last_correct_batch {
tracing::info!("Performing rollback to block {}", last_correct_batch);
tracing::info!("Performing rollback to L1 batch #{last_correct_batch}");

let reverter = BlockReverter::new(
config.required.state_cache_path,
config.required.merkle_tree_path,
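The new shutdown logic relies on the two-layer result of awaiting a Tokio `JoinHandle`: the outer `Result` is `Err` only if the task panicked (or was aborted), while a failure returned by the task itself shows up as `Ok(Err(_))`. A minimal standalone sketch of that pattern follows; the task body and log messages are illustrative stand-ins, not code from this PR.

```rust
use tokio::task::JoinHandle;

#[tokio::main]
async fn main() {
    // Stand-in for `ReorgDetector::run()`: resolves to Ok(Some(batch)) when a reorg
    // is detected, Ok(None) on clean shutdown, Err(_) on failure.
    let handle: JoinHandle<anyhow::Result<Option<u32>>> =
        tokio::spawn(async { Ok(Some(42)) });

    // Outer Err = the task panicked; inner Err = the task returned an error.
    let last_correct_batch = match handle.await {
        Ok(Ok(batch)) => batch,
        Ok(Err(err)) => {
            eprintln!("Reorg detector failed: {err}");
            None
        }
        Err(err) => {
            eprintln!("Reorg detector panicked: {err}");
            None
        }
    };
    println!("last correct batch: {last_correct_batch:?}");
}
```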
36 changes: 36 additions & 0 deletions core/lib/dal/sqlx-data.json
@@ -4072,6 +4072,24 @@
},
"query": "\n SELECT\n eth_txs_history.id,\n eth_txs_history.eth_tx_id,\n eth_txs_history.tx_hash,\n eth_txs_history.base_fee_per_gas,\n eth_txs_history.priority_fee_per_gas,\n eth_txs_history.signed_raw_tx,\n eth_txs.nonce\n FROM\n eth_txs_history\n JOIN eth_txs ON eth_txs.id = eth_txs_history.eth_tx_id\n WHERE\n eth_txs_history.sent_at_block IS NULL\n AND eth_txs.confirmed_eth_tx_history_id IS NULL\n ORDER BY\n eth_txs_history.id DESC\n "
},
"43c7e352d09f69de1a182196aea4de79b67833f17d252b5b0e8e00cd6e75b5c1": {
"describe": {
"columns": [
{
"name": "number",
"ordinal": 0,
"type_info": "Int8"
}
],
"nullable": [
null
],
"parameters": {
"Left": []
}
},
"query": "\n SELECT\n MIN(number) AS \"number\"\n FROM\n l1_batches\n "
},
"45b5825c82d33c9494ceef0fdc77675b89128d56559b8c89465844a914f5245e": {
"describe": {
"columns": [
@@ -11986,6 +12004,24 @@
},
"query": "\n SELECT\n number,\n l1_tx_count,\n l2_tx_count,\n timestamp,\n is_finished,\n fee_account_address,\n l2_to_l1_logs,\n l2_to_l1_messages,\n bloom,\n priority_ops_onchain_data,\n used_contract_hashes,\n base_fee_per_gas,\n l1_gas_price,\n l2_fair_gas_price,\n bootloader_code_hash,\n default_aa_code_hash,\n protocol_version,\n system_logs,\n compressed_state_diffs\n FROM\n l1_batches\n WHERE\n eth_commit_tx_id = $1\n OR eth_prove_tx_id = $1\n OR eth_execute_tx_id = $1\n "
},
"f91790ae5cc4b087bf942ba52dd63a1e89945f8d5e0f4da42ecf6313c4f5967e": {
"describe": {
"columns": [
{
"name": "number",
"ordinal": 0,
"type_info": "Int8"
}
],
"nullable": [
null
],
"parameters": {
"Left": []
}
},
"query": "\n SELECT\n MIN(number) AS \"number\"\n FROM\n l1_batches\n WHERE\n hash IS NOT NULL\n "
},
"f922c0718c9dda2f285f09cbabad425bac8ed3d2780c60c9b63afbcea131f9a0": {
"describe": {
"columns": [],
52 changes: 46 additions & 6 deletions core/lib/dal/src/blocks_dal.rs
@@ -82,10 +82,28 @@ impl BlocksDal<'_, '_> {
Ok(MiniblockNumber(number as u32))
}

/// Returns the number of the earliest L1 batch present in the DB, or `None` if there are no L1 batches.
pub async fn get_earliest_l1_batch_number(&mut self) -> sqlx::Result<Option<L1BatchNumber>> {
let row = sqlx::query!(
r#"
SELECT
MIN(number) AS "number"
FROM
l1_batches
"#
)
.instrument("get_earliest_l1_batch_number")
.report_latency()
.fetch_one(self.storage.conn())
.await?;

Ok(row.number.map(|num| L1BatchNumber(num as u32)))
}

pub async fn get_last_l1_batch_number_with_metadata(
&mut self,
) -> anyhow::Result<L1BatchNumber> {
let number: i64 = sqlx::query!(
) -> sqlx::Result<Option<L1BatchNumber>> {
let row = sqlx::query!(
r#"
SELECT
MAX(number) AS "number"
@@ -98,10 +116,32 @@ impl BlocksDal<'_, '_> {
.instrument("get_last_block_number_with_metadata")
.report_latency()
.fetch_one(self.storage.conn())
.await?
.number
.context("DAL invocation before genesis")?;
Ok(L1BatchNumber(number as u32))
.await?;

Ok(row.number.map(|num| L1BatchNumber(num as u32)))
}

/// Returns the number of the earliest L1 batch with metadata (= state hash) present in the DB,
/// or `None` if there are no such L1 batches.
pub async fn get_earliest_l1_batch_number_with_metadata(
&mut self,
) -> sqlx::Result<Option<L1BatchNumber>> {
let row = sqlx::query!(
r#"
SELECT
MIN(number) AS "number"
FROM
l1_batches
WHERE
hash IS NOT NULL
"#
)
.instrument("get_earliest_l1_batch_number_with_metadata")
.report_latency()
.fetch_one(self.storage.conn())
.await?;

Ok(row.number.map(|num| L1BatchNumber(num as u32)))
}

pub async fn get_l1_batches_for_eth_tx_id(
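With these changes, `get_last_l1_batch_number_with_metadata` and the two new `get_earliest_*` getters return `sqlx::Result<Option<L1BatchNumber>>` instead of failing when the `l1_batches` table is empty (or pruned), leaving it to callers to decide what an absent batch means. The `MIN`/`MAX` aggregates come back as a nullable column, which the methods map into an `Option`. A tiny self-contained sketch of that mapping, using a stand-in newtype rather than the real `zksync_types::L1BatchNumber`:

```rust
/// Stand-in for `zksync_types::L1BatchNumber` so this sketch compiles on its own.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct L1BatchNumber(u32);

/// `MIN(number)` / `MAX(number)` yield NULL (i.e. `None`) on an empty table;
/// the DAL methods translate that into `Option<L1BatchNumber>`.
fn to_batch_number(raw: Option<i64>) -> Option<L1BatchNumber> {
    raw.map(|num| L1BatchNumber(num as u32))
}

fn main() {
    assert_eq!(to_batch_number(Some(0)), Some(L1BatchNumber(0)));
    assert_eq!(to_batch_number(None), None); // no L1 batches in the DB yet
}
```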
32 changes: 16 additions & 16 deletions core/lib/zksync_core/src/house_keeper/blocks_state_reporter.rs
@@ -21,22 +21,22 @@ impl L1BatchMetricsReporter {

async fn report_metrics(&self) {
let mut conn = self.connection_pool.access_storage().await.unwrap();
let mut block_metrics = vec![
(
conn.blocks_dal()
.get_sealed_l1_batch_number()
.await
.unwrap(),
BlockStage::Sealed,
),
(
conn.blocks_dal()
.get_last_l1_batch_number_with_metadata()
.await
.unwrap(),
BlockStage::MetadataCalculated,
),
];
let mut block_metrics = vec![(
conn.blocks_dal()
.get_sealed_l1_batch_number()
.await
.unwrap(),
BlockStage::Sealed,
)];

let last_l1_batch_with_metadata = conn
.blocks_dal()
.get_last_l1_batch_number_with_metadata()
.await
.unwrap();
if let Some(number) = last_l1_batch_with_metadata {
block_metrics.push((number, BlockStage::MetadataCalculated));
}

let eth_stats = conn.eth_sender_dal().get_eth_l1_batches().await.unwrap();
for (tx_type, l1_batch) in eth_stats.saved {
1 change: 1 addition & 0 deletions core/lib/zksync_core/src/lib.rs
@@ -91,6 +91,7 @@ pub mod reorg_detector;
pub mod state_keeper;
pub mod sync_layer;
pub mod temp_config_store;
mod utils;

/// Inserts the initial information about zkSync tokens into the database.
pub async fn genesis_init(
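The updater changes below call `wait_for_l1_batch` from the newly added private `utils` module, whose body does not appear in this excerpt. As a rough idea only, such a helper could poll `get_earliest_l1_batch_number` until a batch shows up or a stop signal arrives; the signature, polling strategy, and error handling here are assumptions, not the actual implementation:

```rust
// Hypothetical sketch of `crate::utils::wait_for_l1_batch`; the real module is not
// shown in this diff, so the details below are guesses.
use std::time::Duration;

use tokio::sync::watch;
use zksync_dal::ConnectionPool;
use zksync_types::L1BatchNumber;

pub(crate) async fn wait_for_l1_batch(
    pool: &ConnectionPool,
    poll_interval: Duration,
    stop_receiver: &mut watch::Receiver<bool>,
) -> anyhow::Result<Option<L1BatchNumber>> {
    loop {
        if *stop_receiver.borrow() {
            return Ok(None); // Stop signal received before any L1 batch appeared.
        }
        let mut storage = pool.access_storage().await?;
        let earliest = storage.blocks_dal().get_earliest_l1_batch_number().await?;
        drop(storage);
        if let Some(batch) = earliest {
            return Ok(Some(batch));
        }
        tokio::time::sleep(poll_interval).await;
    }
}
```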
4 changes: 4 additions & 0 deletions core/lib/zksync_core/src/metadata_calculator/helpers.rs
@@ -333,6 +333,10 @@ impl Delayer {
}
}

pub fn delay_interval(&self) -> Duration {
self.delay_interval
}

#[cfg_attr(not(test), allow(unused))] // `tree` is only used in test mode
pub fn wait(&self, tree: &AsyncTree) -> impl Future<Output = ()> {
#[cfg(test)]
2 changes: 1 addition & 1 deletion core/lib/zksync_core/src/metadata_calculator/recovery.rs
@@ -422,7 +422,7 @@ impl AsyncTreeRecovery {
}

async fn snapshot_l1_batch(_pool: &ConnectionPool) -> anyhow::Result<Option<L1BatchNumber>> {
Ok(None) // FIXME: implement real logic
Ok(None) // FIXME (PLA-708): implement real logic
}

#[cfg(test)]
4 changes: 4 additions & 0 deletions core/lib/zksync_core/src/metadata_calculator/tests.rs
@@ -1,3 +1,7 @@
//! Tests for the metadata calculator component life cycle.

// TODO (PLA-708): test full recovery life cycle

use std::{future::Future, ops, panic, path::Path, time::Duration};

use assert_matches::assert_matches;
81 changes: 43 additions & 38 deletions core/lib/zksync_core/src/metadata_calculator/updater.rs
@@ -18,6 +18,7 @@ use super::{
metrics::{TreeUpdateStage, METRICS},
MetadataCalculator,
};
use crate::utils::wait_for_l1_batch;

#[derive(Debug)]
pub(super) struct TreeUpdater {
@@ -267,77 +268,81 @@ impl TreeUpdater {
mut stop_receiver: watch::Receiver<bool>,
health_updater: HealthUpdater,
) -> anyhow::Result<()> {
let mut storage = pool
.access_storage_tagged("metadata_calculator")
.await
.unwrap();
let Some(earliest_l1_batch) =
wait_for_l1_batch(pool, delayer.delay_interval(), &mut stop_receiver).await?
else {
return Ok(()); // Stop signal received
};
let mut storage = pool.access_storage_tagged("metadata_calculator").await?;

// Ensure genesis creation
let tree = &mut self.tree;
if tree.is_empty() {
let logs = L1BatchWithLogs::new(&mut storage, L1BatchNumber(0))
assert_eq!(
earliest_l1_batch,
L1BatchNumber(0),
"Non-zero earliest L1 batch is not supported without previous tree recovery"
);
let logs = L1BatchWithLogs::new(&mut storage, earliest_l1_batch)
.await
.context("Missing storage logs for the genesis L1 batch")?;
tree.process_l1_batch(logs.storage_logs).await;
tree.save().await;
}
let mut next_l1_batch_to_seal = tree.next_l1_batch_number();

let current_db_batch = storage
.blocks_dal()
.get_sealed_l1_batch_number()
.await
.unwrap();
let current_db_batch = storage.blocks_dal().get_sealed_l1_batch_number().await?;
let last_l1_batch_with_metadata = storage
.blocks_dal()
.get_last_l1_batch_number_with_metadata()
.await
.unwrap();
.await?;
drop(storage);

tracing::info!(
"Initialized metadata calculator with {max_batches_per_iter} max L1 batches per iteration. \
Next L1 batch for Merkle tree: {next_l1_batch_to_seal}, current Postgres L1 batch: {current_db_batch}, \
last L1 batch with metadata: {last_l1_batch_with_metadata}",
last L1 batch with metadata: {last_l1_batch_with_metadata:?}",
max_batches_per_iter = self.max_l1_batches_per_iter
);
let backup_lag =
(last_l1_batch_with_metadata.0 + 1).saturating_sub(next_l1_batch_to_seal.0);
METRICS.backup_lag.set(backup_lag.into());

let tree_info = tree.reader().info().await;
health_updater.update(tree_info.into());

if next_l1_batch_to_seal > last_l1_batch_with_metadata + 1 {
// Check stop signal before proceeding with a potentially time-consuming operation.
if *stop_receiver.borrow_and_update() {
tracing::info!("Stop signal received, metadata_calculator is shutting down");
return Ok(());
}
// It may be the case that we don't have any L1 batches with metadata in Postgres, e.g. after
// recovering from a snapshot. We cannot wait for such a batch to appear (*this* is the component
// responsible for their appearance!), but fortunately most of the updater doesn't depend on it.
if let Some(last_l1_batch_with_metadata) = last_l1_batch_with_metadata {
let backup_lag =
(last_l1_batch_with_metadata.0 + 1).saturating_sub(next_l1_batch_to_seal.0);
METRICS.backup_lag.set(backup_lag.into());

if next_l1_batch_to_seal > last_l1_batch_with_metadata + 1 {
// Check stop signal before proceeding with a potentially time-consuming operation.
if *stop_receiver.borrow_and_update() {
tracing::info!("Stop signal received, metadata_calculator is shutting down");
return Ok(());
}

tracing::warn!(
"Next L1 batch of the tree ({next_l1_batch_to_seal}) is greater than last L1 batch with metadata in Postgres \
({last_l1_batch_with_metadata}); this may be a result of restoring Postgres from a snapshot. \
Truncating Merkle tree versions so that this mismatch is fixed..."
);
tree.revert_logs(last_l1_batch_with_metadata);
tree.save().await;
next_l1_batch_to_seal = tree.next_l1_batch_number();
tracing::info!("Truncated Merkle tree to L1 batch #{next_l1_batch_to_seal}");
tracing::warn!(
"Next L1 batch of the tree ({next_l1_batch_to_seal}) is greater than last L1 batch with metadata in Postgres \
({last_l1_batch_with_metadata}); this may be a result of restoring Postgres from a snapshot. \
Truncating Merkle tree versions so that this mismatch is fixed..."
);
tree.revert_logs(last_l1_batch_with_metadata);
tree.save().await;
next_l1_batch_to_seal = tree.next_l1_batch_number();
tracing::info!("Truncated Merkle tree to L1 batch #{next_l1_batch_to_seal}");

let tree_info = tree.reader().info().await;
health_updater.update(tree_info.into());
let tree_info = tree.reader().info().await;
health_updater.update(tree_info.into());
}
}

loop {
if *stop_receiver.borrow_and_update() {
tracing::info!("Stop signal received, metadata_calculator is shutting down");
break;
}
let storage = pool
.access_storage_tagged("metadata_calculator")
.await
.unwrap();
let storage = pool.access_storage_tagged("metadata_calculator").await?;

let snapshot = *next_l1_batch_to_seal;
self.step(storage, &mut next_l1_batch_to_seal).await;
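Because `get_last_l1_batch_number_with_metadata` may now return `None` (e.g. right after recovering Postgres from a snapshot), the backup-lag metric and the tree-truncation check above only run when such a batch exists. A small standalone illustration of the lag computation under that `Option`, again with a stand-in newtype instead of the real `L1BatchNumber`:

```rust
/// Stand-in newtype so the example compiles without the zksync crates.
#[derive(Debug, Clone, Copy)]
struct L1BatchNumber(u32);

/// Lag between the last L1 batch with metadata in Postgres and the next batch the
/// Merkle tree expects; zero when the tree is caught up or no such batch exists yet.
fn backup_lag(last_with_metadata: Option<L1BatchNumber>, next_to_seal: L1BatchNumber) -> u32 {
    last_with_metadata
        .map(|last| (last.0 + 1).saturating_sub(next_to_seal.0))
        .unwrap_or(0)
}

fn main() {
    assert_eq!(backup_lag(Some(L1BatchNumber(10)), L1BatchNumber(5)), 6);
    assert_eq!(backup_lag(None, L1BatchNumber(5)), 0);
}
```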