From d3f72608fe171dcc887c919dfd2e18d428bc8a82 Mon Sep 17 00:00:00 2001
From: Alex Ostrovski
Date: Tue, 19 Dec 2023 11:29:42 +0200
Subject: [PATCH 01/13] Brush up reorg detector

E.g., propagate errors instead of panicking

---
 core/bin/external_node/src/main.rs            |  33 +++--
 .../lib/zksync_core/src/reorg_detector/mod.rs | 126 +++++++++---------
 2 files changed, 85 insertions(+), 74 deletions(-)

diff --git a/core/bin/external_node/src/main.rs b/core/bin/external_node/src/main.rs
index 41799793eea4..295279166334 100644
--- a/core/bin/external_node/src/main.rs
+++ b/core/bin/external_node/src/main.rs
@@ -1,8 +1,8 @@
 use std::{sync::Arc, time::Duration};
 
-use anyhow::Context;
+use anyhow::Context as _;
 use clap::Parser;
-use futures::{future::FusedFuture, FutureExt};
+use futures::{future::FusedFuture, FutureExt as _};
 use metrics::EN_METRICS;
 use prometheus_exporter::PrometheusExporterConfig;
 use tokio::{sync::watch, task, time::sleep};
@@ -443,23 +443,20 @@ async fn main() -> anyhow::Result<()> {
     let reorg_detector = ReorgDetector::new(&main_node_url, connection_pool.clone(), stop_receiver);
     let mut reorg_detector_handle = tokio::spawn(reorg_detector.run()).fuse();
+    let mut reorg_detector_result = None;
 
     let particular_crypto_alerts = None;
     let graceful_shutdown = None::<futures::future::Ready<()>>;
     let tasks_allowed_to_finish = false;
-    let mut reorg_detector_last_correct_batch = None;
 
     tokio::select! {
         _ = wait_for_tasks(task_handles, particular_crypto_alerts, graceful_shutdown, tasks_allowed_to_finish) => {},
         _ = sigint_receiver => {
             tracing::info!("Stop signal received, shutting down");
         },
-        last_correct_batch = &mut reorg_detector_handle => {
-            if let Ok(last_correct_batch) = last_correct_batch {
-                reorg_detector_last_correct_batch = last_correct_batch;
-            } else {
-                tracing::error!("Reorg detector actor failed");
-            }
+        result = &mut reorg_detector_handle => {
+            tracing::info!("Reorg detector terminated, shutting down");
+            reorg_detector_result = Some(result);
         }
     };
 
@@ -468,13 +465,23 @@
     shutdown_components(stop_sender, health_check_handle).await;
 
     if !reorg_detector_handle.is_terminated() {
-        if let Ok(Some(last_correct_batch)) = reorg_detector_handle.await {
-            reorg_detector_last_correct_batch = Some(last_correct_batch);
-        }
+        reorg_detector_result = Some(reorg_detector_handle.await);
     }
+    let reorg_detector_last_correct_batch = reorg_detector_result.and_then(|result| match result {
+        Ok(Ok(last_correct_batch)) => last_correct_batch,
+        Ok(Err(err)) => {
+            tracing::error!("Reorg detector failed: {err}");
+            None
+        }
+        Err(err) => {
+            tracing::error!("Reorg detector panicked: {err}");
+            None
+        }
+    });
 
     if let Some(last_correct_batch) = reorg_detector_last_correct_batch {
-        tracing::info!("Performing rollback to block {}", last_correct_batch);
+        tracing::info!("Performing rollback to L1 batch #{last_correct_batch}");
+
         let reverter = BlockReverter::new(
             config.required.state_cache_path,
             config.required.merkle_tree_path,
diff --git a/core/lib/zksync_core/src/reorg_detector/mod.rs b/core/lib/zksync_core/src/reorg_detector/mod.rs
index cd399716c201..4a394b6d08d8 100644
--- a/core/lib/zksync_core/src/reorg_detector/mod.rs
+++ b/core/lib/zksync_core/src/reorg_detector/mod.rs
@@ -1,5 +1,6 @@
 use std::{future::Future, time::Duration};
 
+use anyhow::Context as _;
 use tokio::sync::watch;
 use zksync_dal::ConnectionPool;
 use zksync_types::{L1BatchNumber, MiniblockNumber};
@@ -9,13 +10,30 @@ use zksync_web3_decl::{
         http_client::{HttpClient, HttpClientBuilder},
     },
     namespaces::{EthNamespaceClient, ZksNamespaceClient},
-    RpcResult,
 };
 
 use crate::metrics::{CheckerComponent, EN_METRICS};
 
 const SLEEP_INTERVAL: Duration = Duration::from_secs(5);
 
+#[derive(Debug, thiserror::Error)]
+enum HashMatchError {
+    #[error("RPC error calling main node")]
+    Rpc(#[from] RpcError),
+    #[error("Internal error")]
+    Internal(#[from] anyhow::Error),
+}
+
+impl From<zksync_dal::SqlxError> for HashMatchError {
+    fn from(err: zksync_dal::SqlxError) -> Self {
+        Self::Internal(err.into())
+    }
+}
+
+fn is_transient_err(err: &RpcError) -> bool {
+    matches!(err, RpcError::Transport(_) | RpcError::RequestTimeout)
+}
+
 /// This is a component that is responsible for detecting the batch re-orgs.
 /// Batch re-org is a rare event of manual intervention, when the node operator
 /// decides to revert some of the not yet finalized batches for some reason
@@ -50,29 +68,25 @@ impl ReorgDetector {
     }
 
     /// Compares hashes of the given local miniblock and the same miniblock from main node.
-    async fn miniblock_hashes_match(&self, miniblock_number: MiniblockNumber) -> RpcResult<bool> {
-        let local_hash = self
-            .pool
-            .access_storage()
-            .await
-            .unwrap()
+    async fn miniblock_hashes_match(
+        &self,
+        miniblock_number: MiniblockNumber,
+    ) -> Result<bool, HashMatchError> {
+        let mut storage = self.pool.access_storage().await?;
+        let local_hash = storage
             .blocks_dal()
             .get_miniblock_header(miniblock_number)
-            .await
-            .unwrap()
-            .unwrap_or_else(|| {
-                panic!(
-                    "Header does not exist for local miniblock #{}",
-                    miniblock_number
-                )
-            })
+            .await?
+            .with_context(|| {
+                format!("Header does not exist for local miniblock #{miniblock_number}")
+            })?
             .hash;
+        drop(storage);
 
-        let Some(hash) = self
+        let Some(header) = self
             .client
             .get_block_by_number(miniblock_number.0.into(), false)
             .await?
-            .map(|header| header.hash)
         else {
             // Due to reorg, locally we may be ahead of the main node.
             // Lack of the hash on the main node is treated as a hash match,
@@ -80,32 +94,29 @@ impl ReorgDetector {
             return Ok(true);
         };
 
-        Ok(hash == local_hash)
+        Ok(header.hash == local_hash)
     }
 
     /// Compares root hashes of the latest local batch and of the same batch from the main node.
-    async fn root_hashes_match(&self, l1_batch_number: L1BatchNumber) -> RpcResult<bool> {
-        // Unwrapping is fine since the caller always checks that these root hashes exist.
-        let local_hash = self
-            .pool
-            .access_storage()
-            .await
-            .unwrap()
+    async fn root_hashes_match(
+        &self,
+        l1_batch_number: L1BatchNumber,
+    ) -> Result<bool, HashMatchError> {
+        let mut storage = self.pool.access_storage().await?;
+        let local_hash = storage
             .blocks_dal()
             .get_l1_batch_state_root(l1_batch_number)
-            .await
-            .unwrap()
-            .unwrap_or_else(|| {
-                panic!(
-                    "Root hash does not exist for local batch #{}",
-                    l1_batch_number
-                )
-            });
+            .await?
+            .with_context(|| {
+                format!("Root hash does not exist for local batch #{l1_batch_number}")
+            })?;
+        drop(storage);
+
         let Some(hash) = self
             .client
             .get_l1_batch_details(l1_batch_number)
             .await?
-            .and_then(|b| b.base.root_hash)
+            .and_then(|batch| batch.base.root_hash)
         else {
             // Due to reorg, locally we may be ahead of the main node.
             // Lack of the root hash on the main node is treated as a hash match,
@@ -116,9 +127,12 @@ impl ReorgDetector {
     }
 
     /// Localizes a re-org: performs binary search to determine the last non-diverged block.
-    async fn detect_reorg(&self, diverged_l1_batch: L1BatchNumber) -> RpcResult<L1BatchNumber> {
+    async fn detect_reorg(
+        &self,
+        diverged_l1_batch: L1BatchNumber,
+    ) -> Result<L1BatchNumber, HashMatchError> {
         // TODO (BFT-176, BFT-181): We have to look through the whole history, since batch status updater may mark
-        // a block as executed even if the state diverges for it.
+        // a block as executed even if the state diverges for it.
         binary_search_with(1, diverged_l1_batch.0, |number| {
             self.root_hashes_match(L1BatchNumber(number))
         })
@@ -126,49 +140,37 @@ impl ReorgDetector {
         .await
         .map(L1BatchNumber)
     }
 
-    pub async fn run(mut self) -> Option<L1BatchNumber> {
+    pub async fn run(mut self) -> anyhow::Result<Option<L1BatchNumber>> {
         loop {
             match self.run_inner().await {
-                Ok(l1_batch_number) => return l1_batch_number,
-                Err(err @ RpcError::Transport(_) | err @ RpcError::RequestTimeout) => {
+                Ok(l1_batch_number) => return Ok(l1_batch_number),
+                Err(HashMatchError::Rpc(err)) if is_transient_err(&err) => {
                     tracing::warn!("Following transport error occurred: {err}");
                     tracing::info!("Trying again after a delay");
                     tokio::time::sleep(SLEEP_INTERVAL).await;
                 }
-                Err(err) => {
-                    panic!("Unexpected error in the reorg detector: {}", err);
-                }
+                Err(HashMatchError::Rpc(err)) => return Err(err.into()),
+                Err(HashMatchError::Internal(err)) => return Err(err),
             }
         }
     }
 
-    async fn run_inner(&mut self) -> RpcResult<Option<L1BatchNumber>> {
+    async fn run_inner(&mut self) -> Result<Option<L1BatchNumber>, HashMatchError> {
         loop {
             let should_stop = *self.should_stop.borrow();
 
-            let sealed_l1_batch_number = self
-                .pool
-                .access_storage()
-                .await
-                .unwrap()
+            let mut storage = self.pool.access_storage().await?;
+            let sealed_l1_batch_number = storage
                 .blocks_dal()
                 .get_last_l1_batch_number_with_metadata()
-                .await
-                .unwrap();
-
-            let sealed_miniblock_number = self
-                .pool
-                .access_storage()
-                .await
-                .unwrap()
-                .blocks_dal()
-                .get_sealed_miniblock_number()
-                .await
-                .unwrap();
+                .await?;
+            let sealed_miniblock_number =
+                storage.blocks_dal().get_sealed_miniblock_number().await?;
+            drop(storage);
 
             tracing::trace!(
                 "Checking for reorgs - L1 batch #{sealed_l1_batch_number}, \
-                 miniblock number #{sealed_miniblock_number}"
+                miniblock number #{sealed_miniblock_number}"
             );
 
             let root_hashes_match = self.root_hashes_match(sealed_l1_batch_number).await?;
@@ -200,12 +202,14 @@ impl ReorgDetector {
                 );
             }
             tracing::info!("Searching for the first diverged batch");
+
             let last_correct_l1_batch = self.detect_reorg(sealed_l1_batch_number).await?;
             tracing::info!(
                 "Reorg localized: last correct L1 batch is #{last_correct_l1_batch}"
             );
             return Ok(Some(last_correct_l1_batch));
         }
+
         if should_stop {
             tracing::info!("Shutting down reorg detector");
             return Ok(None);

From 37ff9dd009c682f7d261e88739c372144f88bb87 Mon Sep 17 00:00:00 2001
From: Alex Ostrovski
Date: Tue, 19 Dec 2023 12:44:49 +0200
Subject: [PATCH 02/13] Add basic tests for reorg detector

---
 .../lib/zksync_core/src/reorg_detector/mod.rs | 114 ++++++----
 .../zksync_core/src/reorg_detector/tests.rs   | 212 ++++++++++++++++++
 2 files changed, 284 insertions(+), 42 deletions(-)
 create mode 100644 core/lib/zksync_core/src/reorg_detector/tests.rs

diff --git a/core/lib/zksync_core/src/reorg_detector/mod.rs b/core/lib/zksync_core/src/reorg_detector/mod.rs
index 4a394b6d08d8..b9ed6af2ea98 100644
--- a/core/lib/zksync_core/src/reorg_detector/mod.rs
+++ b/core/lib/zksync_core/src/reorg_detector/mod.rs
@@ -1,9 +1,10 @@
-use std::{future::Future, time::Duration};
+use std::{fmt, future::Future, time::Duration};
 
 use anyhow::Context as _;
+use async_trait::async_trait;
 use tokio::sync::watch;
 use zksync_dal::ConnectionPool;
-use zksync_types::{L1BatchNumber, MiniblockNumber};
+use zksync_types::{L1BatchNumber, MiniblockNumber, H256};
 use zksync_web3_decl::{
     jsonrpsee::{
         core::Error as RpcError,
@@ -14,7 +15,8 @@ use zksync_web3_decl::{
 
 use crate::metrics::{CheckerComponent, EN_METRICS};
 
-const SLEEP_INTERVAL: Duration = Duration::from_secs(5);
+#[cfg(test)]
+mod tests;
 
 #[derive(Debug, thiserror::Error)]
 enum HashMatchError {
@@ -34,6 +36,52 @@ fn is_transient_err(err: &RpcError) -> bool {
     matches!(err, RpcError::Transport(_) | RpcError::RequestTimeout)
 }
 
+#[async_trait]
+trait MainNodeClient: fmt::Debug + Send + Sync {
+    async fn miniblock_hash(&self, number: MiniblockNumber) -> Result<Option<H256>, RpcError>;
+
+    async fn l1_batch_root_hash(&self, number: L1BatchNumber) -> Result<Option<H256>, RpcError>;
+}
+
+#[async_trait]
+impl MainNodeClient for HttpClient {
+    async fn miniblock_hash(&self, number: MiniblockNumber) -> Result<Option<H256>, RpcError> {
+        Ok(self
+            .get_block_by_number(number.0.into(), false)
+            .await?
+            .map(|block| block.hash))
+    }
+
+    async fn l1_batch_root_hash(&self, number: L1BatchNumber) -> Result<Option<H256>, RpcError> {
+        Ok(self
+            .get_l1_batch_details(number)
+            .await?
+            .and_then(|batch| batch.base.root_hash))
+    }
+}
+
+trait UpdateCorrectBlock: fmt::Debug + Send + Sync {
+    fn update_correct_block(
+        &mut self,
+        last_correct_miniblock: MiniblockNumber,
+        last_correct_l1_batch: L1BatchNumber,
+    );
+}
+
+/// Default implementation of [`UpdateCorrectBlock`] that reports values as metrics.
+impl UpdateCorrectBlock for () {
+    fn update_correct_block(
+        &mut self,
+        last_correct_miniblock: MiniblockNumber,
+        last_correct_l1_batch: L1BatchNumber,
+    ) {
+        EN_METRICS.last_correct_batch[&CheckerComponent::ReorgDetector]
+            .set(last_correct_l1_batch.0.into());
+        EN_METRICS.last_correct_miniblock[&CheckerComponent::ReorgDetector]
+            .set(last_correct_miniblock.0.into());
+    }
+}
+
 /// This is a component that is responsible for detecting the batch re-orgs.
 /// Batch re-org is a rare event of manual intervention, when the node operator
 /// decides to revert some of the not yet finalized batches for some reason
@@ -50,20 +98,26 @@ fn is_transient_err(err: &RpcError) -> bool {
 /// and is special-cased in the `zksync_external_node` crate.
 #[derive(Debug)]
 pub struct ReorgDetector {
-    client: HttpClient,
+    client: Box<dyn MainNodeClient>,
+    block_updater: Box<dyn UpdateCorrectBlock>,
     pool: ConnectionPool,
-    should_stop: watch::Receiver<bool>,
+    stop_receiver: watch::Receiver<bool>,
+    sleep_interval: Duration,
 }
 
 impl ReorgDetector {
-    pub fn new(url: &str, pool: ConnectionPool, should_stop: watch::Receiver<bool>) -> Self {
+    const DEFAULT_SLEEP_INTERVAL: Duration = Duration::from_secs(5);
+
+    pub fn new(url: &str, pool: ConnectionPool, stop_receiver: watch::Receiver<bool>) -> Self {
         let client = HttpClientBuilder::default()
            .build(url)
            .expect("Failed to create HTTP client");
         Self {
-            client,
+            client: Box::new(client),
+            block_updater: Box::new(()),
             pool,
-            should_stop,
+            stop_receiver,
+            sleep_interval: Self::DEFAULT_SLEEP_INTERVAL,
         }
     }
 
@@ -83,18 +137,14 @@ impl ReorgDetector {
             .hash;
         drop(storage);
 
-        let Some(header) = self
-            .client
-            .get_block_by_number(miniblock_number.0.into(), false)
-            .await?
-        else {
+        let Some(remote_hash) = self.client.miniblock_hash(miniblock_number).await? else {
             // Due to reorg, locally we may be ahead of the main node.
             // Lack of the hash on the main node is treated as a hash match,
            // We need to wait for our knowledge of main node to catch up.
return Ok(true); }; - Ok(header.hash == local_hash) + Ok(remote_hash == local_hash) } /// Compares root hashes of the latest local batch and of the same batch from the main node. @@ -112,18 +162,13 @@ impl ReorgDetector { })?; drop(storage); - let Some(hash) = self - .client - .get_l1_batch_details(l1_batch_number) - .await? - .and_then(|batch| batch.base.root_hash) - else { + let Some(remote_hash) = self.client.l1_batch_root_hash(l1_batch_number).await? else { // Due to reorg, locally we may be ahead of the main node. // Lack of the root hash on the main node is treated as a hash match, // We need to wait for our knowledge of main node to catch up. return Ok(true); }; - Ok(hash == local_hash) + Ok(remote_hash == local_hash) } /// Localizes a re-org: performs binary search to determine the last non-diverged block. @@ -147,7 +192,7 @@ impl ReorgDetector { Err(HashMatchError::Rpc(err)) if is_transient_err(&err) => { tracing::warn!("Following transport error occurred: {err}"); tracing::info!("Trying again after a delay"); - tokio::time::sleep(SLEEP_INTERVAL).await; + tokio::time::sleep(self.sleep_interval).await; } Err(HashMatchError::Rpc(err)) => return Err(err.into()), Err(HashMatchError::Internal(err)) => return Err(err), @@ -157,7 +202,7 @@ impl ReorgDetector { async fn run_inner(&mut self) -> Result, HashMatchError> { loop { - let should_stop = *self.should_stop.borrow(); + let should_stop = *self.stop_receiver.borrow(); let mut storage = self.pool.access_storage().await?; let sealed_l1_batch_number = storage @@ -181,13 +226,11 @@ impl ReorgDetector { // hash mismatch at the same block height is detected, be it miniblocks or batches. // // In other cases either there is only a height mismatch which means that one of - // the nodes needs to do catching up, howver it is not certain that there is actually + // the nodes needs to do catching up; however, it is not certain that there is actually // a reorg taking place. if root_hashes_match && miniblock_hashes_match { - EN_METRICS.last_correct_batch[&CheckerComponent::ReorgDetector] - .set(sealed_l1_batch_number.0.into()); - EN_METRICS.last_correct_miniblock[&CheckerComponent::ReorgDetector] - .set(sealed_miniblock_number.0.into()); + self.block_updater + .update_correct_block(sealed_miniblock_number, sealed_l1_batch_number); } else { if !root_hashes_match { tracing::warn!( @@ -214,7 +257,7 @@ impl ReorgDetector { tracing::info!("Shutting down reorg detector"); return Ok(None); } - tokio::time::sleep(SLEEP_INTERVAL).await; + tokio::time::sleep(self.sleep_interval).await; } } } @@ -234,16 +277,3 @@ where } Ok(left) } - -#[cfg(test)] -mod tests { - /// Tests the binary search algorithm. - #[tokio::test] - async fn test_binary_search() { - for divergence_point in [1, 50, 51, 100] { - let mut f = |x| async move { Ok::<_, ()>(x < divergence_point) }; - let result = super::binary_search_with(0, 100, &mut f).await; - assert_eq!(result, Ok(divergence_point - 1)); - } - } -} diff --git a/core/lib/zksync_core/src/reorg_detector/tests.rs b/core/lib/zksync_core/src/reorg_detector/tests.rs new file mode 100644 index 000000000000..5513d1a73eb2 --- /dev/null +++ b/core/lib/zksync_core/src/reorg_detector/tests.rs @@ -0,0 +1,212 @@ +//! Tests for the reorg detector component. 
+
+use std::collections::HashMap;
+
+use tokio::sync::mpsc;
+use zksync_contracts::BaseSystemContractsHashes;
+use zksync_dal::StorageProcessor;
+use zksync_types::{
+    block::{BlockGasCount, L1BatchHeader, MiniblockHeader},
+    Address, L2ChainId, ProtocolVersionId,
+};
+
+use super::*;
+use crate::genesis::{ensure_genesis_state, GenesisParams};
+
+async fn store_miniblock(storage: &mut StorageProcessor<'_>, number: u32, hash: H256) {
+    let header = MiniblockHeader {
+        number: MiniblockNumber(number),
+        timestamp: number.into(),
+        hash,
+        l1_tx_count: 0,
+        l2_tx_count: 0,
+        base_fee_per_gas: 0,
+        l1_gas_price: 0,
+        l2_fair_gas_price: 0,
+        base_system_contracts_hashes: BaseSystemContractsHashes::default(),
+        protocol_version: Some(ProtocolVersionId::latest()),
+        virtual_blocks: 1,
+    };
+    storage
+        .blocks_dal()
+        .insert_miniblock(&header)
+        .await
+        .unwrap();
+}
+
+async fn seal_l1_batch(storage: &mut StorageProcessor<'_>, number: u32, hash: H256) {
+    let header = L1BatchHeader::new(
+        L1BatchNumber(number),
+        number.into(),
+        Address::default(),
+        BaseSystemContractsHashes::default(),
+        ProtocolVersionId::latest(),
+    );
+    storage
+        .blocks_dal()
+        .insert_l1_batch(&header, &[], BlockGasCount::default(), &[], &[])
+        .await
+        .unwrap();
+    storage
+        .blocks_dal()
+        .mark_miniblocks_as_executed_in_l1_batch(L1BatchNumber(number))
+        .await
+        .unwrap();
+    storage
+        .blocks_dal()
+        .set_l1_batch_hash(L1BatchNumber(number), hash)
+        .await
+        .unwrap();
+}
+
+/// Tests the binary search algorithm.
+#[tokio::test]
+async fn test_binary_search() {
+    for divergence_point in [1, 50, 51, 100] {
+        let mut f = |x| async move { Ok::<_, ()>(x < divergence_point) };
+        let result = binary_search_with(0, 100, &mut f).await;
+        assert_eq!(result, Ok(divergence_point - 1));
+    }
+}
+
+type ResponsesMap<K> = HashMap<K, H256>;
+
+#[derive(Debug, Default)]
+struct MockMaiNodeClient {
+    miniblock_responses: ResponsesMap<MiniblockNumber>,
+    l1_batch_responses: ResponsesMap<L1BatchNumber>,
+}
+
+#[async_trait]
+impl MainNodeClient for MockMaiNodeClient {
+    async fn miniblock_hash(&self, number: MiniblockNumber) -> Result<Option<H256>, RpcError> {
+        dbg!(number);
+        if let Some(response) = self.miniblock_responses.get(&number) {
+            Ok(Some(*response))
+        } else {
+            Ok(None)
+        }
+    }
+
+    async fn l1_batch_root_hash(&self, number: L1BatchNumber) -> Result<Option<H256>, RpcError> {
+        dbg!(number);
+        if let Some(response) = self.l1_batch_responses.get(&number) {
+            Ok(Some(*response))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+impl UpdateCorrectBlock for mpsc::UnboundedSender<(MiniblockNumber, L1BatchNumber)> {
+    fn update_correct_block(
+        &mut self,
+        last_correct_miniblock: MiniblockNumber,
+        last_correct_l1_batch: L1BatchNumber,
+    ) {
+        self.send((last_correct_miniblock, last_correct_l1_batch))
+            .ok();
+    }
+}
+
+#[tokio::test]
+async fn normal_reorg_function() {
+    let pool = ConnectionPool::test_pool().await;
+    let mut storage = pool.access_storage().await.unwrap();
+    ensure_genesis_state(&mut storage, L2ChainId::default(), &GenesisParams::mock())
+        .await
+        .unwrap();
+
+    let (stop_sender, stop_receiver) = watch::channel(false);
+    let (block_update_sender, mut block_update_receiver) =
+        mpsc::unbounded_channel::<(MiniblockNumber, L1BatchNumber)>();
+    let mut client = MockMaiNodeClient::default();
+
+    let miniblock_and_l1_batch_hashes = (1_u32..=10).map(|number| {
+        let miniblock_hash = H256::from_low_u64_be(number.into());
+        client
+            .miniblock_responses
+            .insert(MiniblockNumber(number), miniblock_hash);
+        let l1_batch_hash = H256::repeat_byte(number as u8);
+        client
+            .l1_batch_responses
+            .insert(L1BatchNumber(number), l1_batch_hash);
+        (number, miniblock_hash, l1_batch_hash)
+    });
+    let miniblock_and_l1_batch_hashes: Vec<_> = miniblock_and_l1_batch_hashes.collect();
+
+    let detector = ReorgDetector {
+        client: Box::new(client),
+        block_updater: Box::new(block_update_sender),
+        pool: pool.clone(),
+        stop_receiver,
+        sleep_interval: Duration::from_millis(10),
+    };
+    let detector_task = tokio::spawn(detector.run());
+
+    for (number, miniblock_hash, l1_batch_hash) in miniblock_and_l1_batch_hashes {
+        store_miniblock(&mut storage, number, miniblock_hash).await;
+        tokio::time::sleep(Duration::from_millis(10)).await;
+        seal_l1_batch(&mut storage, number, l1_batch_hash).await;
+        tokio::time::sleep(Duration::from_millis(10)).await;
+    }
+
+    while let Some((miniblock, l1_batch)) = block_update_receiver.recv().await {
+        assert!(miniblock <= MiniblockNumber(10));
+        assert!(l1_batch <= L1BatchNumber(10));
+        if miniblock == MiniblockNumber(10) && l1_batch == L1BatchNumber(10) {
+            break;
+        }
+    }
+
+    // Check detector shutdown
+    stop_sender.send_replace(true);
+    let task_result = detector_task.await.unwrap();
+    assert_eq!(task_result.unwrap(), None);
+}
+
+#[tokio::test]
+async fn detecting_reorg_by_batch_hash() {
+    let pool = ConnectionPool::test_pool().await;
+    let mut storage = pool.access_storage().await.unwrap();
+    ensure_genesis_state(&mut storage, L2ChainId::default(), &GenesisParams::mock())
+        .await
+        .unwrap();
+
+    let (_stop_sender, stop_receiver) = watch::channel(false);
+    let mut client = MockMaiNodeClient::default();
+    let miniblock_hash = H256::from_low_u64_be(23);
+    client
+        .miniblock_responses
+        .insert(MiniblockNumber(1), miniblock_hash);
+    client
+        .l1_batch_responses
+        .insert(L1BatchNumber(1), H256::repeat_byte(1));
+    client
+        .miniblock_responses
+        .insert(MiniblockNumber(2), miniblock_hash);
+    client
+        .l1_batch_responses
+        .insert(L1BatchNumber(2), H256::repeat_byte(2));
+
+    let detector = ReorgDetector {
+        client: Box::new(client),
+        block_updater: Box::new(()),
+        pool: pool.clone(),
+        stop_receiver,
+        sleep_interval: Duration::from_millis(10),
+    };
+    let detector_task = tokio::spawn(detector.run());
+
+    store_miniblock(&mut storage, 1, miniblock_hash).await;
+    seal_l1_batch(&mut storage, 1, H256::repeat_byte(1)).await;
+    store_miniblock(&mut storage, 2, miniblock_hash).await;
+    seal_l1_batch(&mut storage, 2, H256::repeat_byte(0xff)).await;
+    // ^ Hash of L1 batch #2 differs from that on the main node.
+
+    let task_result = detector_task.await.unwrap();
+    let last_correct_l1_batch = task_result.unwrap();
+    assert_eq!(last_correct_l1_batch, Some(L1BatchNumber(1)));
+}
+
+// FIXME: test mismatch by miniblock hash, transient and non-transient errors

From 11a6e957da7cde43247fdff69c37bd1c25979dcf Mon Sep 17 00:00:00 2001
From: Alex Ostrovski
Date: Tue, 19 Dec 2023 13:18:36 +0200
Subject: [PATCH 03/13] Add more tests for reorg detector

---
 .../zksync_core/src/reorg_detector/tests.rs | 143 +++++++++++++++++-
 1 file changed, 136 insertions(+), 7 deletions(-)

diff --git a/core/lib/zksync_core/src/reorg_detector/tests.rs b/core/lib/zksync_core/src/reorg_detector/tests.rs
index 5513d1a73eb2..0defd51aa263 100644
--- a/core/lib/zksync_core/src/reorg_detector/tests.rs
+++ b/core/lib/zksync_core/src/reorg_detector/tests.rs
@@ -2,6 +2,7 @@
 
 use std::collections::HashMap;
 
+use test_casing::{test_casing, Product};
 use tokio::sync::mpsc;
 use zksync_contracts::BaseSystemContractsHashes;
 use zksync_dal::StorageProcessor;
@@ -80,7 +81,6 @@ struct MockMaiNodeClient {
 #[async_trait]
 impl MainNodeClient for MockMaiNodeClient {
     async fn miniblock_hash(&self, number: MiniblockNumber) -> Result<Option<H256>, RpcError> {
-        dbg!(number);
         if let Some(response) = self.miniblock_responses.get(&number) {
             Ok(Some(*response))
         } else {
@@ -89,7 +89,6 @@
     }
 
     async fn l1_batch_root_hash(&self, number: L1BatchNumber) -> Result<Option<H256>, RpcError> {
-        dbg!(number);
         if let Some(response) = self.l1_batch_responses.get(&number) {
             Ok(Some(*response))
         } else {
@@ -117,11 +116,7 @@ async fn normal_reorg_function() {
         .await
         .unwrap();
 
-    let (stop_sender, stop_receiver) = watch::channel(false);
-    let (block_update_sender, mut block_update_receiver) =
-        mpsc::unbounded_channel::<(MiniblockNumber, L1BatchNumber)>();
     let mut client = MockMaiNodeClient::default();
-
     let miniblock_and_l1_batch_hashes = (1_u32..=10).map(|number| {
         let miniblock_hash = H256::from_low_u64_be(number.into());
         client
@@ -135,6 +130,9 @@ async fn normal_reorg_function() {
     });
     let miniblock_and_l1_batch_hashes: Vec<_> = miniblock_and_l1_batch_hashes.collect();
 
+    let (stop_sender, stop_receiver) = watch::channel(false);
+    let (block_update_sender, mut block_update_receiver) =
+        mpsc::unbounded_channel::<(MiniblockNumber, L1BatchNumber)>();
     let detector = ReorgDetector {
         client: Box::new(client),
         block_updater: Box::new(block_update_sender),
@@ -209,4 +207,135 @@ async fn detecting_reorg_by_batch_hash() {
     assert_eq!(last_correct_l1_batch, Some(L1BatchNumber(1)));
 }
 
-// FIXME: test mismatch by miniblock hash, transient and non-transient errors
+#[tokio::test]
+async fn detecting_reorg_by_miniblock_hash() {
+    let pool = ConnectionPool::test_pool().await;
+    let mut storage = pool.access_storage().await.unwrap();
+    ensure_genesis_state(&mut storage, L2ChainId::default(), &GenesisParams::mock())
+        .await
+        .unwrap();
+
+    let (_stop_sender, stop_receiver) = watch::channel(false);
+    let mut client = MockMaiNodeClient::default();
+    let miniblock_hash = H256::from_low_u64_be(23);
+    client
+        .miniblock_responses
+        .insert(MiniblockNumber(1), miniblock_hash);
+    client
+        .l1_batch_responses
+        .insert(L1BatchNumber(1), H256::repeat_byte(1));
+    client
+        .miniblock_responses
+        .insert(MiniblockNumber(2), miniblock_hash);
+    client
+        .miniblock_responses
+        .insert(MiniblockNumber(3), miniblock_hash);
+
+    let detector = ReorgDetector {
+        client: Box::new(client),
+        block_updater: Box::new(()),
+        pool: pool.clone(),
+        stop_receiver,
+        sleep_interval: Duration::from_millis(10),
+    };
+    let detector_task = tokio::spawn(detector.run());
+
+    store_miniblock(&mut storage, 1, miniblock_hash).await;
+    seal_l1_batch(&mut storage, 1, H256::repeat_byte(1)).await;
+    store_miniblock(&mut storage, 2, miniblock_hash).await;
+    store_miniblock(&mut storage, 3, H256::repeat_byte(42)).await;
+    // ^ Hash of the miniblock #3 differs from that on the main node.
+
+    let task_result = detector_task.await.unwrap();
+    let last_correct_l1_batch = task_result.unwrap();
+    assert_eq!(last_correct_l1_batch, Some(L1BatchNumber(1)));
+    // ^ All locally stored L1 batches should be correct.
+}
+
+#[derive(Debug, Clone, Copy)]
+enum StorageUpdateStrategy {
+    /// Prefill the local storage with all block data.
+    Prefill,
+    /// Sequentially add a new L1 batch after the previous one was checked.
+    Sequential,
+}
+
+impl StorageUpdateStrategy {
+    const ALL: [Self; 2] = [Self::Prefill, Self::Sequential];
+}
+
+#[test_casing(8, Product(([2, 3, 5, 8], StorageUpdateStrategy::ALL)))]
+#[tokio::test]
+async fn detecting_deep_reorg(
+    last_correct_batch: u32,
+    storage_update_strategy: StorageUpdateStrategy,
+) {
+    assert!(last_correct_batch < 10);
+
+    let pool = ConnectionPool::test_pool().await;
+    let mut storage = pool.access_storage().await.unwrap();
+    ensure_genesis_state(&mut storage, L2ChainId::default(), &GenesisParams::mock())
+        .await
+        .unwrap();
+
+    let mut client = MockMaiNodeClient::default();
+    let miniblock_and_l1_batch_hashes = (1_u32..=10).map(|number| {
+        let mut miniblock_hash = H256::from_low_u64_be(number.into());
+        client
+            .miniblock_responses
+            .insert(MiniblockNumber(number), miniblock_hash);
+        let mut l1_batch_hash = H256::repeat_byte(number as u8);
+        client
+            .l1_batch_responses
+            .insert(L1BatchNumber(number), l1_batch_hash);
+
+        if number > last_correct_batch {
+            miniblock_hash = H256::zero();
+            l1_batch_hash = H256::zero();
+        }
+        (number, miniblock_hash, l1_batch_hash)
+    });
+    let mut miniblock_and_l1_batch_hashes: Vec<_> = miniblock_and_l1_batch_hashes.collect();
+
+    if matches!(storage_update_strategy, StorageUpdateStrategy::Prefill) {
+        for &(number, miniblock_hash, l1_batch_hash) in &miniblock_and_l1_batch_hashes {
+            store_miniblock(&mut storage, number, miniblock_hash).await;
+            seal_l1_batch(&mut storage, number, l1_batch_hash).await;
+        }
+    }
+
+    let (_stop_sender, stop_receiver) = watch::channel(false);
+    let (block_update_sender, mut block_update_receiver) =
+        mpsc::unbounded_channel::<(MiniblockNumber, L1BatchNumber)>();
+    let detector = ReorgDetector {
+        client: Box::new(client),
+        block_updater: Box::new(block_update_sender),
+        pool: pool.clone(),
+        stop_receiver,
+        sleep_interval: Duration::from_millis(10),
+    };
+    let detector_task = tokio::spawn(detector.run());
+
+    if matches!(storage_update_strategy, StorageUpdateStrategy::Sequential) {
+        let mut last_number = 0;
+        while let Some((miniblock, l1_batch)) = block_update_receiver.recv().await {
+            if miniblock == MiniblockNumber(last_number) && l1_batch == L1BatchNumber(last_number) {
+                let (number, miniblock_hash, l1_batch_hash) =
+                    miniblock_and_l1_batch_hashes.remove(0);
+                assert_eq!(number, last_number + 1);
+                store_miniblock(&mut storage, number, miniblock_hash).await;
+                seal_l1_batch(&mut storage, number, l1_batch_hash).await;
+                last_number = number;
+            }
+        }
+    }
+
+    let task_result = detector_task.await.unwrap();
+    let last_correct_l1_batch = task_result.unwrap();
+    assert_eq!(
+        last_correct_l1_batch,
+        Some(L1BatchNumber(last_correct_batch))
+    );
+}
+
+// FIXME: test transient and non-transient errors

From 6f823a6e1e20be2d0fe8431c4128237a2f39e010 Mon Sep 17 00:00:00 2001
From: Alex Ostrovski
Date: Tue, 19 Dec 2023 15:51:10 +0200
Subject: [PATCH 04/13] Add error tests for reorg detector

---
 .../zksync_core/src/reorg_detector/tests.rs | 78 ++++++++++++++++---
 1 file changed, 68 insertions(+), 10 deletions(-)

diff --git a/core/lib/zksync_core/src/reorg_detector/tests.rs b/core/lib/zksync_core/src/reorg_detector/tests.rs
index 0defd51aa263..607b0c4c8c7e 100644
--- a/core/lib/zksync_core/src/reorg_detector/tests.rs
+++ b/core/lib/zksync_core/src/reorg_detector/tests.rs
@@ -1,6 +1,9 @@
 //! Tests for the reorg detector component.
 
-use std::collections::HashMap;
+use std::{
+    collections::HashMap,
+    sync::{Arc, Mutex},
+};
 
 use test_casing::{test_casing, Product};
 use tokio::sync::mpsc;
 use zksync_contracts::BaseSystemContractsHashes;
 use zksync_dal::StorageProcessor;
@@ -72,15 +75,35 @@ async fn test_binary_search() {
 
 type ResponsesMap<K> = HashMap<K, H256>;
 
+#[derive(Debug, Clone, Copy)]
+enum RpcErrorKind {
+    Transient,
+    Fatal,
+}
+
+impl From<RpcErrorKind> for RpcError {
+    fn from(kind: RpcErrorKind) -> Self {
+        match kind {
+            RpcErrorKind::Transient => Self::RequestTimeout,
+            RpcErrorKind::Fatal => Self::HttpNotImplemented,
+        }
+    }
+}
+
 #[derive(Debug, Default)]
-struct MockMaiNodeClient {
+struct MockMainNodeClient {
     miniblock_responses: ResponsesMap<MiniblockNumber>,
     l1_batch_responses: ResponsesMap<L1BatchNumber>,
+    error_kind: Arc<Mutex<Option<RpcErrorKind>>>,
 }
 
 #[async_trait]
-impl MainNodeClient for MockMaiNodeClient {
+impl MainNodeClient for MockMainNodeClient {
     async fn miniblock_hash(&self, number: MiniblockNumber) -> Result<Option<H256>, RpcError> {
+        if let &Some(error_kind) = &*self.error_kind.lock().unwrap() {
+            return Err(error_kind.into());
+        }
+
         if let Some(response) = self.miniblock_responses.get(&number) {
             Ok(Some(*response))
         } else {
@@ -89,6 +112,10 @@
     }
 
     async fn l1_batch_root_hash(&self, number: L1BatchNumber) -> Result<Option<H256>, RpcError> {
+        if let &Some(error_kind) = &*self.error_kind.lock().unwrap() {
+            return Err(error_kind.into());
+        }
+
         if let Some(response) = self.l1_batch_responses.get(&number) {
             Ok(Some(*response))
         } else {
@@ -108,15 +135,16 @@ impl UpdateCorrectBlock for mpsc::UnboundedSender<(MiniblockNumber, L1BatchNumbe
     }
 }
 
+#[test_casing(2, [false, true])]
 #[tokio::test]
-async fn normal_reorg_function() {
+async fn normal_reorg_function(with_transient_errors: bool) {
     let pool = ConnectionPool::test_pool().await;
     let mut storage = pool.access_storage().await.unwrap();
     ensure_genesis_state(&mut storage, L2ChainId::default(), &GenesisParams::mock())
         .await
         .unwrap();
 
-    let mut client = MockMaiNodeClient::default();
+    let mut client = MockMainNodeClient::default();
     let miniblock_and_l1_batch_hashes = (1_u32..=10).map(|number| {
         let miniblock_hash = H256::from_low_u64_be(number.into());
         client
@@ -130,6 +158,16 @@ async fn normal_reorg_function() {
     });
     let miniblock_and_l1_batch_hashes: Vec<_> = miniblock_and_l1_batch_hashes.collect();
 
+    if with_transient_errors {
+        *client.error_kind.lock().unwrap() = Some(RpcErrorKind::Transient);
+        // "Fix" the client after a certain delay.
+ let error_kind = Arc::clone(&client.error_kind); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(100)).await; + *error_kind.lock().unwrap() = None; + }); + } + let (stop_sender, stop_receiver) = watch::channel(false); let (block_update_sender, mut block_update_receiver) = mpsc::unbounded_channel::<(MiniblockNumber, L1BatchNumber)>(); @@ -163,6 +201,28 @@ async fn normal_reorg_function() { assert_eq!(task_result.unwrap(), None); } +#[tokio::test] +async fn handling_fatal_rpc_error() { + let pool = ConnectionPool::test_pool().await; + let mut storage = pool.access_storage().await.unwrap(); + ensure_genesis_state(&mut storage, L2ChainId::default(), &GenesisParams::mock()) + .await + .unwrap(); + + let client = MockMainNodeClient::default(); + *client.error_kind.lock().unwrap() = Some(RpcErrorKind::Fatal); + + let (_stop_sender, stop_receiver) = watch::channel(false); + let detector = ReorgDetector { + client: Box::new(client), + block_updater: Box::new(()), + pool: pool.clone(), + stop_receiver, + sleep_interval: Duration::from_millis(10), + }; + detector.run().await.unwrap_err(); +} + #[tokio::test] async fn detecting_reorg_by_batch_hash() { let pool = ConnectionPool::test_pool().await; @@ -172,7 +232,7 @@ async fn detecting_reorg_by_batch_hash() { .unwrap(); let (_stop_sender, stop_receiver) = watch::channel(false); - let mut client = MockMaiNodeClient::default(); + let mut client = MockMainNodeClient::default(); let miniblock_hash = H256::from_low_u64_be(23); client .miniblock_responses @@ -216,7 +276,7 @@ async fn detecting_reorg_by_miniblock_hash() { .unwrap(); let (_stop_sender, stop_receiver) = watch::channel(false); - let mut client = MockMaiNodeClient::default(); + let mut client = MockMainNodeClient::default(); let miniblock_hash = H256::from_low_u64_be(23); client .miniblock_responses @@ -278,7 +338,7 @@ async fn detecting_deep_reorg( .await .unwrap(); - let mut client = MockMaiNodeClient::default(); + let mut client = MockMainNodeClient::default(); let miniblock_and_l1_batch_hashes = (1_u32..=10).map(|number| { let mut miniblock_hash = H256::from_low_u64_be(number.into()); client @@ -337,5 +397,3 @@ async fn detecting_deep_reorg( Some(L1BatchNumber(last_correct_batch)) ); } - -// FIXME: test transient and non-transient errors From 642f346f4e7b11d237cf182a82d4ecf63c73d954 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Tue, 19 Dec 2023 17:33:31 +0200 Subject: [PATCH 05/13] Add DB query for earliest L1 batch w/ metadata --- core/lib/dal/sqlx-data.json | 20 +++++++++++++++++++- core/lib/dal/src/blocks_dal.rs | 32 ++++++++++++++++++++++++++------ 2 files changed, 45 insertions(+), 7 deletions(-) diff --git a/core/lib/dal/sqlx-data.json b/core/lib/dal/sqlx-data.json index 1ca26edfdeaa..c45742c1b1cc 100644 --- a/core/lib/dal/sqlx-data.json +++ b/core/lib/dal/sqlx-data.json @@ -12028,6 +12028,24 @@ }, "query": "\n SELECT\n number,\n l1_tx_count,\n l2_tx_count,\n timestamp,\n is_finished,\n fee_account_address,\n l2_to_l1_logs,\n l2_to_l1_messages,\n bloom,\n priority_ops_onchain_data,\n used_contract_hashes,\n base_fee_per_gas,\n l1_gas_price,\n l2_fair_gas_price,\n bootloader_code_hash,\n default_aa_code_hash,\n protocol_version,\n system_logs,\n compressed_state_diffs\n FROM\n l1_batches\n WHERE\n eth_commit_tx_id = $1\n OR eth_prove_tx_id = $1\n OR eth_execute_tx_id = $1\n " }, + "f91790ae5cc4b087bf942ba52dd63a1e89945f8d5e0f4da42ecf6313c4f5967e": { + "describe": { + "columns": [ + { + "name": "number", + "ordinal": 0, + "type_info": "Int8" + } + ], + 
"nullable": [ + null + ], + "parameters": { + "Left": [] + } + }, + "query": "\n SELECT\n MIN(number) AS \"number\"\n FROM\n l1_batches\n WHERE\n hash IS NOT NULL\n " + }, "f922c0718c9dda2f285f09cbabad425bac8ed3d2780c60c9b63afbcea131f9a0": { "describe": { "columns": [], @@ -12129,4 +12147,4 @@ }, "query": "\n SELECT\n l2_to_l1_logs\n FROM\n l1_batches\n WHERE\n number = $1\n " } -} +} \ No newline at end of file diff --git a/core/lib/dal/src/blocks_dal.rs b/core/lib/dal/src/blocks_dal.rs index 555856c1bba3..8b007211ed2d 100644 --- a/core/lib/dal/src/blocks_dal.rs +++ b/core/lib/dal/src/blocks_dal.rs @@ -84,8 +84,8 @@ impl BlocksDal<'_, '_> { pub async fn get_last_l1_batch_number_with_metadata( &mut self, - ) -> anyhow::Result { - let number: i64 = sqlx::query!( + ) -> sqlx::Result> { + let row = sqlx::query!( r#" SELECT MAX(number) AS "number" @@ -98,10 +98,30 @@ impl BlocksDal<'_, '_> { .instrument("get_last_block_number_with_metadata") .report_latency() .fetch_one(self.storage.conn()) - .await? - .number - .context("DAL invocation before genesis")?; - Ok(L1BatchNumber(number as u32)) + .await?; + + Ok(row.number.map(|num| L1BatchNumber(num as u32))) + } + + pub async fn get_earliest_l1_batch_number_with_metadata( + &mut self, + ) -> sqlx::Result> { + let row = sqlx::query!( + r#" + SELECT + MIN(number) AS "number" + FROM + l1_batches + WHERE + hash IS NOT NULL + "# + ) + .instrument("get_earliest_l1_batch_number_with_metadata") + .report_latency() + .fetch_one(self.storage.conn()) + .await?; + + Ok(row.number.map(|num| L1BatchNumber(num as u32))) } pub async fn get_l1_batches_for_eth_tx_id( From 0f8ae485f60d2b642303fabcb90bd0f66a175538 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Tue, 19 Dec 2023 17:34:11 +0200 Subject: [PATCH 06/13] Sketch snapshot recovery for reorg detector --- .../src/house_keeper/blocks_state_reporter.rs | 32 ++++----- core/lib/zksync_core/src/lib.rs | 1 + .../src/metadata_calculator/updater.rs | 23 +++--- .../lib/zksync_core/src/reorg_detector/mod.rs | 70 ++++++++++++++----- .../zksync_core/src/reorg_detector/tests.rs | 24 +++++++ core/lib/zksync_core/src/utils.rs | 35 ++++++++++ 6 files changed, 137 insertions(+), 48 deletions(-) create mode 100644 core/lib/zksync_core/src/utils.rs diff --git a/core/lib/zksync_core/src/house_keeper/blocks_state_reporter.rs b/core/lib/zksync_core/src/house_keeper/blocks_state_reporter.rs index 91c4a80b047c..2ed1f97fee71 100644 --- a/core/lib/zksync_core/src/house_keeper/blocks_state_reporter.rs +++ b/core/lib/zksync_core/src/house_keeper/blocks_state_reporter.rs @@ -21,22 +21,22 @@ impl L1BatchMetricsReporter { async fn report_metrics(&self) { let mut conn = self.connection_pool.access_storage().await.unwrap(); - let mut block_metrics = vec![ - ( - conn.blocks_dal() - .get_sealed_l1_batch_number() - .await - .unwrap(), - BlockStage::Sealed, - ), - ( - conn.blocks_dal() - .get_last_l1_batch_number_with_metadata() - .await - .unwrap(), - BlockStage::MetadataCalculated, - ), - ]; + let mut block_metrics = vec![( + conn.blocks_dal() + .get_sealed_l1_batch_number() + .await + .unwrap(), + BlockStage::Sealed, + )]; + + let last_l1_batch_with_metadata = conn + .blocks_dal() + .get_last_l1_batch_number_with_metadata() + .await + .unwrap(); + if let Some(number) = last_l1_batch_with_metadata { + block_metrics.push((number, BlockStage::MetadataCalculated)); + } let eth_stats = conn.eth_sender_dal().get_eth_l1_batches().await.unwrap(); for (tx_type, l1_batch) in eth_stats.saved { diff --git a/core/lib/zksync_core/src/lib.rs 
b/core/lib/zksync_core/src/lib.rs index 42cdc043e73e..745b47dcd858 100644 --- a/core/lib/zksync_core/src/lib.rs +++ b/core/lib/zksync_core/src/lib.rs @@ -93,6 +93,7 @@ pub mod reorg_detector; pub mod state_keeper; pub mod sync_layer; pub mod temp_config_store; +mod utils; /// Inserts the initial information about zkSync tokens into the database. pub async fn genesis_init( diff --git a/core/lib/zksync_core/src/metadata_calculator/updater.rs b/core/lib/zksync_core/src/metadata_calculator/updater.rs index 3b4ab8c50f5b..ed223098c02a 100644 --- a/core/lib/zksync_core/src/metadata_calculator/updater.rs +++ b/core/lib/zksync_core/src/metadata_calculator/updater.rs @@ -267,10 +267,8 @@ impl TreeUpdater { mut stop_receiver: watch::Receiver, health_updater: HealthUpdater, ) -> anyhow::Result<()> { - let mut storage = pool - .access_storage_tagged("metadata_calculator") - .await - .unwrap(); + // FIXME: wait until there's an L1 batch in storage? + let mut storage = pool.access_storage_tagged("metadata_calculator").await?; // Ensure genesis creation let tree = &mut self.tree; @@ -283,16 +281,14 @@ impl TreeUpdater { } let mut next_l1_batch_to_seal = tree.next_l1_batch_number(); - let current_db_batch = storage - .blocks_dal() - .get_sealed_l1_batch_number() - .await - .unwrap(); + let current_db_batch = storage.blocks_dal().get_sealed_l1_batch_number().await?; let last_l1_batch_with_metadata = storage .blocks_dal() .get_last_l1_batch_number_with_metadata() - .await - .unwrap(); + .await?; + let Some(last_l1_batch_with_metadata) = last_l1_batch_with_metadata else { + todo!() + }; drop(storage); tracing::info!( @@ -334,10 +330,7 @@ impl TreeUpdater { tracing::info!("Stop signal received, metadata_calculator is shutting down"); break; } - let storage = pool - .access_storage_tagged("metadata_calculator") - .await - .unwrap(); + let storage = pool.access_storage_tagged("metadata_calculator").await?; let snapshot = *next_l1_batch_to_seal; self.step(storage, &mut next_l1_batch_to_seal).await; diff --git a/core/lib/zksync_core/src/reorg_detector/mod.rs b/core/lib/zksync_core/src/reorg_detector/mod.rs index b9ed6af2ea98..e59aa8a25388 100644 --- a/core/lib/zksync_core/src/reorg_detector/mod.rs +++ b/core/lib/zksync_core/src/reorg_detector/mod.rs @@ -13,7 +13,10 @@ use zksync_web3_decl::{ namespaces::{EthNamespaceClient, ZksNamespaceClient}, }; -use crate::metrics::{CheckerComponent, EN_METRICS}; +use crate::{ + metrics::{CheckerComponent, EN_METRICS}, + utils::wait_for_l1_batch_with_metadata, +}; #[cfg(test)] mod tests; @@ -144,6 +147,12 @@ impl ReorgDetector { return Ok(true); }; + if remote_hash != local_hash { + tracing::warn!( + "Reorg detected: local hash {local_hash:?} doesn't match the hash from \ + main node {remote_hash:?} (miniblock #{miniblock_number})" + ); + } Ok(remote_hash == local_hash) } @@ -168,17 +177,25 @@ impl ReorgDetector { // We need to wait for our knowledge of main node to catch up. return Ok(true); }; + + if remote_hash != local_hash { + tracing::warn!( + "Reorg detected: local root hash {local_hash:?} doesn't match the state hash from \ + main node {remote_hash:?} (L1 batch #{l1_batch_number})" + ); + } Ok(remote_hash == local_hash) } /// Localizes a re-org: performs binary search to determine the last non-diverged block. 
async fn detect_reorg( &self, + known_valid_l1_batch: L1BatchNumber, diverged_l1_batch: L1BatchNumber, ) -> Result { // TODO (BFT-176, BFT-181): We have to look through the whole history, since batch status updater may mark // a block as executed even if the state diverges for it. - binary_search_with(1, diverged_l1_batch.0, |number| { + binary_search_with(known_valid_l1_batch.0, diverged_l1_batch.0, |number| { self.root_hashes_match(L1BatchNumber(number)) }) .await @@ -201,14 +218,36 @@ impl ReorgDetector { } async fn run_inner(&mut self) -> Result, HashMatchError> { + let earliest_l1_batch_number = + wait_for_l1_batch_with_metadata(&self.pool, self.sleep_interval, &self.stop_receiver) + .await?; + + let Some(earliest_l1_batch_number) = earliest_l1_batch_number else { + return Ok(None); // stop signal received + }; + tracing::debug!( + "Checking root hash match for earliest L1 batch #{earliest_l1_batch_number}" + ); + if !self.root_hashes_match(earliest_l1_batch_number).await? { + let err = anyhow::anyhow!( + "Unrecoverable error: the earliest L1 batch #{earliest_l1_batch_number} in the local DB \ + has mismatched hash with the main node. Make sure you're connected to the right network; \ + if you've recovered from a snapshot, re-check snapshot authenticity. \ + Using an earlier snapshot could help." + ); + return Err(err.into()); + } + loop { let should_stop = *self.stop_receiver.borrow(); + // At this point, we are guaranteed to have L1 batches and miniblocks in the storage. let mut storage = self.pool.access_storage().await?; let sealed_l1_batch_number = storage .blocks_dal() .get_last_l1_batch_number_with_metadata() - .await?; + .await? + .context("L1 batches table unexpectedly emptied")?; let sealed_miniblock_number = storage.blocks_dal().get_sealed_miniblock_number().await?; drop(storage); @@ -232,21 +271,16 @@ impl ReorgDetector { self.block_updater .update_correct_block(sealed_miniblock_number, sealed_l1_batch_number); } else { - if !root_hashes_match { - tracing::warn!( - "Reorg detected: last state hash doesn't match the state hash from \ - main node (L1 batch #{sealed_l1_batch_number})" - ); - } - if !miniblock_hashes_match { - tracing::warn!( - "Reorg detected: last state hash doesn't match the state hash from \ - main node (MiniblockNumber #{sealed_miniblock_number})" - ); - } - tracing::info!("Searching for the first diverged batch"); + let diverged_l1_batch_number = if root_hashes_match { + sealed_l1_batch_number + 1 // Non-sealed L1 batch has diverged + } else { + sealed_l1_batch_number + }; - let last_correct_l1_batch = self.detect_reorg(sealed_l1_batch_number).await?; + tracing::info!("Searching for the first diverged L1 batch"); + let last_correct_l1_batch = self + .detect_reorg(earliest_l1_batch_number, diverged_l1_batch_number) + .await?; tracing::info!( "Reorg localized: last correct L1 batch is #{last_correct_l1_batch}" ); @@ -269,6 +303,8 @@ where { while left + 1 < right { let middle = (left + right) / 2; + assert!(middle < right); // middle <= (right - 2 + right) / 2 = right - 1 + if f(middle).await? 
{ left = middle; } else { diff --git a/core/lib/zksync_core/src/reorg_detector/tests.rs b/core/lib/zksync_core/src/reorg_detector/tests.rs index 607b0c4c8c7e..cc1703fe913e 100644 --- a/core/lib/zksync_core/src/reorg_detector/tests.rs +++ b/core/lib/zksync_core/src/reorg_detector/tests.rs @@ -397,3 +397,27 @@ async fn detecting_deep_reorg( Some(L1BatchNumber(last_correct_batch)) ); } + +#[tokio::test] +async fn stopping_reorg_detector_while_waiting_for_l1_batch() { + let pool = ConnectionPool::test_pool().await; + let mut storage = pool.access_storage().await.unwrap(); + assert!(storage.blocks_dal().is_genesis_needed().await.unwrap()); + drop(storage); + + let (stop_sender, stop_receiver) = watch::channel(false); + let detector = ReorgDetector { + client: Box::::default(), + block_updater: Box::new(()), + pool, + stop_receiver, + sleep_interval: Duration::from_millis(10), + }; + let detector_task = tokio::spawn(detector.run()); + + stop_sender.send_replace(true); + + let task_result = detector_task.await.unwrap(); + let last_correct_l1_batch = task_result.unwrap(); + assert_eq!(last_correct_l1_batch, None); +} diff --git a/core/lib/zksync_core/src/utils.rs b/core/lib/zksync_core/src/utils.rs new file mode 100644 index 000000000000..5a35b45e4cdb --- /dev/null +++ b/core/lib/zksync_core/src/utils.rs @@ -0,0 +1,35 @@ +//! Miscellaneous utils used by multiple components. + +use std::time::Duration; + +use tokio::sync::watch; +use zksync_dal::ConnectionPool; +use zksync_types::L1BatchNumber; + +/// Repeatedly polls DB until there is an L1 batch with metadata. We may not have such a batch +/// if the DB is recovered from an (application-level) snapshot. +/// +/// Returns the number of rhe *earliest* L1 batch with metadata, or `None` if the stop signal is received. +pub(crate) async fn wait_for_l1_batch_with_metadata( + pool: &ConnectionPool, + poll_interval: Duration, + stop_receiver: &watch::Receiver, +) -> anyhow::Result> { + loop { + if *stop_receiver.borrow() { + return Ok(None); + } + + let mut storage = pool.access_storage().await?; + let sealed_l1_batch_number = storage + .blocks_dal() + .get_earliest_l1_batch_number_with_metadata() + .await?; + drop(storage); + + if let Some(number) = sealed_l1_batch_number { + return Ok(Some(number)); + } + tokio::time::sleep(poll_interval).await; + } +} From 545bfcce0f92d8778c06bc63ceebe9ae85a9a1a6 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Tue, 19 Dec 2023 18:24:26 +0200 Subject: [PATCH 07/13] Test snapshot recovery in reorg detector --- .../lib/zksync_core/src/reorg_detector/mod.rs | 19 +-- .../zksync_core/src/reorg_detector/tests.rs | 117 +++++++++++++++--- 2 files changed, 112 insertions(+), 24 deletions(-) diff --git a/core/lib/zksync_core/src/reorg_detector/mod.rs b/core/lib/zksync_core/src/reorg_detector/mod.rs index e59aa8a25388..73348583dbbc 100644 --- a/core/lib/zksync_core/src/reorg_detector/mod.rs +++ b/core/lib/zksync_core/src/reorg_detector/mod.rs @@ -25,6 +25,13 @@ mod tests; enum HashMatchError { #[error("RPC error calling main node")] Rpc(#[from] RpcError), + #[error( + "Unrecoverable error: the earliest L1 batch #{0} in the local DB \ + has mismatched hash with the main node. Make sure you're connected to the right network; \ + if you've recovered from a snapshot, re-check snapshot authenticity. \ + Using an earlier snapshot could help." 
+ )] + EarliestHashMismatch(L1BatchNumber), #[error("Internal error")] Internal(#[from] anyhow::Error), } @@ -211,8 +218,8 @@ impl ReorgDetector { tracing::info!("Trying again after a delay"); tokio::time::sleep(self.sleep_interval).await; } - Err(HashMatchError::Rpc(err)) => return Err(err.into()), Err(HashMatchError::Internal(err)) => return Err(err), + Err(err) => return Err(err.into()), } } } @@ -229,13 +236,9 @@ impl ReorgDetector { "Checking root hash match for earliest L1 batch #{earliest_l1_batch_number}" ); if !self.root_hashes_match(earliest_l1_batch_number).await? { - let err = anyhow::anyhow!( - "Unrecoverable error: the earliest L1 batch #{earliest_l1_batch_number} in the local DB \ - has mismatched hash with the main node. Make sure you're connected to the right network; \ - if you've recovered from a snapshot, re-check snapshot authenticity. \ - Using an earlier snapshot could help." - ); - return Err(err.into()); + return Err(HashMatchError::EarliestHashMismatch( + earliest_l1_batch_number, + )); } loop { diff --git a/core/lib/zksync_core/src/reorg_detector/tests.rs b/core/lib/zksync_core/src/reorg_detector/tests.rs index cc1703fe913e..6b9888881f87 100644 --- a/core/lib/zksync_core/src/reorg_detector/tests.rs +++ b/core/lib/zksync_core/src/reorg_detector/tests.rs @@ -5,13 +5,14 @@ use std::{ sync::{Arc, Mutex}, }; +use assert_matches::assert_matches; use test_casing::{test_casing, Product}; use tokio::sync::mpsc; use zksync_contracts::BaseSystemContractsHashes; use zksync_dal::StorageProcessor; use zksync_types::{ block::{BlockGasCount, L1BatchHeader, MiniblockHeader}, - Address, L2ChainId, ProtocolVersionId, + Address, L2ChainId, ProtocolVersion, ProtocolVersionId, }; use super::*; @@ -135,17 +136,31 @@ impl UpdateCorrectBlock for mpsc::UnboundedSender<(MiniblockNumber, L1BatchNumbe } } -#[test_casing(2, [false, true])] +#[test_casing(4, Product(([false, true], [false, true])))] #[tokio::test] -async fn normal_reorg_function(with_transient_errors: bool) { +async fn normal_reorg_function(snapshot_recovery: bool, with_transient_errors: bool) { let pool = ConnectionPool::test_pool().await; let mut storage = pool.access_storage().await.unwrap(); - ensure_genesis_state(&mut storage, L2ChainId::default(), &GenesisParams::mock()) - .await - .unwrap(); + if snapshot_recovery { + storage + .protocol_versions_dal() + .save_protocol_version_with_tx(ProtocolVersion::default()) + .await; + } else { + ensure_genesis_state(&mut storage, L2ChainId::default(), &GenesisParams::mock()) + .await + .unwrap(); + } let mut client = MockMainNodeClient::default(); - let miniblock_and_l1_batch_hashes = (1_u32..=10).map(|number| { + let l1_batch_numbers = if snapshot_recovery { + 11_u32..=20 + } else { + 1_u32..=10 + }; + let last_l1_batch_number = L1BatchNumber(*l1_batch_numbers.end()); + let last_miniblock_number = MiniblockNumber(*l1_batch_numbers.end()); + let miniblock_and_l1_batch_hashes = l1_batch_numbers.map(|number| { let miniblock_hash = H256::from_low_u64_be(number.into()); client .miniblock_responses @@ -188,9 +203,9 @@ async fn normal_reorg_function(with_transient_errors: bool) { } while let Some((miniblock, l1_batch)) = block_update_receiver.recv().await { - assert!(miniblock <= MiniblockNumber(10)); - assert!(l1_batch <= L1BatchNumber(10)); - if miniblock == MiniblockNumber(10) && l1_batch == L1BatchNumber(10) { + assert!(miniblock <= last_miniblock_number); + assert!(l1_batch <= last_l1_batch_number); + if miniblock == last_miniblock_number && l1_batch == last_l1_batch_number { 
break; } } @@ -324,22 +339,32 @@ impl StorageUpdateStrategy { const ALL: [Self; 2] = [Self::Prefill, Self::Sequential]; } -#[test_casing(8, Product(([2, 3, 5, 8], StorageUpdateStrategy::ALL)))] +#[test_casing(16, Product(([false, true], [2, 3, 5, 8], StorageUpdateStrategy::ALL)))] #[tokio::test] async fn detecting_deep_reorg( + snapshot_recovery: bool, last_correct_batch: u32, storage_update_strategy: StorageUpdateStrategy, ) { assert!(last_correct_batch < 10); + let (l1_batch_numbers, last_correct_batch) = if snapshot_recovery { + (11_u32..=20, last_correct_batch + 10) + } else { + (1_u32..=10, last_correct_batch) + }; let pool = ConnectionPool::test_pool().await; let mut storage = pool.access_storage().await.unwrap(); - ensure_genesis_state(&mut storage, L2ChainId::default(), &GenesisParams::mock()) - .await - .unwrap(); + storage + .protocol_versions_dal() + .save_protocol_version_with_tx(ProtocolVersion::default()) + .await; + let earliest_l1_batch_number = l1_batch_numbers.start() - 1; + store_miniblock(&mut storage, earliest_l1_batch_number, H256::zero()).await; + seal_l1_batch(&mut storage, earliest_l1_batch_number, H256::zero()).await; let mut client = MockMainNodeClient::default(); - let miniblock_and_l1_batch_hashes = (1_u32..=10).map(|number| { + let miniblock_and_l1_batch_hashes = l1_batch_numbers.clone().map(|number| { let mut miniblock_hash = H256::from_low_u64_be(number.into()); client .miniblock_responses @@ -377,7 +402,7 @@ async fn detecting_deep_reorg( let detector_task = tokio::spawn(detector.run()); if matches!(storage_update_strategy, StorageUpdateStrategy::Sequential) { - let mut last_number = 0; + let mut last_number = earliest_l1_batch_number; while let Some((miniblock, l1_batch)) = block_update_receiver.recv().await { if miniblock == MiniblockNumber(last_number) && l1_batch == L1BatchNumber(last_number) { let (number, miniblock_hash, l1_batch_hash) = @@ -421,3 +446,63 @@ async fn stopping_reorg_detector_while_waiting_for_l1_batch() { let last_correct_l1_batch = task_result.unwrap(); assert_eq!(last_correct_l1_batch, None); } + +#[tokio::test] +async fn earliest_batch_hash_mismatch() { + let pool = ConnectionPool::test_pool().await; + let mut storage = pool.access_storage().await.unwrap(); + let genesis_root_hash = + ensure_genesis_state(&mut storage, L2ChainId::default(), &GenesisParams::mock()) + .await + .unwrap(); + assert_ne!(genesis_root_hash, H256::zero()); + + let mut client = MockMainNodeClient::default(); + client + .l1_batch_responses + .insert(L1BatchNumber(0), H256::zero()); + + let (_stop_sender, stop_receiver) = watch::channel(false); + let mut detector = ReorgDetector { + client: Box::new(client), + block_updater: Box::new(()), + pool: pool.clone(), + stop_receiver, + sleep_interval: Duration::from_millis(10), + }; + + let err = detector.run_inner().await.unwrap_err(); + assert_matches!(err, HashMatchError::EarliestHashMismatch(L1BatchNumber(0))); +} + +#[tokio::test] +async fn earliest_batch_hash_mismatch_with_snapshot_recovery() { + let pool = ConnectionPool::test_pool().await; + let mut client = MockMainNodeClient::default(); + client + .l1_batch_responses + .insert(L1BatchNumber(3), H256::zero()); + + let (_stop_sender, stop_receiver) = watch::channel(false); + let mut detector = ReorgDetector { + client: Box::new(client), + block_updater: Box::new(()), + pool: pool.clone(), + stop_receiver, + sleep_interval: Duration::from_millis(10), + }; + + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(20)).await; + let mut storage = 
pool.access_storage().await.unwrap(); + storage + .protocol_versions_dal() + .save_protocol_version_with_tx(ProtocolVersion::default()) + .await; + store_miniblock(&mut storage, 3, H256::from_low_u64_be(3)).await; + seal_l1_batch(&mut storage, 3, H256::from_low_u64_be(3)).await; + }); + + let err = detector.run_inner().await.unwrap_err(); + assert_matches!(err, HashMatchError::EarliestHashMismatch(L1BatchNumber(3))); +} From febecdf8e09b47bc36a8103c355a68ce895ab2ab Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Wed, 20 Dec 2023 09:29:15 +0200 Subject: [PATCH 08/13] Fix `last_l1_batch_with_metadata` handling --- .../src/metadata_calculator/updater.rs | 52 ++++++++++--------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/core/lib/zksync_core/src/metadata_calculator/updater.rs b/core/lib/zksync_core/src/metadata_calculator/updater.rs index ed223098c02a..7af73a13d610 100644 --- a/core/lib/zksync_core/src/metadata_calculator/updater.rs +++ b/core/lib/zksync_core/src/metadata_calculator/updater.rs @@ -286,43 +286,45 @@ impl TreeUpdater { .blocks_dal() .get_last_l1_batch_number_with_metadata() .await?; - let Some(last_l1_batch_with_metadata) = last_l1_batch_with_metadata else { - todo!() - }; drop(storage); tracing::info!( "Initialized metadata calculator with {max_batches_per_iter} max L1 batches per iteration. \ Next L1 batch for Merkle tree: {next_l1_batch_to_seal}, current Postgres L1 batch: {current_db_batch}, \ - last L1 batch with metadata: {last_l1_batch_with_metadata}", + last L1 batch with metadata: {last_l1_batch_with_metadata:?}", max_batches_per_iter = self.max_l1_batches_per_iter ); - let backup_lag = - (last_l1_batch_with_metadata.0 + 1).saturating_sub(next_l1_batch_to_seal.0); - METRICS.backup_lag.set(backup_lag.into()); - let tree_info = tree.reader().info().await; health_updater.update(tree_info.into()); - if next_l1_batch_to_seal > last_l1_batch_with_metadata + 1 { - // Check stop signal before proceeding with a potentially time-consuming operation. - if *stop_receiver.borrow_and_update() { - tracing::info!("Stop signal received, metadata_calculator is shutting down"); - return Ok(()); - } + // It may be the case that we don't have any L1 batches with metadata in Postgres, e.g. after + // recovering from a snapshot. We cannot wait for such a batch to appear (*this* is the component + // responsible for their appearance!), but fortunately most of the updater doesn't depend on it. + if let Some(last_l1_batch_with_metadata) = last_l1_batch_with_metadata { + let backup_lag = + (last_l1_batch_with_metadata.0 + 1).saturating_sub(next_l1_batch_to_seal.0); + METRICS.backup_lag.set(backup_lag.into()); + + if next_l1_batch_to_seal > last_l1_batch_with_metadata + 1 { + // Check stop signal before proceeding with a potentially time-consuming operation. + if *stop_receiver.borrow_and_update() { + tracing::info!("Stop signal received, metadata_calculator is shutting down"); + return Ok(()); + } - tracing::warn!( - "Next L1 batch of the tree ({next_l1_batch_to_seal}) is greater than last L1 batch with metadata in Postgres \ - ({last_l1_batch_with_metadata}); this may be a result of restoring Postgres from a snapshot. \ - Truncating Merkle tree versions so that this mismatch is fixed..." 
- ); - tree.revert_logs(last_l1_batch_with_metadata); - tree.save().await; - next_l1_batch_to_seal = tree.next_l1_batch_number(); - tracing::info!("Truncated Merkle tree to L1 batch #{next_l1_batch_to_seal}"); + tracing::warn!( + "Next L1 batch of the tree ({next_l1_batch_to_seal}) is greater than last L1 batch with metadata in Postgres \ + ({last_l1_batch_with_metadata}); this may be a result of restoring Postgres from a snapshot. \ + Truncating Merkle tree versions so that this mismatch is fixed..." + ); + tree.revert_logs(last_l1_batch_with_metadata); + tree.save().await; + next_l1_batch_to_seal = tree.next_l1_batch_number(); + tracing::info!("Truncated Merkle tree to L1 batch #{next_l1_batch_to_seal}"); - let tree_info = tree.reader().info().await; - health_updater.update(tree_info.into()); + let tree_info = tree.reader().info().await; + health_updater.update(tree_info.into()); + } } loop { From 050ee9d0e416a39fc40d3f97b81adfbc605bd3bc Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Wed, 20 Dec 2023 10:06:21 +0200 Subject: [PATCH 09/13] Add DB query for earliest L1 batch number --- core/lib/dal/sqlx-data.json | 18 ++++++++++++++++++ core/lib/dal/src/blocks_dal.rs | 20 ++++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/core/lib/dal/sqlx-data.json b/core/lib/dal/sqlx-data.json index c45742c1b1cc..36a846666720 100644 --- a/core/lib/dal/sqlx-data.json +++ b/core/lib/dal/sqlx-data.json @@ -3966,6 +3966,24 @@ }, "query": "\n SELECT\n eth_txs_history.id,\n eth_txs_history.eth_tx_id,\n eth_txs_history.tx_hash,\n eth_txs_history.base_fee_per_gas,\n eth_txs_history.priority_fee_per_gas,\n eth_txs_history.signed_raw_tx,\n eth_txs.nonce\n FROM\n eth_txs_history\n JOIN eth_txs ON eth_txs.id = eth_txs_history.eth_tx_id\n WHERE\n eth_txs_history.sent_at_block IS NULL\n AND eth_txs.confirmed_eth_tx_history_id IS NULL\n ORDER BY\n eth_txs_history.id DESC\n " }, + "43c7e352d09f69de1a182196aea4de79b67833f17d252b5b0e8e00cd6e75b5c1": { + "describe": { + "columns": [ + { + "name": "number", + "ordinal": 0, + "type_info": "Int8" + } + ], + "nullable": [ + null + ], + "parameters": { + "Left": [] + } + }, + "query": "\n SELECT\n MIN(number) AS \"number\"\n FROM\n l1_batches\n " + }, "45b5825c82d33c9494ceef0fdc77675b89128d56559b8c89465844a914f5245e": { "describe": { "columns": [ diff --git a/core/lib/dal/src/blocks_dal.rs b/core/lib/dal/src/blocks_dal.rs index 8b007211ed2d..322219deaace 100644 --- a/core/lib/dal/src/blocks_dal.rs +++ b/core/lib/dal/src/blocks_dal.rs @@ -82,6 +82,24 @@ impl BlocksDal<'_, '_> { Ok(MiniblockNumber(number as u32)) } + /// Returns the number of the earliest L1 batch present in the DB, or `None` if there are no L1 batches. + pub async fn get_earliest_l1_batch_number(&mut self) -> sqlx::Result> { + let row = sqlx::query!( + r#" + SELECT + MIN(number) AS "number" + FROM + l1_batches + "# + ) + .instrument("get_earliest_l1_batch_number") + .report_latency() + .fetch_one(self.storage.conn()) + .await?; + + Ok(row.number.map(|num| L1BatchNumber(num as u32))) + } + pub async fn get_last_l1_batch_number_with_metadata( &mut self, ) -> sqlx::Result> { @@ -103,6 +121,8 @@ impl BlocksDal<'_, '_> { Ok(row.number.map(|num| L1BatchNumber(num as u32))) } + /// Returns the number of the earliest L1 batch with metadata (= state hash) present in the DB, + /// or `None` if there are no such L1 batches. 
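+    ///
+    /// Like `get_earliest_l1_batch_number` above, this is chiefly relevant for nodes recovered
+    /// from a snapshot, in which case L1 batch numbering does not start from zero.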
pub async fn get_earliest_l1_batch_number_with_metadata( &mut self, ) -> sqlx::Result> { From 25f9c232fd04713cfd1ff48e4fb2ddf1703e7f78 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Wed, 20 Dec 2023 10:07:26 +0200 Subject: [PATCH 10/13] Rework L1 batch waiting utils --- .../src/metadata_calculator/helpers.rs | 4 + .../src/metadata_calculator/updater.rs | 14 +++- .../lib/zksync_core/src/reorg_detector/mod.rs | 11 ++- core/lib/zksync_core/src/utils.rs | 83 ++++++++++++++++++- 4 files changed, 102 insertions(+), 10 deletions(-) diff --git a/core/lib/zksync_core/src/metadata_calculator/helpers.rs b/core/lib/zksync_core/src/metadata_calculator/helpers.rs index 5c3e672312e2..e0471cb238e9 100644 --- a/core/lib/zksync_core/src/metadata_calculator/helpers.rs +++ b/core/lib/zksync_core/src/metadata_calculator/helpers.rs @@ -333,6 +333,10 @@ impl Delayer { } } + pub fn delay_interval(&self) -> Duration { + self.delay_interval + } + #[cfg_attr(not(test), allow(unused))] // `tree` is only used in test mode pub fn wait(&self, tree: &AsyncTree) -> impl Future { #[cfg(test)] diff --git a/core/lib/zksync_core/src/metadata_calculator/updater.rs b/core/lib/zksync_core/src/metadata_calculator/updater.rs index 7af73a13d610..575ba99bb284 100644 --- a/core/lib/zksync_core/src/metadata_calculator/updater.rs +++ b/core/lib/zksync_core/src/metadata_calculator/updater.rs @@ -18,6 +18,7 @@ use super::{ metrics::{TreeUpdateStage, METRICS}, MetadataCalculator, }; +use crate::utils::wait_for_l1_batch; #[derive(Debug)] pub(super) struct TreeUpdater { @@ -267,13 +268,22 @@ impl TreeUpdater { mut stop_receiver: watch::Receiver, health_updater: HealthUpdater, ) -> anyhow::Result<()> { - // FIXME: wait until there's an L1 batch in storage? + let Some(earliest_l1_batch) = + wait_for_l1_batch(pool, delayer.delay_interval(), &mut stop_receiver).await? 
+ else { + return Ok(()); // Stop signal received + }; let mut storage = pool.access_storage_tagged("metadata_calculator").await?; // Ensure genesis creation let tree = &mut self.tree; if tree.is_empty() { - let logs = L1BatchWithLogs::new(&mut storage, L1BatchNumber(0)) + assert_eq!( + earliest_l1_batch, + L1BatchNumber(0), + "Non-zero earliest L1 batch is not supported without previous tree recovery" + ); + let logs = L1BatchWithLogs::new(&mut storage, earliest_l1_batch) .await .context("Missing storage logs for the genesis L1 batch")?; tree.process_l1_batch(logs.storage_logs).await; diff --git a/core/lib/zksync_core/src/reorg_detector/mod.rs b/core/lib/zksync_core/src/reorg_detector/mod.rs index 73348583dbbc..78094cc1fb87 100644 --- a/core/lib/zksync_core/src/reorg_detector/mod.rs +++ b/core/lib/zksync_core/src/reorg_detector/mod.rs @@ -225,12 +225,15 @@ impl ReorgDetector { } async fn run_inner(&mut self) -> Result, HashMatchError> { - let earliest_l1_batch_number = - wait_for_l1_batch_with_metadata(&self.pool, self.sleep_interval, &self.stop_receiver) - .await?; + let earliest_l1_batch_number = wait_for_l1_batch_with_metadata( + &self.pool, + self.sleep_interval, + &mut self.stop_receiver, + ) + .await?; let Some(earliest_l1_batch_number) = earliest_l1_batch_number else { - return Ok(None); // stop signal received + return Ok(None); // Stop signal received }; tracing::debug!( "Checking root hash match for earliest L1 batch #{earliest_l1_batch_number}" diff --git a/core/lib/zksync_core/src/utils.rs b/core/lib/zksync_core/src/utils.rs index 5a35b45e4cdb..902f284a0f70 100644 --- a/core/lib/zksync_core/src/utils.rs +++ b/core/lib/zksync_core/src/utils.rs @@ -6,14 +6,43 @@ use tokio::sync::watch; use zksync_dal::ConnectionPool; use zksync_types::L1BatchNumber; -/// Repeatedly polls DB until there is an L1 batch with metadata. We may not have such a batch -/// if the DB is recovered from an (application-level) snapshot. +/// Repeatedly polls the DB until there is an L1 batch. We may not have such a batch initially +/// if the DB is recovered from an application-level snapshot. +/// +/// Returns the number of rhe *earliest* L1 batch, or `None` if the stop signal is received. +pub(crate) async fn wait_for_l1_batch( + pool: &ConnectionPool, + poll_interval: Duration, + stop_receiver: &mut watch::Receiver, +) -> anyhow::Result> { + loop { + if *stop_receiver.borrow() { + return Ok(None); + } + + let mut storage = pool.access_storage().await?; + let sealed_l1_batch_number = storage.blocks_dal().get_earliest_l1_batch_number().await?; + drop(storage); + + if let Some(number) = sealed_l1_batch_number { + return Ok(Some(number)); + } + // We don't check the result: if a stop signal is received, we'll return at the start + // of the next iteration. + tokio::time::timeout(poll_interval, stop_receiver.changed()) + .await + .ok(); + } +} + +/// Repeatedly polls the DB until there is an L1 batch with metadata. We may not have such a batch initially +/// if the DB is recovered from an application-level snapshot. /// /// Returns the number of rhe *earliest* L1 batch with metadata, or `None` if the stop signal is received. 
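/// This is the metadata-aware counterpart of `wait_for_l1_batch`: the reorg detector calls it
/// because root hash comparisons require L1 batch metadata (i.e., a computed state hash).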
pub(crate) async fn wait_for_l1_batch_with_metadata( pool: &ConnectionPool, poll_interval: Duration, - stop_receiver: &watch::Receiver, + stop_receiver: &mut watch::Receiver, ) -> anyhow::Result> { loop { if *stop_receiver.borrow() { @@ -30,6 +59,52 @@ pub(crate) async fn wait_for_l1_batch_with_metadata( if let Some(number) = sealed_l1_batch_number { return Ok(Some(number)); } - tokio::time::sleep(poll_interval).await; + tokio::time::timeout(poll_interval, stop_receiver.changed()) + .await + .ok(); + } +} + +#[cfg(test)] +mod tests { + use zksync_types::L2ChainId; + + use super::*; + use crate::genesis::{ensure_genesis_state, GenesisParams}; + + #[tokio::test] + async fn waiting_for_l1_batch_success() { + let pool = ConnectionPool::test_pool().await; + let (_stop_sender, mut stop_receiver) = watch::channel(false); + + let pool_copy = pool.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(25)).await; + let mut storage = pool_copy.access_storage().await.unwrap(); + ensure_genesis_state(&mut storage, L2ChainId::default(), &GenesisParams::mock()) + .await + .unwrap(); + }); + + let l1_batch = wait_for_l1_batch(&pool, Duration::from_millis(10), &mut stop_receiver) + .await + .unwrap(); + assert_eq!(l1_batch, Some(L1BatchNumber(0))); + } + + #[tokio::test] + async fn waiting_for_l1_batch_cancellation() { + let pool = ConnectionPool::test_pool().await; + let (stop_sender, mut stop_receiver) = watch::channel(false); + + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(25)).await; + stop_sender.send_replace(true); + }); + + let l1_batch = wait_for_l1_batch(&pool, Duration::from_secs(30), &mut stop_receiver) + .await + .unwrap(); + assert_eq!(l1_batch, None); } } From 8598bbef2527dc262ef6b397a3e17991675bd31f Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Wed, 20 Dec 2023 10:19:35 +0200 Subject: [PATCH 11/13] Reference Linear issue in TODOs --- core/lib/zksync_core/src/metadata_calculator/recovery.rs | 2 +- core/lib/zksync_core/src/metadata_calculator/tests.rs | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/core/lib/zksync_core/src/metadata_calculator/recovery.rs b/core/lib/zksync_core/src/metadata_calculator/recovery.rs index af494ab15d3e..52c254a1dbee 100644 --- a/core/lib/zksync_core/src/metadata_calculator/recovery.rs +++ b/core/lib/zksync_core/src/metadata_calculator/recovery.rs @@ -422,7 +422,7 @@ impl AsyncTreeRecovery { } async fn snapshot_l1_batch(_pool: &ConnectionPool) -> anyhow::Result> { - Ok(None) // FIXME: implement real logic + Ok(None) // FIXME (PLA-708): implement real logic } #[cfg(test)] diff --git a/core/lib/zksync_core/src/metadata_calculator/tests.rs b/core/lib/zksync_core/src/metadata_calculator/tests.rs index 8208c42090fa..68f70097af52 100644 --- a/core/lib/zksync_core/src/metadata_calculator/tests.rs +++ b/core/lib/zksync_core/src/metadata_calculator/tests.rs @@ -1,3 +1,7 @@ +//! Tests for the metadata calculator component lifecycle. 
+ +// TODO (PLA-708): test full recovery lifecycle + use std::{future::Future, ops, panic, path::Path, time::Duration}; use assert_matches::assert_matches; From 6a48bea229484094ca3a42de0667c64cd81a7d17 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Wed, 20 Dec 2023 10:24:58 +0200 Subject: [PATCH 12/13] Fix spelling --- core/lib/zksync_core/src/metadata_calculator/tests.rs | 4 ++-- core/lib/zksync_core/src/utils.rs | 4 ++-- spellcheck/era.dic | 2 ++ 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/core/lib/zksync_core/src/metadata_calculator/tests.rs b/core/lib/zksync_core/src/metadata_calculator/tests.rs index 68f70097af52..9aad3b3b93e6 100644 --- a/core/lib/zksync_core/src/metadata_calculator/tests.rs +++ b/core/lib/zksync_core/src/metadata_calculator/tests.rs @@ -1,6 +1,6 @@ -//! Tests for the metadata calculator component lifecycle. +//! Tests for the metadata calculator component life cycle. -// TODO (PLA-708): test full recovery lifecycle +// TODO (PLA-708): test full recovery life cycle use std::{future::Future, ops, panic, path::Path, time::Duration}; diff --git a/core/lib/zksync_core/src/utils.rs b/core/lib/zksync_core/src/utils.rs index 902f284a0f70..3acbc2dfb5bd 100644 --- a/core/lib/zksync_core/src/utils.rs +++ b/core/lib/zksync_core/src/utils.rs @@ -9,7 +9,7 @@ use zksync_types::L1BatchNumber; /// Repeatedly polls the DB until there is an L1 batch. We may not have such a batch initially /// if the DB is recovered from an application-level snapshot. /// -/// Returns the number of rhe *earliest* L1 batch, or `None` if the stop signal is received. +/// Returns the number of the *earliest* L1 batch, or `None` if the stop signal is received. pub(crate) async fn wait_for_l1_batch( pool: &ConnectionPool, poll_interval: Duration, @@ -38,7 +38,7 @@ pub(crate) async fn wait_for_l1_batch( /// Repeatedly polls the DB until there is an L1 batch with metadata. We may not have such a batch initially /// if the DB is recovered from an application-level snapshot. /// -/// Returns the number of rhe *earliest* L1 batch with metadata, or `None` if the stop signal is received. +/// Returns the number of the *earliest* L1 batch with metadata, or `None` if the stop signal is received. pub(crate) async fn wait_for_l1_batch_with_metadata( pool: &ConnectionPool, poll_interval: Duration, diff --git a/spellcheck/era.dic b/spellcheck/era.dic index e56162fcf02b..a054a5930270 100644 --- a/spellcheck/era.dic +++ b/spellcheck/era.dic @@ -157,6 +157,8 @@ namespaces StateDiffRecord BYTES_PER_ENUMERATION_INDEX derived_key +prefill +reorg // zkSync-related words matterlabs From 82aaac4adc2bbebc042c8b85714ba677c7943dc2 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Wed, 20 Dec 2023 17:18:53 +0200 Subject: [PATCH 13/13] Fix various nits --- .../zksync_core/src/reorg_detector/tests.rs | 51 ++++++++++--------- core/lib/zksync_core/src/utils.rs | 5 ++ 2 files changed, 31 insertions(+), 25 deletions(-) diff --git a/core/lib/zksync_core/src/reorg_detector/tests.rs b/core/lib/zksync_core/src/reorg_detector/tests.rs index 6b9888881f87..6f09c0e9fee7 100644 --- a/core/lib/zksync_core/src/reorg_detector/tests.rs +++ b/core/lib/zksync_core/src/reorg_detector/tests.rs @@ -66,7 +66,7 @@ async fn seal_l1_batch(storage: &mut StorageProcessor<'_>, number: u32, hash: H2 /// Tests the binary search algorithm. 
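/// `binary_search_with(left, right, f)` is assumed to return the largest number in the
/// searched range for which the async predicate `f` holds; this is how the detector
/// localizes the last non-diverged L1 batch.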
#[tokio::test] -async fn test_binary_search() { +async fn binary_search_with_simple_predicate() { for divergence_point in [1, 50, 51, 100] { let mut f = |x| async move { Ok::<_, ()>(x < divergence_point) }; let result = binary_search_with(0, 100, &mut f).await; @@ -93,8 +93,8 @@ impl From for RpcError { #[derive(Debug, Default)] struct MockMainNodeClient { - miniblock_responses: ResponsesMap, - l1_batch_responses: ResponsesMap, + miniblock_hash_responses: ResponsesMap, + l1_batch_root_hash_responses: ResponsesMap, error_kind: Arc>>, } @@ -105,7 +105,7 @@ impl MainNodeClient for MockMainNodeClient { return Err(error_kind.into()); } - if let Some(response) = self.miniblock_responses.get(&number) { + if let Some(response) = self.miniblock_hash_responses.get(&number) { Ok(Some(*response)) } else { Ok(None) @@ -117,7 +117,7 @@ impl MainNodeClient for MockMainNodeClient { return Err(error_kind.into()); } - if let Some(response) = self.l1_batch_responses.get(&number) { + if let Some(response) = self.l1_batch_root_hash_responses.get(&number) { Ok(Some(*response)) } else { Ok(None) @@ -163,11 +163,11 @@ async fn normal_reorg_function(snapshot_recovery: bool, with_transient_errors: b let miniblock_and_l1_batch_hashes = l1_batch_numbers.map(|number| { let miniblock_hash = H256::from_low_u64_be(number.into()); client - .miniblock_responses + .miniblock_hash_responses .insert(MiniblockNumber(number), miniblock_hash); let l1_batch_hash = H256::repeat_byte(number as u8); client - .l1_batch_responses + .l1_batch_root_hash_responses .insert(L1BatchNumber(number), l1_batch_hash); (number, miniblock_hash, l1_batch_hash) }); @@ -217,7 +217,7 @@ async fn normal_reorg_function(snapshot_recovery: bool, with_transient_errors: b } #[tokio::test] -async fn handling_fatal_rpc_error() { +async fn detector_stops_on_fatal_rpc_error() { let pool = ConnectionPool::test_pool().await; let mut storage = pool.access_storage().await.unwrap(); ensure_genesis_state(&mut storage, L2ChainId::default(), &GenesisParams::mock()) @@ -235,11 +235,12 @@ async fn handling_fatal_rpc_error() { stop_receiver, sleep_interval: Duration::from_millis(10), }; + // Check that the detector stops when a fatal RPC error is encountered. 
detector.run().await.unwrap_err(); } #[tokio::test] -async fn detecting_reorg_by_batch_hash() { +async fn reorg_is_detected_on_batch_hash_mismatch() { let pool = ConnectionPool::test_pool().await; let mut storage = pool.access_storage().await.unwrap(); ensure_genesis_state(&mut storage, L2ChainId::default(), &GenesisParams::mock()) @@ -250,16 +251,16 @@ async fn detecting_reorg_by_batch_hash() { let mut client = MockMainNodeClient::default(); let miniblock_hash = H256::from_low_u64_be(23); client - .miniblock_responses + .miniblock_hash_responses .insert(MiniblockNumber(1), miniblock_hash); client - .l1_batch_responses + .l1_batch_root_hash_responses .insert(L1BatchNumber(1), H256::repeat_byte(1)); client - .miniblock_responses + .miniblock_hash_responses .insert(MiniblockNumber(2), miniblock_hash); client - .l1_batch_responses + .l1_batch_root_hash_responses .insert(L1BatchNumber(2), H256::repeat_byte(2)); let detector = ReorgDetector { @@ -283,7 +284,7 @@ async fn detecting_reorg_by_batch_hash() { } #[tokio::test] -async fn detecting_reorg_by_miniblock_hash() { +async fn reorg_is_detected_on_miniblock_hash_mismatch() { let pool = ConnectionPool::test_pool().await; let mut storage = pool.access_storage().await.unwrap(); ensure_genesis_state(&mut storage, L2ChainId::default(), &GenesisParams::mock()) @@ -294,16 +295,16 @@ async fn detecting_reorg_by_miniblock_hash() { let mut client = MockMainNodeClient::default(); let miniblock_hash = H256::from_low_u64_be(23); client - .miniblock_responses + .miniblock_hash_responses .insert(MiniblockNumber(1), miniblock_hash); client - .l1_batch_responses + .l1_batch_root_hash_responses .insert(L1BatchNumber(1), H256::repeat_byte(1)); client - .miniblock_responses + .miniblock_hash_responses .insert(MiniblockNumber(2), miniblock_hash); client - .miniblock_responses + .miniblock_hash_responses .insert(MiniblockNumber(3), miniblock_hash); let detector = ReorgDetector { @@ -341,7 +342,7 @@ impl StorageUpdateStrategy { #[test_casing(16, Product(([false, true], [2, 3, 5, 8], StorageUpdateStrategy::ALL)))] #[tokio::test] -async fn detecting_deep_reorg( +async fn reorg_is_detected_on_historic_batch_hash_mismatch( snapshot_recovery: bool, last_correct_batch: u32, storage_update_strategy: StorageUpdateStrategy, @@ -367,11 +368,11 @@ async fn detecting_deep_reorg( let miniblock_and_l1_batch_hashes = l1_batch_numbers.clone().map(|number| { let mut miniblock_hash = H256::from_low_u64_be(number.into()); client - .miniblock_responses + .miniblock_hash_responses .insert(MiniblockNumber(number), miniblock_hash); let mut l1_batch_hash = H256::repeat_byte(number as u8); client - .l1_batch_responses + .l1_batch_root_hash_responses .insert(L1BatchNumber(number), l1_batch_hash); if number > last_correct_batch { @@ -448,7 +449,7 @@ async fn stopping_reorg_detector_while_waiting_for_l1_batch() { } #[tokio::test] -async fn earliest_batch_hash_mismatch() { +async fn detector_errors_on_earliest_batch_hash_mismatch() { let pool = ConnectionPool::test_pool().await; let mut storage = pool.access_storage().await.unwrap(); let genesis_root_hash = @@ -459,7 +460,7 @@ async fn earliest_batch_hash_mismatch() { let mut client = MockMainNodeClient::default(); client - .l1_batch_responses + .l1_batch_root_hash_responses .insert(L1BatchNumber(0), H256::zero()); let (_stop_sender, stop_receiver) = watch::channel(false); @@ -476,11 +477,11 @@ async fn earliest_batch_hash_mismatch() { } #[tokio::test] -async fn earliest_batch_hash_mismatch_with_snapshot_recovery() { +async fn 
detector_errors_on_earliest_batch_hash_mismatch_with_snapshot_recovery() { let pool = ConnectionPool::test_pool().await; let mut client = MockMainNodeClient::default(); client - .l1_batch_responses + .l1_batch_root_hash_responses .insert(L1BatchNumber(3), H256::zero()); let (_stop_sender, stop_receiver) = watch::channel(false); diff --git a/core/lib/zksync_core/src/utils.rs b/core/lib/zksync_core/src/utils.rs index 3acbc2dfb5bd..351b22350a38 100644 --- a/core/lib/zksync_core/src/utils.rs +++ b/core/lib/zksync_core/src/utils.rs @@ -27,6 +27,8 @@ pub(crate) async fn wait_for_l1_batch( if let Some(number) = sealed_l1_batch_number { return Ok(Some(number)); } + tracing::debug!("No L1 batches are present in DB; trying again in {poll_interval:?}"); + // We don't check the result: if a stop signal is received, we'll return at the start // of the next iteration. tokio::time::timeout(poll_interval, stop_receiver.changed()) @@ -59,6 +61,9 @@ pub(crate) async fn wait_for_l1_batch_with_metadata( if let Some(number) = sealed_l1_batch_number { return Ok(Some(number)); } + tracing::debug!( + "No L1 batches with metadata are present in DB; trying again in {poll_interval:?}" + ); tokio::time::timeout(poll_interval, stop_receiver.changed()) .await .ok();
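
Both waiters above share one interruptible-sleep idiom worth spelling out: instead of a plain
`tokio::time::sleep`, they race the poll interval against `stop_receiver.changed()` via
`tokio::time::timeout`, so a stop signal cuts the sleep short rather than stalling shutdown for up
to a full `poll_interval`. The standalone sketch below isolates that pattern; the helper name
`poll_with_stop`, the closure-based predicate, and the demo `main` are illustrative and not part of
this patch series.

use std::time::Duration;

use tokio::sync::watch;

/// Illustrative helper: polls `check` until it yields `Some(_)` or the stop flag flips to `true`.
async fn poll_with_stop<T>(
    mut check: impl FnMut() -> Option<T>,
    poll_interval: Duration,
    stop_receiver: &mut watch::Receiver<bool>,
) -> Option<T> {
    loop {
        if *stop_receiver.borrow() {
            return None; // Stop signal received
        }
        if let Some(value) = check() {
            return Some(value);
        }
        // Sleep for `poll_interval`, but wake up early if the stop flag changes. The result is
        // deliberately ignored: a timeout and a signal are handled identically by re-checking
        // the flag at the top of the loop.
        tokio::time::timeout(poll_interval, stop_receiver.changed())
            .await
            .ok();
    }
}

#[tokio::main]
async fn main() {
    let (stop_sender, mut stop_receiver) = watch::channel(false);
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(25)).await;
        stop_sender.send_replace(true);
    });
    // `check` never succeeds here, so the loop can only exit via the stop signal.
    let result: Option<()> =
        poll_with_stop(|| None, Duration::from_secs(30), &mut stop_receiver).await;
    assert_eq!(result, None);
}

Like the `waiting_for_l1_batch_cancellation` test above, the demo uses a 30-second poll interval
yet unblocks within milliseconds of the stop signal, which is exactly the responsiveness the
`timeout`-based sleep buys.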