Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(en): Support Merkle tree recovery with pruning enabled #3172

Merged
merged 6 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions core/lib/dal/src/storage_logs_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ impl StorageLogsDal<'_, '_> {
FROM
storage_logs
WHERE
storage_logs.miniblock_number = $1
storage_logs.miniblock_number <= $1
AND storage_logs.hashed_key >= u.start_key
AND storage_logs.hashed_key <= u.end_key
ORDER BY
Expand Down Expand Up @@ -784,7 +784,7 @@ impl StorageLogsDal<'_, '_> {
storage_logs
INNER JOIN initial_writes ON storage_logs.hashed_key = initial_writes.hashed_key
WHERE
storage_logs.miniblock_number = $1
storage_logs.miniblock_number <= $1
AND storage_logs.hashed_key >= $2::bytea
AND storage_logs.hashed_key <= $3::bytea
ORDER BY
Expand Down
1 change: 1 addition & 0 deletions core/node/metadata_calculator/src/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ mod tests {
extend_db_state_from_l1_batch(
&mut storage,
snapshot_recovery.l1_batch_number + 1,
snapshot_recovery.l2_block_number + 1,
new_logs,
)
.await;
Expand Down
156 changes: 101 additions & 55 deletions core/node/metadata_calculator/src/recovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,14 @@ use std::{
};

use anyhow::Context as _;
use async_trait::async_trait;
use futures::future;
use tokio::sync::{watch, Mutex, Semaphore};
use zksync_dal::{Connection, ConnectionPool, Core, CoreDal};
use zksync_health_check::HealthUpdater;
use zksync_merkle_tree::TreeEntry;
use zksync_shared_metrics::{SnapshotRecoveryStage, APP_METRICS};
use zksync_types::{
snapshots::{uniform_hashed_keys_chunk, SnapshotRecoveryStatus},
L2BlockNumber, H256,
};
use zksync_types::{snapshots::uniform_hashed_keys_chunk, L1BatchNumber, L2BlockNumber, H256};

use super::{
helpers::{AsyncTree, AsyncTreeRecovery, GenericAsyncTree, MerkleTreeHealth},
Expand All @@ -54,12 +52,13 @@ mod tests;

/// Handler of recovery life cycle events. This functionality is encapsulated in a trait to be able
/// to control recovery behavior in tests.
#[async_trait]
trait HandleRecoveryEvent: fmt::Debug + Send + Sync {
fn recovery_started(&mut self, _chunk_count: u64, _recovered_chunk_count: u64) {
// Default implementation does nothing
}

fn chunk_recovered(&self) {
async fn chunk_recovered(&self) {
// Default implementation does nothing
}
}
Expand All @@ -82,6 +81,7 @@ impl<'a> RecoveryHealthUpdater<'a> {
}
}

#[async_trait]
impl HandleRecoveryEvent for RecoveryHealthUpdater<'_> {
fn recovery_started(&mut self, chunk_count: u64, recovered_chunk_count: u64) {
self.chunk_count = chunk_count;
Expand All @@ -91,7 +91,7 @@ impl HandleRecoveryEvent for RecoveryHealthUpdater<'_> {
.set(recovered_chunk_count);
}

fn chunk_recovered(&self) {
async fn chunk_recovered(&self) {
let recovered_chunk_count = self.recovered_chunk_count.fetch_add(1, Ordering::SeqCst) + 1;
let chunks_left = self.chunk_count.saturating_sub(recovered_chunk_count);
tracing::info!(
Expand All @@ -110,34 +110,68 @@ impl HandleRecoveryEvent for RecoveryHealthUpdater<'_> {
}

#[derive(Debug, Clone, Copy)]
struct SnapshotParameters {
struct InitParameters {
l1_batch: L1BatchNumber,
l2_block: L2BlockNumber,
expected_root_hash: H256,
expected_root_hash: Option<H256>,
log_count: u64,
desired_chunk_size: u64,
}

impl SnapshotParameters {
impl InitParameters {
async fn new(
pool: &ConnectionPool<Core>,
recovery: &SnapshotRecoveryStatus,
config: &MetadataCalculatorRecoveryConfig,
) -> anyhow::Result<Self> {
let l2_block = recovery.l2_block_number;
let expected_root_hash = recovery.l1_batch_root_hash;

) -> anyhow::Result<Option<Self>> {
let mut storage = pool.connection_tagged("metadata_calculator").await?;
let recovery_status = storage
.snapshot_recovery_dal()
.get_applied_snapshot_status()
.await?;
let pruning_info = storage.pruning_dal().get_pruning_info().await?;

let (l1_batch, l2_block);
let mut expected_root_hash = None;
match (recovery_status, pruning_info.last_hard_pruned_l2_block) {
(Some(recovery), None) => {
tracing::warn!(
"Snapshot recovery {recovery:?} is present on the node, but pruning info is empty; assuming no pruning happened"
);
l1_batch = recovery.l1_batch_number;
l2_block = recovery.l2_block_number;
expected_root_hash = Some(recovery.l1_batch_root_hash);
}
(Some(recovery), Some(pruned_l2_block)) => {
// We have both recovery and some pruning on top of it.
l2_block = pruned_l2_block.max(recovery.l2_block_number);
l1_batch = pruning_info
.last_hard_pruned_l1_batch
.with_context(|| format!("malformed pruning info: {pruning_info:?}"))?;
if l1_batch == recovery.l1_batch_number {
expected_root_hash = Some(recovery.l1_batch_root_hash);
}
}
(None, Some(pruned_l2_block)) => {
l2_block = pruned_l2_block;
l1_batch = pruning_info
.last_hard_pruned_l1_batch
.with_context(|| format!("malformed pruning info: {pruning_info:?}"))?;
}
(None, None) => return Ok(None),
};

let log_count = storage
.storage_logs_dal()
.get_storage_logs_row_count(l2_block)
.await?;

Ok(Self {
Ok(Some(Self {
l1_batch,
l2_block,
expected_root_hash,
log_count,
desired_chunk_size: config.desired_chunk_size,
})
}))
}

fn chunk_count(&self) -> u64 {
Expand Down Expand Up @@ -168,47 +202,44 @@ impl GenericAsyncTree {
stop_receiver: &watch::Receiver<bool>,
) -> anyhow::Result<Option<AsyncTree>> {
let started_at = Instant::now();
let (tree, snapshot_recovery) = match self {
let (tree, init_params) = match self {
Self::Ready(tree) => return Ok(Some(tree)),
Self::Recovering(tree) => {
let snapshot_recovery = get_snapshot_recovery(main_pool).await?.context(
let params = InitParameters::new(main_pool, config).await?.context(
"Merkle tree is recovering, but Postgres doesn't contain snapshot recovery information",
)?;
let recovered_version = tree.recovered_version();
anyhow::ensure!(
u64::from(snapshot_recovery.l1_batch_number.0) == recovered_version,
"Snapshot L1 batch in Postgres ({snapshot_recovery:?}) differs from the recovered Merkle tree version \
u64::from(params.l1_batch.0) == recovered_version,
"Snapshot L1 batch in Postgres ({params:?}) differs from the recovered Merkle tree version \
({recovered_version})"
);
tracing::info!("Resuming tree recovery with status: {snapshot_recovery:?}");
(tree, snapshot_recovery)
tracing::info!("Resuming tree recovery with status: {params:?}");
(tree, params)
}
Self::Empty { db, mode } => {
if let Some(snapshot_recovery) = get_snapshot_recovery(main_pool).await? {
tracing::info!(
"Starting Merkle tree recovery with status {snapshot_recovery:?}"
);
let l1_batch = snapshot_recovery.l1_batch_number;
if let Some(params) = InitParameters::new(main_pool, config).await? {
tracing::info!("Starting Merkle tree recovery with status {params:?}");
let l1_batch = params.l1_batch;
let tree = AsyncTreeRecovery::new(db, l1_batch.0.into(), mode, config)?;
(tree, snapshot_recovery)
(tree, params)
} else {
// Start the tree from scratch. The genesis block will be filled in `TreeUpdater::loop_updating_tree()`.
return Ok(Some(AsyncTree::new(db, mode)?));
}
}
};

let snapshot = SnapshotParameters::new(main_pool, &snapshot_recovery, config).await?;
tracing::debug!(
"Obtained snapshot parameters: {snapshot:?} based on recovery configuration {config:?}"
"Obtained recovery init parameters: {init_params:?} based on recovery configuration {config:?}"
);
let recovery_options = RecoveryOptions {
chunk_count: snapshot.chunk_count(),
chunk_count: init_params.chunk_count(),
concurrency_limit: recovery_pool.max_size() as usize,
events: Box::new(RecoveryHealthUpdater::new(health_updater)),
};
let tree = tree
.recover(snapshot, recovery_options, &recovery_pool, stop_receiver)
.recover(init_params, recovery_options, &recovery_pool, stop_receiver)
.await?;
if tree.is_some() {
// Only report latency if recovery wasn't canceled
Expand All @@ -223,12 +254,12 @@ impl GenericAsyncTree {
impl AsyncTreeRecovery {
async fn recover(
mut self,
snapshot: SnapshotParameters,
init_params: InitParameters,
mut options: RecoveryOptions<'_>,
pool: &ConnectionPool<Core>,
stop_receiver: &watch::Receiver<bool>,
) -> anyhow::Result<Option<AsyncTree>> {
self.ensure_desired_chunk_size(snapshot.desired_chunk_size)
self.ensure_desired_chunk_size(init_params.desired_chunk_size)
.await?;

let start_time = Instant::now();
Expand All @@ -237,13 +268,15 @@ impl AsyncTreeRecovery {
.map(|chunk_id| uniform_hashed_keys_chunk(chunk_id, chunk_count))
.collect();
tracing::info!(
"Recovering Merkle tree from Postgres snapshot in {chunk_count} chunks with max concurrency {}",
"Recovering Merkle tree from Postgres snapshot in {chunk_count} chunks with max concurrency {}. \
Be aware that enabling node pruning during recovery will probably result in a recovery error; always disable pruning \
until recovery is complete",
options.concurrency_limit
);

let mut storage = pool.connection_tagged("metadata_calculator").await?;
let remaining_chunks = self
.filter_chunks(&mut storage, snapshot.l2_block, &chunks)
.filter_chunks(&mut storage, init_params.l2_block, &chunks)
.await?;
drop(storage);
options
Expand All @@ -261,9 +294,10 @@ impl AsyncTreeRecovery {
.acquire()
.await
.context("semaphore is never closed")?;
if Self::recover_key_chunk(&tree, snapshot.l2_block, chunk, pool, stop_receiver).await?
if Self::recover_key_chunk(&tree, init_params.l2_block, chunk, pool, stop_receiver)
.await?
{
options.events.chunk_recovered();
options.events.chunk_recovered().await;
}
anyhow::Ok(())
});
Expand All @@ -279,13 +313,18 @@ impl AsyncTreeRecovery {

let finalize_latency = RECOVERY_METRICS.latency[&RecoveryStage::Finalize].start();
let actual_root_hash = tree.root_hash().await;
anyhow::ensure!(
actual_root_hash == snapshot.expected_root_hash,
"Root hash of recovered tree {actual_root_hash:?} differs from expected root hash {:?}. \
If pruning is enabled and the tree is initialized some time after node recovery, \
this is caused by snapshot storage logs getting pruned; this setup is currently not supported",
snapshot.expected_root_hash
);
if let Some(expected_root_hash) = init_params.expected_root_hash {
slowli marked this conversation as resolved.
Show resolved Hide resolved
anyhow::ensure!(
actual_root_hash == expected_root_hash,
"Root hash of recovered tree {actual_root_hash:?} differs from expected root hash {expected_root_hash:?}"
);
}

// Check pruning info one last time before finalizing the tree.
let mut storage = pool.connection_tagged("metadata_calculator").await?;
Self::check_pruning_info(&mut storage, init_params.l2_block).await?;
drop(storage);

let tree = tree.finalize().await?;
finalize_latency.observe();
tracing::info!(
Expand Down Expand Up @@ -340,6 +379,21 @@ impl AsyncTreeRecovery {
Ok(output)
}

async fn check_pruning_info(
slowli marked this conversation as resolved.
Show resolved Hide resolved
storage: &mut Connection<'_, Core>,
snapshot_l2_block: L2BlockNumber,
) -> anyhow::Result<()> {
let pruning_info = storage.pruning_dal().get_pruning_info().await?;
if let Some(last_hard_pruned_l2_block) = pruning_info.last_hard_pruned_l2_block {
anyhow::ensure!(
last_hard_pruned_l2_block == snapshot_l2_block,
"Additional data was pruned compared to tree recovery L2 block #{snapshot_l2_block}: {pruning_info:?}. \
Continuing recovery is impossible; to recover the tree, drop its RocksDB directory, stop pruning and restart recovery"
);
}
Ok(())
}

/// Returns `Ok(true)` if the chunk was recovered, `Ok(false)` if the recovery process was interrupted.
async fn recover_key_chunk(
tree: &Mutex<AsyncTreeRecovery>,
Expand All @@ -363,7 +417,9 @@ impl AsyncTreeRecovery {
.storage_logs_dal()
.get_tree_entries_for_l2_block(snapshot_l2_block, key_chunk.clone())
.await?;
Self::check_pruning_info(&mut storage, snapshot_l2_block).await?;
drop(storage);

let entries_latency = entries_latency.observe();
tracing::debug!(
"Loaded {} entries for chunk {key_chunk:?} in {entries_latency:?}",
Expand Down Expand Up @@ -414,13 +470,3 @@ impl AsyncTreeRecovery {
Ok(true)
}
}

async fn get_snapshot_recovery(
pool: &ConnectionPool<Core>,
) -> anyhow::Result<Option<SnapshotRecoveryStatus>> {
let mut storage = pool.connection_tagged("metadata_calculator").await?;
Ok(storage
.snapshot_recovery_dal()
.get_applied_snapshot_status()
.await?)
}
Loading
Loading