From 20da5668a42a11cc0ea07f9d1a5d5c39e32ce3b4 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Wed, 12 Jun 2024 09:52:03 +0300 Subject: [PATCH] fix(en): Fix reorg detection in presence of tree data fetcher (#2197) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Fixes reorg detection logic so that it accounts for the tree data fetcher: - **In tree data fetcher:** Tries to detect reorgs, so that root hashes are not written for diverging L1 batches. - **In reorg detector:** Checks last L2 block correspondence during binary searching a diverging L1 batch. ## Why ❔ Reorg detection may be broken if tree data fetcher is enabled: - The tree data fetcher doesn't check that fetched L1 batch root hashes correspond to local L1 batches, i.e. it can fetch a root hash after a revert. - Hence, the logic in reorg detector which binary-searches the diverged L1 batch is broken because the latest L1 batch isn't guaranteed to diverge if there's a divergence. ## Checklist - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [x] Tests for the changes have been added / updated. - [x] Documentation comments have been added / updated. - [x] Code has been formatted via `zk fmt` and `zk lint`. - [x] Spellcheck has been run via `zk spellcheck`. --- .../src/tree_data_fetcher/metrics.rs | 2 + .../node_sync/src/tree_data_fetcher/mod.rs | 60 +++- .../src/tree_data_fetcher/provider/mod.rs | 79 +++-- .../src/tree_data_fetcher/provider/tests.rs | 296 +++++++++++++----- .../node_sync/src/tree_data_fetcher/tests.rs | 21 +- core/node/reorg_detector/src/lib.rs | 73 +++-- core/node/reorg_detector/src/tests.rs | 45 +++ 7 files changed, 434 insertions(+), 142 deletions(-) diff --git a/core/node/node_sync/src/tree_data_fetcher/metrics.rs b/core/node/node_sync/src/tree_data_fetcher/metrics.rs index f0fb342b69b1..37c81cd2d40a 100644 --- a/core/node/node_sync/src/tree_data_fetcher/metrics.rs +++ b/core/node/node_sync/src/tree_data_fetcher/metrics.rs @@ -40,6 +40,7 @@ pub(super) enum StepOutcomeLabel { UpdatedBatch, NoProgress, RemoteHashMissing, + PossibleReorg, TransientError, } @@ -91,6 +92,7 @@ impl TreeDataFetcherMetrics { } Ok(StepOutcome::NoProgress) => StepOutcomeLabel::NoProgress, Ok(StepOutcome::RemoteHashMissing) => StepOutcomeLabel::RemoteHashMissing, + Ok(StepOutcome::PossibleReorg) => StepOutcomeLabel::PossibleReorg, Err(err) if err.is_transient() => StepOutcomeLabel::TransientError, Err(_) => return, // fatal error; the node will exit soon anyway }; diff --git a/core/node/node_sync/src/tree_data_fetcher/mod.rs b/core/node/node_sync/src/tree_data_fetcher/mod.rs index 912952a8d144..d155e03b5563 100644 --- a/core/node/node_sync/src/tree_data_fetcher/mod.rs +++ b/core/node/node_sync/src/tree_data_fetcher/mod.rs @@ -7,9 +7,12 @@ use serde::Serialize; #[cfg(test)] use tokio::sync::mpsc; use tokio::sync::watch; -use zksync_dal::{ConnectionPool, Core, CoreDal, DalError}; +use zksync_dal::{Connection, ConnectionPool, Core, CoreDal, DalError}; use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck}; -use zksync_types::{block::L1BatchTreeData, Address, L1BatchNumber}; +use zksync_types::{ + block::{L1BatchTreeData, L2BlockHeader}, + Address, L1BatchNumber, +}; use zksync_web3_decl::{ client::{DynClient, L1, L2}, error::EnrichedClientError, @@ -77,6 +80,7 @@ enum StepOutcome { UpdatedBatch(L1BatchNumber), NoProgress, RemoteHashMissing, + PossibleReorg, } /// Component fetching tree data (i.e., state root hashes for 
L1 batches) from external sources, such as @@ -133,7 +137,6 @@ impl TreeDataFetcher { ); let l1_provider = L1DataProvider::new( - self.pool.clone(), eth_client.for_component("tree_data_fetcher"), diamond_proxy_address, )?; @@ -147,7 +150,7 @@ impl TreeDataFetcher { self.health_updater.subscribe() } - async fn get_batch_to_fetch(&self) -> anyhow::Result> { + async fn get_batch_to_fetch(&self) -> anyhow::Result> { let mut storage = self.pool.connection_tagged("tree_data_fetcher").await?; // Fetch data in a readonly transaction to have a consistent view of the storage let mut storage = storage.start_transaction().await?; @@ -172,20 +175,41 @@ impl TreeDataFetcher { earliest_l1_batch }; Ok(if l1_batch_to_fetch <= last_l1_batch { - Some(l1_batch_to_fetch) + let last_l2_block = Self::get_last_l2_block(&mut storage, l1_batch_to_fetch).await?; + Some((l1_batch_to_fetch, last_l2_block)) } else { None }) } + async fn get_last_l2_block( + storage: &mut Connection<'_, Core>, + number: L1BatchNumber, + ) -> anyhow::Result { + let (_, last_l2_block_number) = storage + .blocks_dal() + .get_l2_block_range_of_l1_batch(number) + .await? + .with_context(|| format!("L1 batch #{number} disappeared from Postgres"))?; + storage + .blocks_dal() + .get_l2_block_header(last_l2_block_number) + .await? + .with_context(|| format!("L2 block #{last_l2_block_number} (last for L1 batch #{number}) disappeared from Postgres")) + } + async fn step(&mut self) -> Result { - let Some(l1_batch_to_fetch) = self.get_batch_to_fetch().await? else { + let Some((l1_batch_to_fetch, last_l2_block_header)) = self.get_batch_to_fetch().await? + else { return Ok(StepOutcome::NoProgress); }; - tracing::debug!("Fetching tree data for L1 batch #{l1_batch_to_fetch} from main node"); + tracing::debug!("Fetching tree data for L1 batch #{l1_batch_to_fetch}"); let stage_latency = self.metrics.stage_latency[&ProcessingStage::Fetch].start(); - let root_hash_result = self.data_provider.batch_details(l1_batch_to_fetch).await?; + let root_hash_result = self + .data_provider + .batch_details(l1_batch_to_fetch, &last_l2_block_header) + .await?; stage_latency.observe(); let root_hash = match root_hash_result { Ok(output) => { @@ -199,17 +223,23 @@ impl TreeDataFetcher { } Err(MissingData::Batch) => { let err = anyhow::anyhow!( - "L1 batch #{l1_batch_to_fetch} is sealed locally, but is not present on the main node, \ + "L1 batch #{l1_batch_to_fetch} is sealed locally, but is not present externally, \ which is assumed to store batch info indefinitely" ); return Err(err.into()); } Err(MissingData::RootHash) => { tracing::debug!( - "L1 batch #{l1_batch_to_fetch} does not have root hash computed on the main node" + "L1 batch #{l1_batch_to_fetch} does not have root hash computed externally" ); return Ok(StepOutcome::RemoteHashMissing); } + Err(MissingData::PossibleReorg) => { + tracing::debug!( + "L1 batch #{l1_batch_to_fetch} potentially diverges from the external source" + ); + return Ok(StepOutcome::PossibleReorg); + } }; let stage_latency = self.metrics.stage_latency[&ProcessingStage::Persistence].start(); @@ -266,6 +296,16 @@ impl TreeDataFetcher { self.update_health(last_updated_l1_batch); true } + Ok(StepOutcome::PossibleReorg) => { + tracing::info!("Potential chain reorg detected by tree data fetcher; not updating tree data"); + // Since we don't trust the reorg logic in the tree data fetcher, we let it continue working + // so that, if there's a false positive, the whole node doesn't crash (or is in a crash loop in the worst-case scenario). 
+ let health = TreeDataFetcherHealth::Affected { + error: "Potential chain reorg".to_string(), + }; + self.health_updater.update(health.into()); + true + } Err(err) if err.is_transient() => { tracing::warn!( "Transient error in tree data fetcher, will retry after a delay: {err:?}" diff --git a/core/node/node_sync/src/tree_data_fetcher/provider/mod.rs b/core/node/node_sync/src/tree_data_fetcher/provider/mod.rs index 27cd040677d6..0c9362369fe6 100644 --- a/core/node/node_sync/src/tree_data_fetcher/provider/mod.rs +++ b/core/node/node_sync/src/tree_data_fetcher/provider/mod.rs @@ -3,9 +3,8 @@ use std::fmt; use anyhow::Context; use async_trait::async_trait; use vise::{EncodeLabelSet, EncodeLabelValue}; -use zksync_dal::{ConnectionPool, Core, CoreDal}; use zksync_eth_client::EthInterface; -use zksync_types::{web3, Address, L1BatchNumber, H256, U256, U64}; +use zksync_types::{block::L2BlockHeader, web3, Address, L1BatchNumber, H256, U256, U64}; use zksync_web3_decl::{ client::{DynClient, L1, L2}, error::{ClientRpcContext, EnrichedClientError, EnrichedClientResult}, @@ -26,6 +25,8 @@ pub(super) enum MissingData { /// The provider lacks a root hash for a requested L1 batch; the batch itself is present on the provider. #[error("no root hash for L1 batch")] RootHash, + #[error("possible chain reorg detected")] + PossibleReorg, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] @@ -48,14 +49,23 @@ pub(super) type TreeDataProviderResult = #[async_trait] pub(super) trait TreeDataProvider: fmt::Debug + Send + Sync + 'static { /// Fetches a state root hash for the L1 batch with the specified number. + /// The method receives a header of the last L2 block in the batch, which can be used to check L1 batch consistency etc. /// /// It is guaranteed that this method will be called with monotonically increasing `number`s (although not necessarily sequential ones). - async fn batch_details(&mut self, number: L1BatchNumber) -> TreeDataProviderResult; + async fn batch_details( + &mut self, + number: L1BatchNumber, + last_l2_block: &L2BlockHeader, + ) -> TreeDataProviderResult; } #[async_trait] impl TreeDataProvider for Box> { - async fn batch_details(&mut self, number: L1BatchNumber) -> TreeDataProviderResult { + async fn batch_details( + &mut self, + number: L1BatchNumber, + last_l2_block: &L2BlockHeader, + ) -> TreeDataProviderResult { let Some(batch_details) = self .get_l1_batch_details(number) .rpc_context("get_l1_batch_details") @@ -64,6 +74,24 @@ impl TreeDataProvider for Box> { else { return Ok(Err(MissingData::Batch)); }; + + // Check the local data correspondence. + let remote_l2_block_hash = self + .get_block_details(last_l2_block.number) + .rpc_context("get_block_details") + .with_arg("number", &last_l2_block.number) + .await? + .and_then(|block| block.base.root_hash); + if remote_l2_block_hash != Some(last_l2_block.hash) { + let last_l2_block_number = last_l2_block.number; + let last_l2_block_hash = last_l2_block.hash; + tracing::info!( + "Fetched hash of the last L2 block #{last_l2_block_number} in L1 batch #{number} ({remote_l2_block_hash:?}) \ + does not match the local one ({last_l2_block_hash:?}); this can be caused by a chain reorg" + ); + return Ok(Err(MissingData::PossibleReorg)); + } + Ok(batch_details .base .root_hash @@ -94,7 +122,6 @@ struct PastL1BatchInfo { /// (provided it's not too far behind the seal timestamp of the batch). 
#[derive(Debug)] pub(super) struct L1DataProvider { - pool: ConnectionPool, eth_client: Box>, diamond_proxy_address: Address, block_commit_signature: H256, @@ -109,7 +136,6 @@ impl L1DataProvider { const L1_BLOCK_RANGE: U64 = U64([20_000]); pub fn new( - pool: ConnectionPool, eth_client: Box>, diamond_proxy_address: Address, ) -> anyhow::Result { @@ -118,7 +144,6 @@ impl L1DataProvider { .context("missing `BlockCommit` event")? .signature(); Ok(Self { - pool, eth_client, diamond_proxy_address, block_commit_signature, @@ -126,21 +151,6 @@ impl L1DataProvider { }) } - async fn l1_batch_seal_timestamp(&self, number: L1BatchNumber) -> anyhow::Result { - let mut storage = self.pool.connection_tagged("tree_data_fetcher").await?; - let (_, last_l2_block_number) = storage - .blocks_dal() - .get_l2_block_range_of_l1_batch(number) - .await? - .with_context(|| format!("L1 batch #{number} does not have L2 blocks"))?; - let block_header = storage - .blocks_dal() - .get_l2_block_header(last_l2_block_number) - .await? - .with_context(|| format!("L2 block #{last_l2_block_number} (last block in L1 batch #{number}) disappeared"))?; - Ok(block_header.timestamp) - } - /// Guesses the number of an L1 block with a `BlockCommit` event for the specified L1 batch. /// The guess is based on the L1 batch seal timestamp. async fn guess_l1_commit_block_number( @@ -206,8 +216,12 @@ impl L1DataProvider { #[async_trait] impl TreeDataProvider for L1DataProvider { - async fn batch_details(&mut self, number: L1BatchNumber) -> TreeDataProviderResult { - let l1_batch_seal_timestamp = self.l1_batch_seal_timestamp(number).await?; + async fn batch_details( + &mut self, + number: L1BatchNumber, + last_l2_block: &L2BlockHeader, + ) -> TreeDataProviderResult { + let l1_batch_seal_timestamp = last_l2_block.timestamp; let from_block = self.past_l1_batch.and_then(|info| { assert!( info.number < number, @@ -297,8 +311,11 @@ impl TreeDataProvider for L1DataProvider { })) } _ => { - tracing::warn!("Non-unique `BlockCommit` event for L1 batch #{number} queried using {filter:?}: {logs:?}"); - Ok(Err(MissingData::RootHash)) + tracing::warn!( + "Non-unique `BlockCommit` event for L1 batch #{number} queried using {filter:?}, potentially as a result \ + of a chain reorg: {logs:?}" + ); + Ok(Err(MissingData::PossibleReorg)) } } } @@ -313,9 +330,13 @@ pub(super) struct CombinedDataProvider { #[async_trait] impl TreeDataProvider for CombinedDataProvider { - async fn batch_details(&mut self, number: L1BatchNumber) -> TreeDataProviderResult { + async fn batch_details( + &mut self, + number: L1BatchNumber, + last_l2_block: &L2BlockHeader, + ) -> TreeDataProviderResult { if let Some(l1) = &mut self.l1 { - match l1.batch_details(number).await { + match l1.batch_details(number, last_l2_block).await { Err(err) => { if err.is_transient() { tracing::info!( @@ -342,6 +363,6 @@ impl TreeDataProvider for CombinedDataProvider { } } } - self.fallback.batch_details(number).await + self.fallback.batch_details(number, last_l2_block).await } } diff --git a/core/node/node_sync/src/tree_data_fetcher/provider/tests.rs b/core/node/node_sync/src/tree_data_fetcher/provider/tests.rs index 90b912b8816a..bb252e09caad 100644 --- a/core/node/node_sync/src/tree_data_fetcher/provider/tests.rs +++ b/core/node/node_sync/src/tree_data_fetcher/provider/tests.rs @@ -3,11 +3,16 @@ use assert_matches::assert_matches; use once_cell::sync::Lazy; use test_casing::test_casing; +use zksync_dal::{ConnectionPool, Core}; use zksync_node_genesis::{insert_genesis_batch, GenesisParams}; +use 
zksync_node_test_utils::create_l2_block; +use zksync_types::{api, L2BlockNumber, ProtocolVersionId}; use zksync_web3_decl::client::MockClient; use super::*; -use crate::tree_data_fetcher::tests::{seal_l1_batch_with_timestamp, MockMainNodeClient}; +use crate::tree_data_fetcher::tests::{ + get_last_l2_block, seal_l1_batch_with_timestamp, MockMainNodeClient, +}; const DIAMOND_PROXY_ADDRESS: Address = Address::repeat_byte(0x22); @@ -18,6 +23,100 @@ static BLOCK_COMMIT_SIGNATURE: Lazy = Lazy::new(|| { .signature() }); +fn mock_block_details_base(number: u32, hash: Option) -> api::BlockDetailsBase { + api::BlockDetailsBase { + timestamp: number.into(), + root_hash: hash, + // The fields below are not read. + l1_tx_count: 0, + l2_tx_count: 1, + status: api::BlockStatus::Sealed, + commit_tx_hash: None, + committed_at: None, + prove_tx_hash: None, + proven_at: None, + execute_tx_hash: None, + executed_at: None, + l1_gas_price: 10, + l2_fair_gas_price: 100, + base_system_contracts_hashes: Default::default(), + } +} + +#[derive(Debug)] +struct L2Parameters { + l2_block_hashes: Vec, + l1_batch_root_hashes: Vec, +} + +impl L2Parameters { + fn mock_client(self) -> MockClient { + let block_number = U64::from(self.l2_block_hashes.len()); + + MockClient::builder(L2::default()) + .method("eth_blockNumber", move || Ok(block_number)) + .method("zks_getL1BatchDetails", move |number: L1BatchNumber| { + let root_hash = self.l1_batch_root_hashes.get(number.0 as usize); + Ok(root_hash.map(|&hash| api::L1BatchDetails { + number, + base: mock_block_details_base(number.0, Some(hash)), + })) + }) + .method("zks_getBlockDetails", move |number: L2BlockNumber| { + let hash = self.l2_block_hashes.get(number.0 as usize); + Ok(hash.map(|&hash| api::BlockDetails { + number, + l1_batch_number: L1BatchNumber(number.0), + operator_address: Address::zero(), + protocol_version: Some(ProtocolVersionId::latest()), + base: mock_block_details_base(number.0, Some(hash)), + })) + }) + .build() + } +} + +#[tokio::test] +async fn rpc_data_provider_basics() { + let last_l2_block = create_l2_block(1); + let l2_parameters = L2Parameters { + l2_block_hashes: vec![H256::zero(), last_l2_block.hash], + l1_batch_root_hashes: vec![H256::zero(), H256::from_low_u64_be(1)], + }; + let mut client: Box> = Box::new(l2_parameters.mock_client()); + + let output = client + .batch_details(L1BatchNumber(1), &last_l2_block) + .await + .unwrap() + .expect("missing block"); + assert_eq!(output.root_hash, H256::from_low_u64_be(1)); + assert_matches!(output.source, TreeDataProviderSource::BatchDetailsRpc); + + // Query a future L1 batch. 
+ let output = client + .batch_details(L1BatchNumber(2), &create_l2_block(2)) + .await + .unwrap(); + assert_matches!(output, Err(MissingData::Batch)); +} + +#[tokio::test] +async fn rpc_data_provider_with_block_hash_divergence() { + let last_l2_block = create_l2_block(1); + let l2_parameters = L2Parameters { + l2_block_hashes: vec![H256::zero(), H256::repeat_byte(1)], // Hash for block #1 differs from the local one + l1_batch_root_hashes: vec![H256::zero(), H256::from_low_u64_be(1)], + }; + let mut client: Box> = Box::new(l2_parameters.mock_client()); + + let output = client + .batch_details(L1BatchNumber(1), &last_l2_block) + .await + .unwrap(); + assert_matches!(output, Err(MissingData::PossibleReorg)); +} + struct EthereumParameters { block_number: U64, // L1 block numbers in which L1 batches are committed starting from L1 batch #1 @@ -43,40 +142,6 @@ impl EthereumParameters { self.l1_blocks_for_commits.push(l1_block_number); } - fn filter_logs(logs: &[web3::Log], filter: web3::Filter) -> Vec { - let Some(web3::BlockNumber::Number(filter_from)) = filter.from_block else { - panic!("Unexpected filter: {filter:?}"); - }; - let Some(web3::BlockNumber::Number(filter_to)) = filter.to_block else { - panic!("Unexpected filter: {filter:?}"); - }; - let filter_block_range = filter_from..=filter_to; - - let filter_addresses = filter.address.unwrap().flatten(); - let filter_topics = filter.topics.unwrap(); - let filter_topics: Vec<_> = filter_topics - .into_iter() - .map(|topic| topic.map(web3::ValueOrArray::flatten)) - .collect(); - - let filtered_logs = logs.iter().filter(|log| { - if !filter_addresses.contains(&log.address) { - return false; - } - if !filter_block_range.contains(&log.block_number.unwrap()) { - return false; - } - filter_topics - .iter() - .zip(&log.topics) - .all(|(filter_topics, actual_topic)| match filter_topics { - Some(topics) => topics.contains(actual_topic), - None => true, - }) - }); - filtered_logs.cloned().collect() - } - fn client(&self) -> MockClient { let logs = self .l1_blocks_for_commits @@ -98,36 +163,72 @@ impl EthereumParameters { } }); let logs: Vec<_> = logs.collect(); - let block_number = self.block_number; + mock_l1_client(self.block_number, logs) + } +} - MockClient::builder(L1::default()) - .method("eth_blockNumber", move || Ok(block_number)) - .method( - "eth_getBlockByNumber", - move |number: web3::BlockNumber, with_txs: bool| { - assert!(!with_txs); - - let number = match number { - web3::BlockNumber::Number(number) => number, - web3::BlockNumber::Latest => block_number, - web3::BlockNumber::Earliest => U64::zero(), - _ => panic!("Unexpected number: {number:?}"), - }; - if number > block_number { - return Ok(None); - } - Ok(Some(web3::Block:: { - number: Some(number), - timestamp: U256::from(number.as_u64()), // timestamp == number - ..web3::Block::default() - })) - }, - ) - .method("eth_getLogs", move |filter: web3::Filter| { - Ok(Self::filter_logs(&logs, filter)) +fn filter_logs(logs: &[web3::Log], filter: web3::Filter) -> Vec { + let Some(web3::BlockNumber::Number(filter_from)) = filter.from_block else { + panic!("Unexpected filter: {filter:?}"); + }; + let Some(web3::BlockNumber::Number(filter_to)) = filter.to_block else { + panic!("Unexpected filter: {filter:?}"); + }; + let filter_block_range = filter_from..=filter_to; + + let filter_addresses = filter.address.unwrap().flatten(); + let filter_topics = filter.topics.unwrap(); + let filter_topics: Vec<_> = filter_topics + .into_iter() + .map(|topic| topic.map(web3::ValueOrArray::flatten)) + 
.collect(); + + let filtered_logs = logs.iter().filter(|log| { + if !filter_addresses.contains(&log.address) { + return false; + } + if !filter_block_range.contains(&log.block_number.unwrap()) { + return false; + } + filter_topics + .iter() + .zip(&log.topics) + .all(|(filter_topics, actual_topic)| match filter_topics { + Some(topics) => topics.contains(actual_topic), + None => true, }) - .build() - } + }); + filtered_logs.cloned().collect() +} + +fn mock_l1_client(block_number: U64, logs: Vec) -> MockClient { + MockClient::builder(L1::default()) + .method("eth_blockNumber", move || Ok(block_number)) + .method( + "eth_getBlockByNumber", + move |number: web3::BlockNumber, with_txs: bool| { + assert!(!with_txs); + + let number = match number { + web3::BlockNumber::Number(number) => number, + web3::BlockNumber::Latest => block_number, + web3::BlockNumber::Earliest => U64::zero(), + _ => panic!("Unexpected number: {number:?}"), + }; + if number > block_number { + return Ok(None); + } + Ok(Some(web3::Block:: { + number: Some(number), + timestamp: U256::from(number.as_u64()), // timestamp == number + ..web3::Block::default() + })) + }, + ) + .method("eth_getLogs", move |filter: web3::Filter| { + Ok(filter_logs(&logs, filter)) + }) + .build() } #[tokio::test] @@ -163,14 +264,13 @@ async fn test_using_l1_data_provider(l1_batch_timestamps: &[u64]) { seal_l1_batch_with_timestamp(&mut storage, number, ts).await; eth_params.push_commit(ts + 1_000); // have a reasonable small diff between batch generation and commitment } - drop(storage); let mut provider = - L1DataProvider::new(pool, Box::new(eth_params.client()), DIAMOND_PROXY_ADDRESS).unwrap(); + L1DataProvider::new(Box::new(eth_params.client()), DIAMOND_PROXY_ADDRESS).unwrap(); for i in 0..l1_batch_timestamps.len() { let number = L1BatchNumber(i as u32 + 1); let output = provider - .batch_details(number) + .batch_details(number, &get_last_l2_block(&mut storage, number).await) .await .unwrap() .expect("no root hash"); @@ -198,6 +298,44 @@ async fn using_l1_data_provider(batch_spacing: u64) { test_using_l1_data_provider(&l1_batch_timestamps).await; } +#[tokio::test] +async fn detecting_reorg_in_l1_data_provider() { + let l1_batch_number = H256::from_low_u64_be(1); + // Generate two logs for the same L1 batch #1 + let logs = vec![ + web3::Log { + address: DIAMOND_PROXY_ADDRESS, + topics: vec![ + *BLOCK_COMMIT_SIGNATURE, + l1_batch_number, + H256::repeat_byte(1), + H256::zero(), // commitment hash; not used + ], + block_number: Some(1.into()), + ..web3::Log::default() + }, + web3::Log { + address: DIAMOND_PROXY_ADDRESS, + topics: vec![ + *BLOCK_COMMIT_SIGNATURE, + l1_batch_number, + H256::repeat_byte(2), + H256::zero(), // commitment hash; not used + ], + block_number: Some(100.into()), + ..web3::Log::default() + }, + ]; + let l1_client = mock_l1_client(200.into(), logs); + + let mut provider = L1DataProvider::new(Box::new(l1_client), DIAMOND_PROXY_ADDRESS).unwrap(); + let output = provider + .batch_details(L1BatchNumber(1), &create_l2_block(1)) + .await + .unwrap(); + assert_matches!(output, Err(MissingData::PossibleReorg)); +} + #[tokio::test] async fn combined_data_provider_errors() { let pool = ConnectionPool::::test_pool().await; @@ -210,18 +348,19 @@ async fn combined_data_provider_errors() { seal_l1_batch_with_timestamp(&mut storage, L1BatchNumber(1), 50_000).await; eth_params.push_commit(51_000); seal_l1_batch_with_timestamp(&mut storage, L1BatchNumber(2), 52_000).await; - drop(storage); let mut main_node_client = MockMainNodeClient::default(); 
main_node_client.insert_batch(L1BatchNumber(2), H256::repeat_byte(2)); - let mut provider = - L1DataProvider::new(pool, Box::new(eth_params.client()), DIAMOND_PROXY_ADDRESS) - .unwrap() - .with_fallback(Box::new(main_node_client)); + let mut provider = L1DataProvider::new(Box::new(eth_params.client()), DIAMOND_PROXY_ADDRESS) + .unwrap() + .with_fallback(Box::new(main_node_client)); // L1 batch #1 should be obtained from L1 let output = provider - .batch_details(L1BatchNumber(1)) + .batch_details( + L1BatchNumber(1), + &get_last_l2_block(&mut storage, L1BatchNumber(1)).await, + ) .await .unwrap() .expect("no root hash"); @@ -231,19 +370,14 @@ async fn combined_data_provider_errors() { // L1 batch #2 should be obtained from L2 let output = provider - .batch_details(L1BatchNumber(2)) + .batch_details( + L1BatchNumber(2), + &get_last_l2_block(&mut storage, L1BatchNumber(2)).await, + ) .await .unwrap() .expect("no root hash"); assert_eq!(output.root_hash, H256::repeat_byte(2)); assert_matches!(output.source, TreeDataProviderSource::BatchDetailsRpc); assert!(provider.l1.is_none()); - - // L1 batch #3 is not present anywhere. - let missing = provider - .batch_details(L1BatchNumber(3)) - .await - .unwrap() - .unwrap_err(); - assert_matches!(missing, MissingData::Batch); } diff --git a/core/node/node_sync/src/tree_data_fetcher/tests.rs b/core/node/node_sync/src/tree_data_fetcher/tests.rs index 35671861bb29..3ffbb91d474a 100644 --- a/core/node/node_sync/src/tree_data_fetcher/tests.rs +++ b/core/node/node_sync/src/tree_data_fetcher/tests.rs @@ -36,7 +36,11 @@ impl MockMainNodeClient { #[async_trait] impl TreeDataProvider for MockMainNodeClient { - async fn batch_details(&mut self, number: L1BatchNumber) -> TreeDataProviderResult { + async fn batch_details( + &mut self, + number: L1BatchNumber, + _last_l2_block: &L2BlockHeader, + ) -> TreeDataProviderResult { if self.transient_error.fetch_and(false, Ordering::Relaxed) { let err = ClientError::RequestTimeout; return Err(EnrichedClientError::new(err, "batch_details").into()); @@ -97,6 +101,15 @@ pub(super) async fn seal_l1_batch_with_timestamp( transaction.commit().await.unwrap(); } +pub(super) async fn get_last_l2_block( + storage: &mut Connection<'_, Core>, + number: L1BatchNumber, +) -> L2BlockHeader { + TreeDataFetcher::get_last_l2_block(storage, number) + .await + .unwrap() +} + #[derive(Debug)] struct FetcherHarness { fetcher: TreeDataFetcher, @@ -301,7 +314,11 @@ impl SlowMainNode { #[async_trait] impl TreeDataProvider for SlowMainNode { - async fn batch_details(&mut self, number: L1BatchNumber) -> TreeDataProviderResult { + async fn batch_details( + &mut self, + number: L1BatchNumber, + _last_l2_block: &L2BlockHeader, + ) -> TreeDataProviderResult { if number != L1BatchNumber(1) { return Ok(Err(MissingData::Batch)); } diff --git a/core/node/reorg_detector/src/lib.rs b/core/node/reorg_detector/src/lib.rs index ff9aa63e29b0..5945b201c16c 100644 --- a/core/node/reorg_detector/src/lib.rs +++ b/core/node/reorg_detector/src/lib.rs @@ -41,6 +41,12 @@ pub enum HashMatchError { Internal(#[from] anyhow::Error), } +impl From for HashMatchError { + fn from(err: DalError) -> Self { + Self::Internal(err.generalize()) + } +} + #[derive(Debug, thiserror::Error)] pub enum Error { #[error(transparent)] @@ -85,6 +91,12 @@ impl From for Error { } } +impl From for Error { + fn from(err: DalError) -> Self { + Self::HashMatch(HashMatchError::Internal(err.generalize())) + } +} + impl From for Error { fn from(err: EnrichedClientError) -> Self { 
Self::HashMatch(HashMatchError::Rpc(err)) @@ -255,21 +267,15 @@ impl ReorgDetector { } async fn check_consistency(&mut self) -> Result<(), Error> { - let mut storage = self.pool.connection().await.context("connection()")?; + let mut storage = self.pool.connection().await?; let Some(local_l1_batch) = storage .blocks_dal() .get_last_l1_batch_number_with_tree_data() - .await - .map_err(DalError::generalize)? + .await? else { return Ok(()); }; - let Some(local_l2_block) = storage - .blocks_dal() - .get_sealed_l2_block_number() - .await - .map_err(DalError::generalize)? - else { + let Some(local_l2_block) = storage.blocks_dal().get_sealed_l2_block_number().await? else { return Ok(()); }; drop(storage); @@ -299,12 +305,11 @@ impl ReorgDetector { // Check that the first L1 batch matches, to make sure that // we are actually tracking the same chain as the main node. - let mut storage = self.pool.connection().await.context("connection()")?; + let mut storage = self.pool.connection().await?; let first_l1_batch = storage .blocks_dal() .get_earliest_l1_batch_number_with_metadata() - .await - .map_err(DalError::generalize)? + .await? .context("all L1 batches disappeared")?; drop(storage); match self.root_hashes_match(first_l1_batch).await { @@ -324,12 +329,11 @@ impl ReorgDetector { /// Compares hashes of the given local L2 block and the same L2 block from main node. async fn l2_block_hashes_match(&self, l2_block: L2BlockNumber) -> Result { - let mut storage = self.pool.connection().await.context("connection()")?; + let mut storage = self.pool.connection().await?; let local_hash = storage .blocks_dal() .get_l2_block_header(l2_block) - .await - .map_err(DalError::generalize)? + .await? .with_context(|| format!("Header does not exist for local L2 block #{l2_block}"))? .hash; drop(storage); @@ -353,12 +357,11 @@ impl ReorgDetector { /// Compares root hashes of the latest local batch and of the same batch from the main node. async fn root_hashes_match(&self, l1_batch: L1BatchNumber) -> Result { - let mut storage = self.pool.connection().await.context("connection()")?; + let mut storage = self.pool.connection().await?; let local_hash = storage .blocks_dal() .get_l1_batch_state_root(l1_batch) - .await - .map_err(DalError::generalize)? + .await? .with_context(|| format!("Root hash does not exist for local batch #{l1_batch}"))?; drop(storage); @@ -372,7 +375,34 @@ impl ReorgDetector { Ok(remote_hash == local_hash) } - /// Localizes a re-org: performs binary search to determine the last non-diverged block. + /// Because the node can fetch L1 batch root hash from an external source using the tree data fetcher, there's no strict guarantee + /// that L1 batch root hashes can necessarily be binary searched on their own (i.e., that there exists N such that root hashes of the first N batches match + /// on the main node and this node, and root hashes of L1 batches N + 1, N + 2, ... diverge). The tree data fetcher makes a reasonable attempt + /// to detect a reorg and to not persist root hashes for diverging L1 batches, but we don't trust this logic to work in all cases (yet?). + /// + /// Hence, we perform binary search both by L1 root hashes and the last L2 block hash in the batch; unlike L1 batches, L2 block hashes are *always* fully computed + /// based only on data locally processed by the node. Additionally, an L2 block hash of the last block in a batch encompasses a reasonably large part of L1 batch contents. 
+ async fn root_hashes_and_contents_match( + &self, + l1_batch: L1BatchNumber, + ) -> Result { + let root_hashes_match = self.root_hashes_match(l1_batch).await?; + if !root_hashes_match { + return Ok(false); + } + + let mut storage = self.pool.connection().await?; + let (_, last_l2_block_in_batch) = storage + .blocks_dal() + .get_l2_block_range_of_l1_batch(l1_batch) + .await? + .with_context(|| format!("L1 batch #{l1_batch} does not have L2 blocks"))?; + drop(storage); + + self.l2_block_hashes_match(last_l2_block_in_batch).await + } + + /// Localizes a re-org: performs binary search to determine the last non-diverged L1 batch. async fn detect_reorg( &self, known_valid_l1_batch: L1BatchNumber, @@ -384,7 +414,10 @@ impl ReorgDetector { known_valid_l1_batch.0, diverged_l1_batch.0, |number| async move { - match self.root_hashes_match(L1BatchNumber(number)).await { + match self + .root_hashes_and_contents_match(L1BatchNumber(number)) + .await + { Err(HashMatchError::MissingData(_)) => Ok(true), res => res, } diff --git a/core/node/reorg_detector/src/tests.rs b/core/node/reorg_detector/src/tests.rs index c9c4fd8b224f..c90a3a0592c7 100644 --- a/core/node/reorg_detector/src/tests.rs +++ b/core/node/reorg_detector/src/tests.rs @@ -578,6 +578,51 @@ async fn reorg_is_detected_without_waiting_for_main_node_to_catch_up() { ); } +/// Tests the worst-case scenario w.r.t. L1 batch root hashes: *all* root hashes match locally and on the main node, only L2 block hashes diverge. +#[test_casing(3, [2, 5, 8])] +#[tokio::test] +async fn reorg_is_detected_based_on_l2_block_hashes(last_correct_l1_batch: u32) { + const L1_BATCH_COUNT: u32 = 10; + + assert!(last_correct_l1_batch < L1_BATCH_COUNT); + + let pool = ConnectionPool::::test_pool().await; + let mut storage = pool.connection().await.unwrap(); + let genesis_batch = insert_genesis_batch(&mut storage, &GenesisParams::mock()) + .await + .unwrap(); + + let mut client = MockMainNodeClient::default(); + client + .l1_batch_root_hashes + .insert(L1BatchNumber(0), Ok(genesis_batch.root_hash)); + for number in 1..L1_BATCH_COUNT { + let l2_block_hash = H256::from_low_u64_le(number.into()); + store_l2_block(&mut storage, number, l2_block_hash).await; + let remote_l2_block_hash = if number <= last_correct_l1_batch { + l2_block_hash + } else { + H256::zero() + }; + client + .l2_block_hashes + .insert(L2BlockNumber(number), remote_l2_block_hash); + + let l1_batch_root_hash = H256::from_low_u64_be(number.into()); + seal_l1_batch(&mut storage, number, l1_batch_root_hash).await; + client + .l1_batch_root_hashes + .insert(L1BatchNumber(number), Ok(l1_batch_root_hash)); + } + drop(storage); + + let mut detector = create_mock_detector(client, pool); + assert_matches!( + detector.check_consistency().await, + Err(Error::ReorgDetected(L1BatchNumber(num))) if num == last_correct_l1_batch + ); +} + #[derive(Debug)] struct SlowMainNode { l1_batch_root_hash_call_count: Arc,
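
---

Below is a minimal, self-contained sketch of the binary-search localization rationale from the PR description and the `root_hashes_and_contents_match` doc comment. The names (`last_non_diverged_batch`, the `batch_matches` oracle) are illustrative and not the repository's API: the point is that root hashes copied by the tree data fetcher can keep "matching" past the revert point, so only the predicate that also checks the last L2 block hash of each batch is monotonic and therefore binary-searchable.

```rust
/// Returns the last batch in `known_valid..=diverged` for which `batch_matches` holds,
/// assuming `batch_matches(known_valid)` is true, `batch_matches(diverged)` is false,
/// and the predicate is monotonic (true up to some point, false afterwards).
fn last_non_diverged_batch(known_valid: u32, diverged: u32, batch_matches: impl Fn(u32) -> bool) -> u32 {
    let (mut left, mut right) = (known_valid, diverged);
    // Invariant: `left` matches, `right` diverges.
    while right - left > 1 {
        let mid = left + (right - left) / 2;
        if batch_matches(mid) {
            left = mid;
        } else {
            right = mid;
        }
    }
    left
}

fn main() {
    // Worst case exercised by `reorg_is_detected_based_on_l2_block_hashes`: every root hash
    // "matches" because it was copied from the external source after the revert, and only the
    // locally computed L2 block hashes reveal the divergence.
    let last_correct_l1_batch = 5;
    let root_hash_matches = |_batch: u32| true; // copied root hashes => always "match"
    let last_l2_block_matches = |batch: u32| batch <= last_correct_l1_batch; // computed locally

    // Searching on root hashes alone never observes a divergence point...
    assert!((0..=9).all(root_hash_matches));
    // ...while the combined predicate is monotonic and localizes the reorg correctly.
    let combined = |batch: u32| root_hash_matches(batch) && last_l2_block_matches(batch);
    assert_eq!(last_non_diverged_batch(0, 9, combined), last_correct_l1_batch);
    println!("last non-diverged L1 batch: #{last_correct_l1_batch}");
}
```

In the actual `detect_reorg` shown above, the predicate additionally treats `HashMatchError::MissingData` as a match, so batches whose data is not yet available on the main node do not shift the search.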
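The doc comment on `root_hashes_and_contents_match` leans on the fact that L2 block hashes are computed entirely from locally processed data and that the last block's hash reflects a large part of the batch contents. A toy model of that property, assuming a chained hash scheme for illustration only (`DefaultHasher` stands in for the real block hash formula, which differs): any divergence earlier in the batch propagates into the last block's hash.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Toy block hash: commits to the parent hash and the block payload.
fn block_hash(parent: u64, payload: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    parent.hash(&mut hasher);
    payload.hash(&mut hasher);
    hasher.finish()
}

/// Hash of the last block in a batch, chaining from a zero parent.
fn last_block_hash(blocks: &[&str]) -> u64 {
    blocks.iter().copied().fold(0, block_hash)
}

fn main() {
    let local = ["tx: a", "tx: b", "tx: c"];
    let diverged = ["tx: a", "tx: b'", "tx: c"]; // middle block differs after a revert
    // Even though only the middle block changed, the last block's hash differs,
    // so comparing last-block hashes is enough to detect the divergence.
    assert_ne!(last_block_hash(&local), last_block_hash(&diverged));
    println!("last-block hashes differ => divergence inside the batch is detected");
}
```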