Skip to content
This repository has been archived by the owner on Aug 28, 2024. It is now read-only.

Commit

Permalink
fix(en): Fix reorg detection in presence of tree data fetcher (matter…
Browse files Browse the repository at this point in the history
…-labs#2197)

## What ❔

Fixes reorg detection logic so that it accounts for the tree data
fetcher:

- **In tree data fetcher:** Tries to detect reorgs, so that root hashes
are not written for diverging L1 batches.
- **In reorg detector:** Checks last L2 block correspondence during
binary searching a diverging L1 batch.

## Why ❔

Reorg detection may be broken if tree data fetcher is enabled:

- The tree data fetcher doesn't check that fetched L1 batch root hashes
correspond to local L1 batches, i.e. it can fetch a root hash after a
revert.
- Hence, the logic in reorg detector which binary-searches the diverged
L1 batch is broken because the latest L1 batch isn't guaranteed to
diverge if there's a divergence.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
  • Loading branch information
slowli authored Jun 12, 2024
1 parent dd154f3 commit 20da566
Show file tree
Hide file tree
Showing 7 changed files with 434 additions and 142 deletions.
2 changes: 2 additions & 0 deletions core/node/node_sync/src/tree_data_fetcher/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub(super) enum StepOutcomeLabel {
UpdatedBatch,
NoProgress,
RemoteHashMissing,
PossibleReorg,
TransientError,
}

Expand Down Expand Up @@ -91,6 +92,7 @@ impl TreeDataFetcherMetrics {
}
Ok(StepOutcome::NoProgress) => StepOutcomeLabel::NoProgress,
Ok(StepOutcome::RemoteHashMissing) => StepOutcomeLabel::RemoteHashMissing,
Ok(StepOutcome::PossibleReorg) => StepOutcomeLabel::PossibleReorg,
Err(err) if err.is_transient() => StepOutcomeLabel::TransientError,
Err(_) => return, // fatal error; the node will exit soon anyway
};
Expand Down
60 changes: 50 additions & 10 deletions core/node/node_sync/src/tree_data_fetcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ use serde::Serialize;
#[cfg(test)]
use tokio::sync::mpsc;
use tokio::sync::watch;
use zksync_dal::{ConnectionPool, Core, CoreDal, DalError};
use zksync_dal::{Connection, ConnectionPool, Core, CoreDal, DalError};
use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck};
use zksync_types::{block::L1BatchTreeData, Address, L1BatchNumber};
use zksync_types::{
block::{L1BatchTreeData, L2BlockHeader},
Address, L1BatchNumber,
};
use zksync_web3_decl::{
client::{DynClient, L1, L2},
error::EnrichedClientError,
Expand Down Expand Up @@ -77,6 +80,7 @@ enum StepOutcome {
UpdatedBatch(L1BatchNumber),
NoProgress,
RemoteHashMissing,
PossibleReorg,
}

/// Component fetching tree data (i.e., state root hashes for L1 batches) from external sources, such as
Expand Down Expand Up @@ -133,7 +137,6 @@ impl TreeDataFetcher {
);

let l1_provider = L1DataProvider::new(
self.pool.clone(),
eth_client.for_component("tree_data_fetcher"),
diamond_proxy_address,
)?;
Expand All @@ -147,7 +150,7 @@ impl TreeDataFetcher {
self.health_updater.subscribe()
}

async fn get_batch_to_fetch(&self) -> anyhow::Result<Option<L1BatchNumber>> {
async fn get_batch_to_fetch(&self) -> anyhow::Result<Option<(L1BatchNumber, L2BlockHeader)>> {
let mut storage = self.pool.connection_tagged("tree_data_fetcher").await?;
// Fetch data in a readonly transaction to have a consistent view of the storage
let mut storage = storage.start_transaction().await?;
Expand All @@ -172,20 +175,41 @@ impl TreeDataFetcher {
earliest_l1_batch
};
Ok(if l1_batch_to_fetch <= last_l1_batch {
Some(l1_batch_to_fetch)
let last_l2_block = Self::get_last_l2_block(&mut storage, l1_batch_to_fetch).await?;
Some((l1_batch_to_fetch, last_l2_block))
} else {
None
})
}

async fn get_last_l2_block(
storage: &mut Connection<'_, Core>,
number: L1BatchNumber,
) -> anyhow::Result<L2BlockHeader> {
let (_, last_l2_block_number) = storage
.blocks_dal()
.get_l2_block_range_of_l1_batch(number)
.await?
.with_context(|| format!("L1 batch #{number} disappeared from Postgres"))?;
storage
.blocks_dal()
.get_l2_block_header(last_l2_block_number)
.await?
.with_context(|| format!("L2 block #{last_l2_block_number} (last for L1 batch #{number}) disappeared from Postgres"))
}

async fn step(&mut self) -> Result<StepOutcome, TreeDataFetcherError> {
let Some(l1_batch_to_fetch) = self.get_batch_to_fetch().await? else {
let Some((l1_batch_to_fetch, last_l2_block_header)) = self.get_batch_to_fetch().await?
else {
return Ok(StepOutcome::NoProgress);
};

tracing::debug!("Fetching tree data for L1 batch #{l1_batch_to_fetch} from main node");
tracing::debug!("Fetching tree data for L1 batch #{l1_batch_to_fetch}");
let stage_latency = self.metrics.stage_latency[&ProcessingStage::Fetch].start();
let root_hash_result = self.data_provider.batch_details(l1_batch_to_fetch).await?;
let root_hash_result = self
.data_provider
.batch_details(l1_batch_to_fetch, &last_l2_block_header)
.await?;
stage_latency.observe();
let root_hash = match root_hash_result {
Ok(output) => {
Expand All @@ -199,17 +223,23 @@ impl TreeDataFetcher {
}
Err(MissingData::Batch) => {
let err = anyhow::anyhow!(
"L1 batch #{l1_batch_to_fetch} is sealed locally, but is not present on the main node, \
"L1 batch #{l1_batch_to_fetch} is sealed locally, but is not present externally, \
which is assumed to store batch info indefinitely"
);
return Err(err.into());
}
Err(MissingData::RootHash) => {
tracing::debug!(
"L1 batch #{l1_batch_to_fetch} does not have root hash computed on the main node"
"L1 batch #{l1_batch_to_fetch} does not have root hash computed externally"
);
return Ok(StepOutcome::RemoteHashMissing);
}
Err(MissingData::PossibleReorg) => {
tracing::debug!(
"L1 batch #{l1_batch_to_fetch} potentially diverges from the external source"
);
return Ok(StepOutcome::PossibleReorg);
}
};

let stage_latency = self.metrics.stage_latency[&ProcessingStage::Persistence].start();
Expand Down Expand Up @@ -266,6 +296,16 @@ impl TreeDataFetcher {
self.update_health(last_updated_l1_batch);
true
}
Ok(StepOutcome::PossibleReorg) => {
tracing::info!("Potential chain reorg detected by tree data fetcher; not updating tree data");
// Since we don't trust the reorg logic in the tree data fetcher, we let it continue working
// so that, if there's a false positive, the whole node doesn't crash (or is in a crash loop in the worst-case scenario).
let health = TreeDataFetcherHealth::Affected {
error: "Potential chain reorg".to_string(),
};
self.health_updater.update(health.into());
true
}
Err(err) if err.is_transient() => {
tracing::warn!(
"Transient error in tree data fetcher, will retry after a delay: {err:?}"
Expand Down
79 changes: 50 additions & 29 deletions core/node/node_sync/src/tree_data_fetcher/provider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ use std::fmt;
use anyhow::Context;
use async_trait::async_trait;
use vise::{EncodeLabelSet, EncodeLabelValue};
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_eth_client::EthInterface;
use zksync_types::{web3, Address, L1BatchNumber, H256, U256, U64};
use zksync_types::{block::L2BlockHeader, web3, Address, L1BatchNumber, H256, U256, U64};
use zksync_web3_decl::{
client::{DynClient, L1, L2},
error::{ClientRpcContext, EnrichedClientError, EnrichedClientResult},
Expand All @@ -26,6 +25,8 @@ pub(super) enum MissingData {
/// The provider lacks a root hash for a requested L1 batch; the batch itself is present on the provider.
#[error("no root hash for L1 batch")]
RootHash,
#[error("possible chain reorg detected")]
PossibleReorg,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)]
Expand All @@ -48,14 +49,23 @@ pub(super) type TreeDataProviderResult =
#[async_trait]
pub(super) trait TreeDataProvider: fmt::Debug + Send + Sync + 'static {
/// Fetches a state root hash for the L1 batch with the specified number.
/// The method receives a header of the last L2 block in the batch, which can be used to check L1 batch consistency etc.
///
/// It is guaranteed that this method will be called with monotonically increasing `number`s (although not necessarily sequential ones).
async fn batch_details(&mut self, number: L1BatchNumber) -> TreeDataProviderResult;
async fn batch_details(
&mut self,
number: L1BatchNumber,
last_l2_block: &L2BlockHeader,
) -> TreeDataProviderResult;
}

#[async_trait]
impl TreeDataProvider for Box<DynClient<L2>> {
async fn batch_details(&mut self, number: L1BatchNumber) -> TreeDataProviderResult {
async fn batch_details(
&mut self,
number: L1BatchNumber,
last_l2_block: &L2BlockHeader,
) -> TreeDataProviderResult {
let Some(batch_details) = self
.get_l1_batch_details(number)
.rpc_context("get_l1_batch_details")
Expand All @@ -64,6 +74,24 @@ impl TreeDataProvider for Box<DynClient<L2>> {
else {
return Ok(Err(MissingData::Batch));
};

// Check the local data correspondence.
let remote_l2_block_hash = self
.get_block_details(last_l2_block.number)
.rpc_context("get_block_details")
.with_arg("number", &last_l2_block.number)
.await?
.and_then(|block| block.base.root_hash);
if remote_l2_block_hash != Some(last_l2_block.hash) {
let last_l2_block_number = last_l2_block.number;
let last_l2_block_hash = last_l2_block.hash;
tracing::info!(
"Fetched hash of the last L2 block #{last_l2_block_number} in L1 batch #{number} ({remote_l2_block_hash:?}) \
does not match the local one ({last_l2_block_hash:?}); this can be caused by a chain reorg"
);
return Ok(Err(MissingData::PossibleReorg));
}

Ok(batch_details
.base
.root_hash
Expand Down Expand Up @@ -94,7 +122,6 @@ struct PastL1BatchInfo {
/// (provided it's not too far behind the seal timestamp of the batch).
#[derive(Debug)]
pub(super) struct L1DataProvider {
pool: ConnectionPool<Core>,
eth_client: Box<DynClient<L1>>,
diamond_proxy_address: Address,
block_commit_signature: H256,
Expand All @@ -109,7 +136,6 @@ impl L1DataProvider {
const L1_BLOCK_RANGE: U64 = U64([20_000]);

pub fn new(
pool: ConnectionPool<Core>,
eth_client: Box<DynClient<L1>>,
diamond_proxy_address: Address,
) -> anyhow::Result<Self> {
Expand All @@ -118,29 +144,13 @@ impl L1DataProvider {
.context("missing `BlockCommit` event")?
.signature();
Ok(Self {
pool,
eth_client,
diamond_proxy_address,
block_commit_signature,
past_l1_batch: None,
})
}

async fn l1_batch_seal_timestamp(&self, number: L1BatchNumber) -> anyhow::Result<u64> {
let mut storage = self.pool.connection_tagged("tree_data_fetcher").await?;
let (_, last_l2_block_number) = storage
.blocks_dal()
.get_l2_block_range_of_l1_batch(number)
.await?
.with_context(|| format!("L1 batch #{number} does not have L2 blocks"))?;
let block_header = storage
.blocks_dal()
.get_l2_block_header(last_l2_block_number)
.await?
.with_context(|| format!("L2 block #{last_l2_block_number} (last block in L1 batch #{number}) disappeared"))?;
Ok(block_header.timestamp)
}

/// Guesses the number of an L1 block with a `BlockCommit` event for the specified L1 batch.
/// The guess is based on the L1 batch seal timestamp.
async fn guess_l1_commit_block_number(
Expand Down Expand Up @@ -206,8 +216,12 @@ impl L1DataProvider {

#[async_trait]
impl TreeDataProvider for L1DataProvider {
async fn batch_details(&mut self, number: L1BatchNumber) -> TreeDataProviderResult {
let l1_batch_seal_timestamp = self.l1_batch_seal_timestamp(number).await?;
async fn batch_details(
&mut self,
number: L1BatchNumber,
last_l2_block: &L2BlockHeader,
) -> TreeDataProviderResult {
let l1_batch_seal_timestamp = last_l2_block.timestamp;
let from_block = self.past_l1_batch.and_then(|info| {
assert!(
info.number < number,
Expand Down Expand Up @@ -297,8 +311,11 @@ impl TreeDataProvider for L1DataProvider {
}))
}
_ => {
tracing::warn!("Non-unique `BlockCommit` event for L1 batch #{number} queried using {filter:?}: {logs:?}");
Ok(Err(MissingData::RootHash))
tracing::warn!(
"Non-unique `BlockCommit` event for L1 batch #{number} queried using {filter:?}, potentially as a result \
of a chain reorg: {logs:?}"
);
Ok(Err(MissingData::PossibleReorg))
}
}
}
Expand All @@ -313,9 +330,13 @@ pub(super) struct CombinedDataProvider {

#[async_trait]
impl TreeDataProvider for CombinedDataProvider {
async fn batch_details(&mut self, number: L1BatchNumber) -> TreeDataProviderResult {
async fn batch_details(
&mut self,
number: L1BatchNumber,
last_l2_block: &L2BlockHeader,
) -> TreeDataProviderResult {
if let Some(l1) = &mut self.l1 {
match l1.batch_details(number).await {
match l1.batch_details(number, last_l2_block).await {
Err(err) => {
if err.is_transient() {
tracing::info!(
Expand All @@ -342,6 +363,6 @@ impl TreeDataProvider for CombinedDataProvider {
}
}
}
self.fallback.batch_details(number).await
self.fallback.batch_details(number, last_l2_block).await
}
}
Loading

0 comments on commit 20da566

Please sign in to comment.