From 4717b8caa875b80271807ffbb4c4b0fa3f51c0f9 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Mon, 10 Jun 2024 17:01:37 +0300 Subject: [PATCH 1/5] Sketch divergence check in tree --- core/lib/merkle_tree/src/domain.rs | 6 ++ core/node/metadata_calculator/src/helpers.rs | 18 +++- core/node/metadata_calculator/src/updater.rs | 99 ++++++++++++++++++-- 3 files changed, 113 insertions(+), 10 deletions(-) diff --git a/core/lib/merkle_tree/src/domain.rs b/core/lib/merkle_tree/src/domain.rs index 5e3bc77ab935..ffc4b0b84106 100644 --- a/core/lib/merkle_tree/src/domain.rs +++ b/core/lib/merkle_tree/src/domain.rs @@ -166,6 +166,12 @@ impl ZkSyncTree { self.tree.latest_root_hash() } + /// Returns the root hash and leaf count at the specified L1 batch. + pub fn root_info(&self, l1_batch_number: L1BatchNumber) -> Option<(ValueHash, u64)> { + let root = self.tree.root(l1_batch_number.0.into())?; + Some((root.hash(&Blake2Hasher), root.leaf_count())) + } + /// Checks whether this tree is empty. pub fn is_empty(&self) -> bool { let Some(version) = self.tree.latest_version() else { diff --git a/core/node/metadata_calculator/src/helpers.rs b/core/node/metadata_calculator/src/helpers.rs index 20fd0babaac8..f16e618a805d 100644 --- a/core/node/metadata_calculator/src/helpers.rs +++ b/core/node/metadata_calculator/src/helpers.rs @@ -27,7 +27,9 @@ use zksync_merkle_tree::{ }; use zksync_storage::{RocksDB, RocksDBOptions, StalledWritesRetries, WeakRocksDB}; use zksync_types::{ - block::L1BatchHeader, writes::TreeWrite, AccountTreeId, L1BatchNumber, StorageKey, H256, + block::{L1BatchHeader, L1BatchTreeData}, + writes::TreeWrite, + AccountTreeId, L1BatchNumber, StorageKey, H256, }; use super::{ @@ -233,11 +235,23 @@ impl AsyncTree { self.as_ref().next_l1_batch_number() } + pub fn min_l1_batch_number(&self) -> Option { + self.as_ref().reader().min_l1_batch_number() + } + #[cfg(test)] pub fn root_hash(&self) -> H256 { self.as_ref().root_hash() } + pub fn data_for_l1_batch(&self, l1_batch_number: L1BatchNumber) -> Option { + let (hash, rollup_last_leaf_index) = self.as_ref().root_info(l1_batch_number)?; + Some(L1BatchTreeData { + hash, + rollup_last_leaf_index, + }) + } + /// Returned errors are unrecoverable; the tree must not be used after an error is returned. pub async fn process_l1_batch( &mut self, @@ -279,7 +293,7 @@ impl AsyncTree { Ok(()) } - pub fn revert_logs(&mut self, last_l1_batch_to_keep: L1BatchNumber) -> anyhow::Result<()> { + pub fn roll_back_logs(&mut self, last_l1_batch_to_keep: L1BatchNumber) -> anyhow::Result<()> { self.as_mut().roll_back_logs(last_l1_batch_to_keep) } } diff --git a/core/node/metadata_calculator/src/updater.rs b/core/node/metadata_calculator/src/updater.rs index cca6fce6d4cd..e57eaf51630c 100644 --- a/core/node/metadata_calculator/src/updater.rs +++ b/core/node/metadata_calculator/src/updater.rs @@ -198,6 +198,92 @@ impl TreeUpdater { Ok(()) } + /// Invariant: the tree is not ahead of Postgres. + async fn ensure_no_l1_batch_divergence( + pool: &ConnectionPool, + tree: &mut AsyncTree, + ) -> anyhow::Result<()> { + let Some(last_tree_l1_batch) = tree.next_l1_batch_number().checked_sub(1) else { + // No L1 batches in the tree means no divergence. + return Ok(()); + }; + let last_tree_l1_batch = L1BatchNumber(last_tree_l1_batch); + + let mut storage = pool.connection_tagged("metadata_calculator").await?; + if Self::l1_batch_matches(&mut storage, tree, last_tree_l1_batch).await? { + tracing::debug!( + "Last l1 batch in tree #{last_tree_l1_batch} has same data in tree and Postgres" + ); + return Ok(()); + } + + tracing::debug!("Last l1 batch in tree #{last_tree_l1_batch} has diverging data in tree and Postgres; searching for the last common L1 batch"); + let min_tree_l1_batch = tree + .min_l1_batch_number() + .context("tree shouldn't be empty at this point")?; + anyhow::ensure!( + min_tree_l1_batch <= last_tree_l1_batch, + "potential Merkle tree corruption: minimum L1 batch number ({min_tree_l1_batch}) exceeds the last L1 batch ({last_tree_l1_batch})" + ); + + anyhow::ensure!( + Self::l1_batch_matches(&mut storage, tree, min_tree_l1_batch).await?, + "Diverging min L1 batch in the tree #{min_tree_l1_batch}; the tree cannot recover from this" + ); + + let mut left = min_tree_l1_batch.0; + let mut right = last_tree_l1_batch.0; + while left + 1 < right { + let middle = (left + right) / 2; + let batch_matches = + Self::l1_batch_matches(&mut storage, tree, L1BatchNumber(middle)).await?; + if batch_matches { + left = middle; + } else { + right = middle; + } + } + let last_common_l1_batch_number = L1BatchNumber(left); + tracing::info!("Found last common L1 batch between tree and Postgres: #{last_common_l1_batch_number}; will revert tree to it"); + + tree.roll_back_logs(last_common_l1_batch_number)?; + tree.save().await?; + Ok(()) + } + + async fn l1_batch_matches( + storage: &mut Connection<'_, Core>, + tree: &AsyncTree, + l1_batch: L1BatchNumber, + ) -> anyhow::Result { + if l1_batch == L1BatchNumber(0) { + // Corner case: root hash for L1 batch #0 persisted in Postgres is fictive (set to `H256::zero()`). + return Ok(true); + } + + let Some(tree_data) = tree.data_for_l1_batch(l1_batch) else { + // Corner case: the L1 batch was pruned in the tree. + return Ok(true); + }; + let Some(tree_data_from_postgres) = storage + .blocks_dal() + .get_l1_batch_tree_data(l1_batch) + .await? + else { + // Corner case: the L1 batch was pruned in Postgres (including initial snapshot recovery). + return Ok(true); + }; + + let data_matches = tree_data == tree_data_from_postgres; + if !data_matches { + tracing::warn!( + "Detected diverging tree data for L1 batch #{l1_batch}; data in tree is: {tree_data:?}, \ + data in Postgres is: {tree_data_from_postgres:?}" + ); + } + Ok(data_matches) + } + /// The processing loop for this updater. pub async fn loop_updating_tree( mut self, @@ -253,22 +339,19 @@ impl TreeUpdater { METRICS.backup_lag.set(backup_lag.into()); if next_l1_batch_to_seal > last_l1_batch_with_tree_data + 1 { - // Check stop signal before proceeding with a potentially time-consuming operation. - if *stop_receiver.borrow_and_update() { - tracing::info!("Stop signal received, metadata_calculator is shutting down"); - return Ok(()); - } - tracing::warn!( "Next L1 batch of the tree ({next_l1_batch_to_seal}) is greater than last L1 batch with metadata in Postgres \ ({last_l1_batch_with_tree_data}); this may be a result of restoring Postgres from a snapshot. \ Truncating Merkle tree versions so that this mismatch is fixed..." ); - tree.revert_logs(last_l1_batch_with_tree_data)?; + tree.roll_back_logs(last_l1_batch_with_tree_data)?; tree.save().await?; - next_l1_batch_to_seal = tree.next_l1_batch_number(); tracing::info!("Truncated Merkle tree to L1 batch #{next_l1_batch_to_seal}"); } + + // FIXME: move to before the tree is fully initialized + Self::ensure_no_l1_batch_divergence(pool, tree).await?; + next_l1_batch_to_seal = tree.next_l1_batch_number(); } loop { From c1d7ea815a05d270a76c404db745cbe91807d4af Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Mon, 10 Jun 2024 17:38:21 +0300 Subject: [PATCH 2/5] Refactor tree consistency checks --- core/node/metadata_calculator/src/lib.rs | 14 +- core/node/metadata_calculator/src/updater.rs | 180 +++++++++++-------- 2 files changed, 114 insertions(+), 80 deletions(-) diff --git a/core/node/metadata_calculator/src/lib.rs b/core/node/metadata_calculator/src/lib.rs index 4a422f243f40..b57f0dfacb70 100644 --- a/core/node/metadata_calculator/src/lib.rs +++ b/core/node/metadata_calculator/src/lib.rs @@ -217,7 +217,7 @@ impl MetadataCalculator { GenericAsyncTree::new(db, &self.config).await } - pub async fn run(self, stop_receiver: watch::Receiver) -> anyhow::Result<()> { + pub async fn run(self, mut stop_receiver: watch::Receiver) -> anyhow::Result<()> { let tree = self.create_tree().await?; let tree = tree .ensure_ready( @@ -231,13 +231,19 @@ impl MetadataCalculator { let Some(mut tree) = tree else { return Ok(()); // recovery was aborted because a stop signal was received }; - + // Set a tree reader before the tree is fully initialized to not wait for the first L1 batch to appear in Postgres. let tree_reader = tree.reader(); - let tree_info = tree_reader.clone().info().await; + self.tree_reader.send_replace(Some(tree_reader)); + + tree.ensure_consistency(&self.delayer, &self.pool, &mut stop_receiver) + .await?; if !self.pruning_handles_sender.is_closed() { + // Unlike tree reader, we shouldn't initialize pruning (as a task modifying the tree) before the tree is guaranteed + // to be consistent with Postgres. self.pruning_handles_sender.send(tree.pruner()).ok(); } - self.tree_reader.send_replace(Some(tree_reader)); + + let tree_info = tree.reader().info().await; tracing::info!("Merkle tree is initialized and ready to process L1 batches: {tree_info:?}"); self.health_updater .update(MerkleTreeHealth::MainLoop(tree_info).into()); diff --git a/core/node/metadata_calculator/src/updater.rs b/core/node/metadata_calculator/src/updater.rs index e57eaf51630c..26e826d08d33 100644 --- a/core/node/metadata_calculator/src/updater.rs +++ b/core/node/metadata_calculator/src/updater.rs @@ -198,19 +198,98 @@ impl TreeUpdater { Ok(()) } + /// The processing loop for this updater. + pub async fn loop_updating_tree( + mut self, + delayer: Delayer, + pool: &ConnectionPool, + mut stop_receiver: watch::Receiver, + ) -> anyhow::Result<()> { + let tree = &mut self.tree; + let mut next_l1_batch_to_seal = tree.next_l1_batch_number(); + tracing::info!( + "Initialized metadata calculator with {max_batches_per_iter} max L1 batches per iteration. \ + Next L1 batch for Merkle tree: {next_l1_batch_to_seal}", + max_batches_per_iter = self.max_l1_batches_per_iter + ); + + loop { + if *stop_receiver.borrow_and_update() { + tracing::info!("Stop signal received, metadata_calculator is shutting down"); + break; + } + let storage = pool.connection_tagged("metadata_calculator").await?; + + let snapshot = *next_l1_batch_to_seal; + self.step(storage, &mut next_l1_batch_to_seal).await?; + let delay = if snapshot == *next_l1_batch_to_seal { + tracing::trace!( + "Metadata calculator (next L1 batch: #{next_l1_batch_to_seal}) \ + didn't make any progress; delaying it using {delayer:?}" + ); + delayer.wait(&self.tree).left_future() + } else { + tracing::trace!( + "Metadata calculator (next L1 batch: #{next_l1_batch_to_seal}) made progress from #{snapshot}" + ); + future::ready(()).right_future() + }; + + // The delays we're operating with are reasonably small, but selecting between the delay + // and the stop receiver still allows to be more responsive during shutdown. + tokio::select! { + _ = stop_receiver.changed() => { + tracing::info!("Stop signal received, metadata_calculator is shutting down"); + break; + } + () = delay => { /* The delay has passed */ } + } + } + Ok(()) + } +} + +impl AsyncTree { + async fn ensure_genesis( + &mut self, + storage: &mut Connection<'_, Core>, + earliest_l1_batch: L1BatchNumber, + ) -> anyhow::Result<()> { + if !self.is_empty() { + return Ok(()); + } + + anyhow::ensure!( + earliest_l1_batch == L1BatchNumber(0), + "Non-zero earliest L1 batch #{earliest_l1_batch} is not supported without previous tree recovery" + ); + let batch = L1BatchWithLogs::new(storage, earliest_l1_batch, self.mode()) + .await + .with_context(|| { + format!("failed fetching tree input for L1 batch #{earliest_l1_batch}") + })? + .context("Missing storage logs for the genesis L1 batch")?; + self.process_l1_batch(batch).await?; + self.save().await?; + Ok(()) + } + /// Invariant: the tree is not ahead of Postgres. async fn ensure_no_l1_batch_divergence( + &mut self, pool: &ConnectionPool, - tree: &mut AsyncTree, ) -> anyhow::Result<()> { - let Some(last_tree_l1_batch) = tree.next_l1_batch_number().checked_sub(1) else { + let Some(last_tree_l1_batch) = self.next_l1_batch_number().checked_sub(1) else { // No L1 batches in the tree means no divergence. return Ok(()); }; let last_tree_l1_batch = L1BatchNumber(last_tree_l1_batch); let mut storage = pool.connection_tagged("metadata_calculator").await?; - if Self::l1_batch_matches(&mut storage, tree, last_tree_l1_batch).await? { + if self + .l1_batch_matches(&mut storage, last_tree_l1_batch) + .await? + { tracing::debug!( "Last l1 batch in tree #{last_tree_l1_batch} has same data in tree and Postgres" ); @@ -218,7 +297,7 @@ impl TreeUpdater { } tracing::debug!("Last l1 batch in tree #{last_tree_l1_batch} has diverging data in tree and Postgres; searching for the last common L1 batch"); - let min_tree_l1_batch = tree + let min_tree_l1_batch = self .min_l1_batch_number() .context("tree shouldn't be empty at this point")?; anyhow::ensure!( @@ -227,7 +306,7 @@ impl TreeUpdater { ); anyhow::ensure!( - Self::l1_batch_matches(&mut storage, tree, min_tree_l1_batch).await?, + self.l1_batch_matches(&mut storage, min_tree_l1_batch).await?, "Diverging min L1 batch in the tree #{min_tree_l1_batch}; the tree cannot recover from this" ); @@ -235,8 +314,9 @@ impl TreeUpdater { let mut right = last_tree_l1_batch.0; while left + 1 < right { let middle = (left + right) / 2; - let batch_matches = - Self::l1_batch_matches(&mut storage, tree, L1BatchNumber(middle)).await?; + let batch_matches = self + .l1_batch_matches(&mut storage, L1BatchNumber(middle)) + .await?; if batch_matches { left = middle; } else { @@ -246,14 +326,14 @@ impl TreeUpdater { let last_common_l1_batch_number = L1BatchNumber(left); tracing::info!("Found last common L1 batch between tree and Postgres: #{last_common_l1_batch_number}; will revert tree to it"); - tree.roll_back_logs(last_common_l1_batch_number)?; - tree.save().await?; + self.roll_back_logs(last_common_l1_batch_number)?; + self.save().await?; Ok(()) } async fn l1_batch_matches( + &self, storage: &mut Connection<'_, Core>, - tree: &AsyncTree, l1_batch: L1BatchNumber, ) -> anyhow::Result { if l1_batch == L1BatchNumber(0) { @@ -261,7 +341,7 @@ impl TreeUpdater { return Ok(true); } - let Some(tree_data) = tree.data_for_l1_batch(l1_batch) else { + let Some(tree_data) = self.data_for_l1_batch(l1_batch) else { // Corner case: the L1 batch was pruned in the tree. return Ok(true); }; @@ -284,37 +364,22 @@ impl TreeUpdater { Ok(data_matches) } - /// The processing loop for this updater. - pub async fn loop_updating_tree( - mut self, - delayer: Delayer, + /// Ensures that the tree is consistent with Postgres, truncating the tree if necessary. + pub(crate) async fn ensure_consistency( + &mut self, + delayer: &Delayer, pool: &ConnectionPool, - mut stop_receiver: watch::Receiver, + stop_receiver: &mut watch::Receiver, ) -> anyhow::Result<()> { let Some(earliest_l1_batch) = - wait_for_l1_batch(pool, delayer.delay_interval(), &mut stop_receiver).await? + wait_for_l1_batch(pool, delayer.delay_interval(), stop_receiver).await? else { return Ok(()); // Stop signal received }; let mut storage = pool.connection_tagged("metadata_calculator").await?; - // Ensure genesis creation - let tree = &mut self.tree; - if tree.is_empty() { - anyhow::ensure!( - earliest_l1_batch == L1BatchNumber(0), - "Non-zero earliest L1 batch #{earliest_l1_batch} is not supported without previous tree recovery" - ); - let batch = L1BatchWithLogs::new(&mut storage, earliest_l1_batch, tree.mode()) - .await - .with_context(|| { - format!("failed fetching tree input for L1 batch #{earliest_l1_batch}") - })? - .context("Missing storage logs for the genesis L1 batch")?; - tree.process_l1_batch(batch).await?; - tree.save().await?; - } - let mut next_l1_batch_to_seal = tree.next_l1_batch_number(); + self.ensure_genesis(&mut storage, earliest_l1_batch).await?; + let next_l1_batch_to_seal = self.next_l1_batch_number(); let current_db_batch = storage.blocks_dal().get_sealed_l1_batch_number().await?; let last_l1_batch_with_tree_data = storage @@ -324,10 +389,8 @@ impl TreeUpdater { drop(storage); tracing::info!( - "Initialized metadata calculator with {max_batches_per_iter} max L1 batches per iteration. \ - Next L1 batch for Merkle tree: {next_l1_batch_to_seal}, current Postgres L1 batch: {current_db_batch:?}, \ - last L1 batch with metadata: {last_l1_batch_with_tree_data:?}", - max_batches_per_iter = self.max_l1_batches_per_iter + "Next L1 batch for Merkle tree: {next_l1_batch_to_seal}, current Postgres L1 batch: {current_db_batch:?}, \ + last L1 batch with metadata: {last_l1_batch_with_tree_data:?}" ); // It may be the case that we don't have any L1 batches with metadata in Postgres, e.g. after @@ -344,47 +407,12 @@ impl TreeUpdater { ({last_l1_batch_with_tree_data}); this may be a result of restoring Postgres from a snapshot. \ Truncating Merkle tree versions so that this mismatch is fixed..." ); - tree.roll_back_logs(last_l1_batch_with_tree_data)?; - tree.save().await?; + self.roll_back_logs(last_l1_batch_with_tree_data)?; + self.save().await?; tracing::info!("Truncated Merkle tree to L1 batch #{next_l1_batch_to_seal}"); } - // FIXME: move to before the tree is fully initialized - Self::ensure_no_l1_batch_divergence(pool, tree).await?; - next_l1_batch_to_seal = tree.next_l1_batch_number(); - } - - loop { - if *stop_receiver.borrow_and_update() { - tracing::info!("Stop signal received, metadata_calculator is shutting down"); - break; - } - let storage = pool.connection_tagged("metadata_calculator").await?; - - let snapshot = *next_l1_batch_to_seal; - self.step(storage, &mut next_l1_batch_to_seal).await?; - let delay = if snapshot == *next_l1_batch_to_seal { - tracing::trace!( - "Metadata calculator (next L1 batch: #{next_l1_batch_to_seal}) \ - didn't make any progress; delaying it using {delayer:?}" - ); - delayer.wait(&self.tree).left_future() - } else { - tracing::trace!( - "Metadata calculator (next L1 batch: #{next_l1_batch_to_seal}) made progress from #{snapshot}" - ); - future::ready(()).right_future() - }; - - // The delays we're operating with are reasonably small, but selecting between the delay - // and the stop receiver still allows to be more responsive during shutdown. - tokio::select! { - _ = stop_receiver.changed() => { - tracing::info!("Stop signal received, metadata_calculator is shutting down"); - break; - } - () = delay => { /* The delay has passed */ } - } + self.ensure_no_l1_batch_divergence(pool).await?; } Ok(()) } From 1d02b27750f5380a2910f8ee41e05aab1b23d069 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Mon, 10 Jun 2024 21:55:35 +0300 Subject: [PATCH 3/5] Test tree consistency checks --- core/node/metadata_calculator/src/helpers.rs | 4 +- core/node/metadata_calculator/src/tests.rs | 108 ++++++++++++++++++- core/node/metadata_calculator/src/updater.rs | 6 +- 3 files changed, 113 insertions(+), 5 deletions(-) diff --git a/core/node/metadata_calculator/src/helpers.rs b/core/node/metadata_calculator/src/helpers.rs index f16e618a805d..5ac9e329c62c 100644 --- a/core/node/metadata_calculator/src/helpers.rs +++ b/core/node/metadata_calculator/src/helpers.rs @@ -245,10 +245,10 @@ impl AsyncTree { } pub fn data_for_l1_batch(&self, l1_batch_number: L1BatchNumber) -> Option { - let (hash, rollup_last_leaf_index) = self.as_ref().root_info(l1_batch_number)?; + let (hash, leaf_count) = self.as_ref().root_info(l1_batch_number)?; Some(L1BatchTreeData { hash, - rollup_last_leaf_index, + rollup_last_leaf_index: leaf_count + 1, }) } diff --git a/core/node/metadata_calculator/src/tests.rs b/core/node/metadata_calculator/src/tests.rs index 0406544614d4..2b48652edde4 100644 --- a/core/node/metadata_calculator/src/tests.rs +++ b/core/node/metadata_calculator/src/tests.rs @@ -5,6 +5,7 @@ use std::{future::Future, ops, panic, path::Path, sync::Arc, time::Duration}; use assert_matches::assert_matches; use itertools::Itertools; use tempfile::TempDir; +use test_casing::{test_casing, Product}; use tokio::sync::{mpsc, watch}; use zksync_config::configs::{ chain::OperationsManagerConfig, @@ -19,8 +20,8 @@ use zksync_object_store::{MockObjectStore, ObjectStore}; use zksync_prover_interface::inputs::PrepareBasicCircuitsJob; use zksync_storage::RocksDB; use zksync_types::{ - block::L1BatchHeader, AccountTreeId, Address, L1BatchNumber, L2BlockNumber, StorageKey, - StorageLog, H256, + block::{L1BatchHeader, L1BatchTreeData}, + AccountTreeId, Address, L1BatchNumber, L2BlockNumber, StorageKey, StorageLog, H256, }; use zksync_utils::u32_to_h256; @@ -28,6 +29,7 @@ use super::{ helpers::L1BatchWithLogs, GenericAsyncTree, MetadataCalculator, MetadataCalculatorConfig, MetadataCalculatorRecoveryConfig, }; +use crate::helpers::{AsyncTree, Delayer}; const RUN_TIMEOUT: Duration = Duration::from_secs(30); @@ -74,6 +76,91 @@ async fn genesis_creation() { assert_eq!(tree.next_l1_batch_number(), L1BatchNumber(1)); } +#[tokio::test] +async fn low_level_genesis_creation() { + let pool = ConnectionPool::::test_pool().await; + let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); + insert_genesis_batch( + &mut pool.connection().await.unwrap(), + &GenesisParams::mock(), + ) + .await + .unwrap(); + reset_db_state(&pool, 1).await; + + let db = RocksDB::new(temp_dir.path()).unwrap(); + let mut tree = AsyncTree::new(db.into(), MerkleTreeMode::Lightweight).unwrap(); + let (_stop_sender, mut stop_receiver) = watch::channel(false); + tree.ensure_consistency( + &Delayer::new(Duration::from_millis(10)), + &pool, + &mut stop_receiver, + ) + .await + .unwrap(); + + assert!(!tree.is_empty()); + assert_eq!(tree.next_l1_batch_number(), L1BatchNumber(1)); +} + +#[test_casing(8, Product(([1, 4, 7, 9], [false, true])))] +#[tokio::test] +async fn tree_truncation_on_l1_batch_divergence( + last_common_l1_batch: u32, + overwrite_tree_data: bool, +) { + const INITIAL_BATCH_COUNT: usize = 10; + + assert!((last_common_l1_batch as usize) < INITIAL_BATCH_COUNT); + let last_common_l1_batch = L1BatchNumber(last_common_l1_batch); + + let pool = ConnectionPool::::test_pool().await; + let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); + let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await; + reset_db_state(&pool, INITIAL_BATCH_COUNT).await; + run_calculator(calculator).await; + + let mut storage = pool.connection().await.unwrap(); + remove_l1_batches(&mut storage, last_common_l1_batch).await; + // Extend the state with new L1 batches. + let logs = gen_storage_logs(100..200, 5); + extend_db_state(&mut storage, logs).await; + + if overwrite_tree_data { + for number in (last_common_l1_batch.0 + 1)..(last_common_l1_batch.0 + 6) { + let new_tree_data = L1BatchTreeData { + hash: H256::from_low_u64_be(number.into()), + rollup_last_leaf_index: 200, // doesn't matter + }; + storage + .blocks_dal() + .save_l1_batch_tree_data(L1BatchNumber(number), &new_tree_data) + .await + .unwrap(); + } + } + + let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await; + let tree = calculator.create_tree().await.unwrap(); + let GenericAsyncTree::Ready(mut tree) = tree else { + panic!("Unexpected tree state: {tree:?}"); + }; + assert_eq!( + tree.next_l1_batch_number(), + L1BatchNumber(INITIAL_BATCH_COUNT as u32 + 1) + ); + + let (_stop_sender, mut stop_receiver) = watch::channel(false); + tree.ensure_consistency( + &Delayer::new(Duration::from_millis(10)), + &pool, + &mut stop_receiver, + ) + .await + .unwrap(); + assert_eq!(tree.next_l1_batch_number(), last_common_l1_batch + 1); +} + #[tokio::test] async fn basic_workflow() { let pool = ConnectionPool::::test_pool().await; @@ -640,6 +727,23 @@ async fn remove_l1_batches( batch_headers.push(header.unwrap()); } + let (_, last_l2_block_to_keep) = storage + .blocks_dal() + .get_l2_block_range_of_l1_batch(last_l1_batch_to_keep) + .await + .unwrap() + .expect("L1 batch has no blocks"); + + storage + .storage_logs_dal() + .roll_back_storage_logs(last_l2_block_to_keep) + .await + .unwrap(); + storage + .blocks_dal() + .delete_l2_blocks(last_l2_block_to_keep) + .await + .unwrap(); storage .blocks_dal() .delete_l1_batches(last_l1_batch_to_keep) diff --git a/core/node/metadata_calculator/src/updater.rs b/core/node/metadata_calculator/src/updater.rs index 26e826d08d33..6bbc5c7dd41a 100644 --- a/core/node/metadata_calculator/src/updater.rs +++ b/core/node/metadata_calculator/src/updater.rs @@ -336,6 +336,7 @@ impl AsyncTree { storage: &mut Connection<'_, Core>, l1_batch: L1BatchNumber, ) -> anyhow::Result { + dbg!(l1_batch); if l1_batch == L1BatchNumber(0) { // Corner case: root hash for L1 batch #0 persisted in Postgres is fictive (set to `H256::zero()`). return Ok(true); @@ -354,7 +355,8 @@ impl AsyncTree { return Ok(true); }; - let data_matches = tree_data == tree_data_from_postgres; + let data_matches = dbg!(tree_data) == dbg!(tree_data_from_postgres); + dbg!(data_matches); if !data_matches { tracing::warn!( "Detected diverging tree data for L1 batch #{l1_batch}; data in tree is: {tree_data:?}, \ @@ -380,6 +382,7 @@ impl AsyncTree { self.ensure_genesis(&mut storage, earliest_l1_batch).await?; let next_l1_batch_to_seal = self.next_l1_batch_number(); + dbg!(next_l1_batch_to_seal); let current_db_batch = storage.blocks_dal().get_sealed_l1_batch_number().await?; let last_l1_batch_with_tree_data = storage @@ -392,6 +395,7 @@ impl AsyncTree { "Next L1 batch for Merkle tree: {next_l1_batch_to_seal}, current Postgres L1 batch: {current_db_batch:?}, \ last L1 batch with metadata: {last_l1_batch_with_tree_data:?}" ); + dbg!(last_l1_batch_with_tree_data); // It may be the case that we don't have any L1 batches with metadata in Postgres, e.g. after // recovering from a snapshot. We cannot wait for such a batch to appear (*this* is the component From b12727979540f476190f1cafd46d1140caa6950b Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Tue, 11 Jun 2024 12:12:42 +0300 Subject: [PATCH 4/5] Remove debug artifacts --- core/lib/snapshots_applier/src/tests/utils.rs | 4 ++-- core/node/metadata_calculator/src/updater.rs | 9 +++------ 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/core/lib/snapshots_applier/src/tests/utils.rs b/core/lib/snapshots_applier/src/tests/utils.rs index d3d1c3ae6e03..b48277a88e52 100644 --- a/core/lib/snapshots_applier/src/tests/utils.rs +++ b/core/lib/snapshots_applier/src/tests/utils.rs @@ -332,12 +332,12 @@ impl ObjectStore for HangingObjectStore { let mut should_proceed = true; self.count_sender.send_modify(|count| { *count += 1; - if dbg!(*count) > self.stop_after_count { + if *count > self.stop_after_count { should_proceed = false; } }); - if dbg!(should_proceed) { + if should_proceed { self.inner.get_raw(bucket, key).await } else { future::pending().await // Hang up the snapshot applier task diff --git a/core/node/metadata_calculator/src/updater.rs b/core/node/metadata_calculator/src/updater.rs index 6bbc5c7dd41a..2850b1d06126 100644 --- a/core/node/metadata_calculator/src/updater.rs +++ b/core/node/metadata_calculator/src/updater.rs @@ -336,7 +336,6 @@ impl AsyncTree { storage: &mut Connection<'_, Core>, l1_batch: L1BatchNumber, ) -> anyhow::Result { - dbg!(l1_batch); if l1_batch == L1BatchNumber(0) { // Corner case: root hash for L1 batch #0 persisted in Postgres is fictive (set to `H256::zero()`). return Ok(true); @@ -355,18 +354,18 @@ impl AsyncTree { return Ok(true); }; - let data_matches = dbg!(tree_data) == dbg!(tree_data_from_postgres); - dbg!(data_matches); + let data_matches = tree_data == tree_data_from_postgres; if !data_matches { tracing::warn!( "Detected diverging tree data for L1 batch #{l1_batch}; data in tree is: {tree_data:?}, \ - data in Postgres is: {tree_data_from_postgres:?}" + data in Postgres is: {tree_data_from_postgres:?}" ); } Ok(data_matches) } /// Ensures that the tree is consistent with Postgres, truncating the tree if necessary. + /// This will wait for at least one L1 batch to appear in Postgres if necessary. pub(crate) async fn ensure_consistency( &mut self, delayer: &Delayer, @@ -382,7 +381,6 @@ impl AsyncTree { self.ensure_genesis(&mut storage, earliest_l1_batch).await?; let next_l1_batch_to_seal = self.next_l1_batch_number(); - dbg!(next_l1_batch_to_seal); let current_db_batch = storage.blocks_dal().get_sealed_l1_batch_number().await?; let last_l1_batch_with_tree_data = storage @@ -395,7 +393,6 @@ impl AsyncTree { "Next L1 batch for Merkle tree: {next_l1_batch_to_seal}, current Postgres L1 batch: {current_db_batch:?}, \ last L1 batch with metadata: {last_l1_batch_with_tree_data:?}" ); - dbg!(last_l1_batch_with_tree_data); // It may be the case that we don't have any L1 batches with metadata in Postgres, e.g. after // recovering from a snapshot. We cannot wait for such a batch to appear (*this* is the component From 352c890d552d9657f7fededbaec424b655c3ad76 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Tue, 11 Jun 2024 13:31:57 +0300 Subject: [PATCH 5/5] Test tree truncation some more --- core/node/metadata_calculator/src/tests.rs | 120 ++++++++++++++++--- core/node/metadata_calculator/src/updater.rs | 2 +- 2 files changed, 104 insertions(+), 18 deletions(-) diff --git a/core/node/metadata_calculator/src/tests.rs b/core/node/metadata_calculator/src/tests.rs index 2b48652edde4..20a814630fa7 100644 --- a/core/node/metadata_calculator/src/tests.rs +++ b/core/node/metadata_calculator/src/tests.rs @@ -31,6 +31,7 @@ use super::{ }; use crate::helpers::{AsyncTree, Delayer}; +const POLL_INTERVAL: Duration = Duration::from_millis(50); const RUN_TIMEOUT: Duration = Duration::from_secs(30); async fn run_with_timeout(timeout: Duration, action: F) -> T @@ -49,7 +50,7 @@ pub(super) fn mock_config(db_path: &Path) -> MetadataCalculatorConfig { db_path: db_path.to_str().unwrap().to_owned(), max_open_files: None, mode: MerkleTreeMode::Full, - delay_interval: Duration::from_millis(100), + delay_interval: POLL_INTERVAL, max_l1_batches_per_iter: 10, multi_get_chunk_size: 500, block_cache_capacity: 0, @@ -91,13 +92,9 @@ async fn low_level_genesis_creation() { let db = RocksDB::new(temp_dir.path()).unwrap(); let mut tree = AsyncTree::new(db.into(), MerkleTreeMode::Lightweight).unwrap(); let (_stop_sender, mut stop_receiver) = watch::channel(false); - tree.ensure_consistency( - &Delayer::new(Duration::from_millis(10)), - &pool, - &mut stop_receiver, - ) - .await - .unwrap(); + tree.ensure_consistency(&Delayer::new(POLL_INTERVAL), &pool, &mut stop_receiver) + .await + .unwrap(); assert!(!tree.is_empty()); assert_eq!(tree.next_l1_batch_number(), L1BatchNumber(1)); @@ -151,16 +148,79 @@ async fn tree_truncation_on_l1_batch_divergence( ); let (_stop_sender, mut stop_receiver) = watch::channel(false); - tree.ensure_consistency( - &Delayer::new(Duration::from_millis(10)), - &pool, - &mut stop_receiver, - ) - .await - .unwrap(); + tree.ensure_consistency(&Delayer::new(POLL_INTERVAL), &pool, &mut stop_receiver) + .await + .unwrap(); assert_eq!(tree.next_l1_batch_number(), last_common_l1_batch + 1); } +#[test_casing(4, [1, 4, 6, 7])] +#[tokio::test] +async fn tree_truncation_on_l1_batch_divergence_in_pruned_tree(retained_l1_batch: u32) { + const INITIAL_BATCH_COUNT: usize = 10; + const LAST_COMMON_L1_BATCH: L1BatchNumber = L1BatchNumber(6); + + let retained_l1_batch = L1BatchNumber(retained_l1_batch); + + let pool = ConnectionPool::::test_pool().await; + let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); + let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await; + reset_db_state(&pool, INITIAL_BATCH_COUNT).await; + run_calculator(calculator).await; + + let mut storage = pool.connection().await.unwrap(); + remove_l1_batches(&mut storage, LAST_COMMON_L1_BATCH).await; + // Extend the state with new L1 batches. + let logs = gen_storage_logs(100..200, 5); + extend_db_state(&mut storage, logs).await; + + for number in (LAST_COMMON_L1_BATCH.0 + 1)..(LAST_COMMON_L1_BATCH.0 + 6) { + let new_tree_data = L1BatchTreeData { + hash: H256::from_low_u64_be(number.into()), + rollup_last_leaf_index: 200, // doesn't matter + }; + storage + .blocks_dal() + .save_l1_batch_tree_data(L1BatchNumber(number), &new_tree_data) + .await + .unwrap(); + } + + let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await; + let tree = calculator.create_tree().await.unwrap(); + let GenericAsyncTree::Ready(mut tree) = tree else { + panic!("Unexpected tree state: {tree:?}"); + }; + + let reader = tree.reader(); + let (mut pruner, pruner_handle) = tree.pruner(); + pruner.set_poll_interval(POLL_INTERVAL); + tokio::task::spawn_blocking(|| pruner.run()); + pruner_handle + .set_target_retained_version(retained_l1_batch.0.into()) + .unwrap(); + // Wait until the tree is pruned + while reader.clone().info().await.min_l1_batch_number < Some(retained_l1_batch) { + tokio::time::sleep(POLL_INTERVAL).await; + } + + let (_stop_sender, mut stop_receiver) = watch::channel(false); + let consistency_result = tree + .ensure_consistency(&Delayer::new(POLL_INTERVAL), &pool, &mut stop_receiver) + .await; + + if retained_l1_batch <= LAST_COMMON_L1_BATCH { + consistency_result.unwrap(); + assert_eq!(tree.next_l1_batch_number(), LAST_COMMON_L1_BATCH + 1); + } else { + let err = consistency_result.unwrap_err(); + assert!( + format!("{err:#}").contains("diverging min L1 batch"), + "{err:#}" + ); + } +} + #[tokio::test] async fn basic_workflow() { let pool = ConnectionPool::::test_pool().await; @@ -366,7 +426,7 @@ async fn shutting_down_calculator() { let (stop_sx, stop_rx) = watch::channel(false); let calculator_task = tokio::spawn(calculator.run(stop_rx)); - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(POLL_INTERVAL).await; stop_sx.send_replace(true); run_with_timeout(RUN_TIMEOUT, calculator_task) .await @@ -429,7 +489,7 @@ async fn test_postgres_backup_recovery( insert_initial_writes_for_batch(&mut txn, batch_header.number).await; txn.commit().await.unwrap(); if sleep_between_batches { - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(POLL_INTERVAL).await; } } drop(storage); @@ -844,3 +904,29 @@ async fn deduplication_works_as_expected() { assert_eq!(initial_writes[key].0, L1BatchNumber(4)); } } + +#[test_casing(3, [3, 5, 8])] +#[tokio::test] +async fn l1_batch_divergence_entire_workflow(last_common_l1_batch: u32) { + const INITIAL_BATCH_COUNT: usize = 10; + + assert!((last_common_l1_batch as usize) < INITIAL_BATCH_COUNT); + let last_common_l1_batch = L1BatchNumber(last_common_l1_batch); + + let pool = ConnectionPool::::test_pool().await; + let temp_dir = TempDir::new().expect("failed get temporary directory for RocksDB"); + let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await; + reset_db_state(&pool, INITIAL_BATCH_COUNT).await; + run_calculator(calculator).await; + + let mut storage = pool.connection().await.unwrap(); + remove_l1_batches(&mut storage, last_common_l1_batch).await; + // Extend the state with new L1 batches. + let logs = gen_storage_logs(100..200, 5); + extend_db_state(&mut storage, logs).await; + let expected_root_hash = expected_tree_hash(&pool).await; + + let calculator = setup_lightweight_calculator(temp_dir.path(), pool.clone()).await; + let final_root_hash = run_calculator(calculator).await; + assert_eq!(final_root_hash, expected_root_hash); +} diff --git a/core/node/metadata_calculator/src/updater.rs b/core/node/metadata_calculator/src/updater.rs index 2850b1d06126..94aa176e87de 100644 --- a/core/node/metadata_calculator/src/updater.rs +++ b/core/node/metadata_calculator/src/updater.rs @@ -307,7 +307,7 @@ impl AsyncTree { anyhow::ensure!( self.l1_batch_matches(&mut storage, min_tree_l1_batch).await?, - "Diverging min L1 batch in the tree #{min_tree_l1_batch}; the tree cannot recover from this" + "diverging min L1 batch in the tree #{min_tree_l1_batch}; the tree cannot recover from this" ); let mut left = min_tree_l1_batch.0;