From 146e4cf2f8d890ff0a8d33229e224442e14be437 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Fri, 17 Nov 2023 13:46:19 +0200 Subject: [PATCH] feat(merkle tree): Allow random-order tree recovery (#485) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Allow recovering the Merkle tree from entries provided in an arbitrary order. ## Why ❔ This is necessary to implement the snapshot recovery PoC and could be beneficial in the long run. ## Checklist - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [x] Tests for the changes have been added / updated. - [x] Documentation comments have been added / updated. - [x] Code has been formatted via `zk fmt` and `zk lint`. --- core/lib/merkle_tree/examples/recovery.rs | 29 ++++++-- core/lib/merkle_tree/src/recovery.rs | 31 +++++++-- core/lib/merkle_tree/src/storage/mod.rs | 28 +++++++- core/lib/merkle_tree/src/storage/tests.rs | 66 +++++++++++++++---- .../merkle_tree/tests/integration/recovery.rs | 43 +++++++++--- prover/Cargo.lock | 2 - 6 files changed, 162 insertions(+), 37 deletions(-) diff --git a/core/lib/merkle_tree/examples/recovery.rs b/core/lib/merkle_tree/examples/recovery.rs index 207499da8b41..af16ed05baf3 100644 --- a/core/lib/merkle_tree/examples/recovery.rs +++ b/core/lib/merkle_tree/examples/recovery.rs @@ -23,6 +23,9 @@ struct Cli { /// Number of entries per update. #[arg(name = "ops")] writes_per_update: usize, + /// Perform random recovery instead of linear recovery. + #[arg(name = "random", long)] + random: bool, /// Use a no-op hashing function. #[arg(name = "no-hash", long)] no_hashing: bool, @@ -89,17 +92,29 @@ impl Cli { let started_at = Instant::now(); let recovery_entries = (0..self.writes_per_update) .map(|_| { - last_key += key_step - Key::from(rng.gen::()); - // ^ Increases the key by a random increment close to `key` step with some randomness. 
last_leaf_index += 1; - RecoveryEntry { - key: last_key, - value: ValueHash::zero(), - leaf_index: last_leaf_index, + if self.random { + RecoveryEntry { + key: Key::from(rng.gen::<[u8; 32]>()), + value: ValueHash::zero(), + leaf_index: last_leaf_index, + } + } else { + last_key += key_step - Key::from(rng.gen::()); + // ^ Increases the key by a random increment close to `key` step with some randomness. + RecoveryEntry { + key: last_key, + value: ValueHash::zero(), + leaf_index: last_leaf_index, + } } }) .collect(); - recovery.extend(recovery_entries); + if self.random { + recovery.extend_random(recovery_entries); + } else { + recovery.extend_linear(recovery_entries); + } tracing::info!( "Updated tree with recovery chunk #{updated_idx} in {:?}", started_at.elapsed() diff --git a/core/lib/merkle_tree/src/recovery.rs b/core/lib/merkle_tree/src/recovery.rs index 9700e401fa27..6f57b64ee81f 100644 --- a/core/lib/merkle_tree/src/recovery.rs +++ b/core/lib/merkle_tree/src/recovery.rs @@ -137,7 +137,7 @@ impl MerkleTreeRecovery { storage.greatest_key() } - /// Extends a tree with a chunk of entries. + /// Extends a tree with a chunk of linearly ordered entries. /// /// Entries must be ordered by increasing `key`, and the key of the first entry must be greater /// than [`Self::last_processed_key()`]. 
@@ -154,12 +154,35 @@ impl MerkleTreeRecovery { %entries.key_range = entries_key_range(&entries), ), )] - pub fn extend(&mut self, entries: Vec) { + pub fn extend_linear(&mut self, entries: Vec) { tracing::debug!("Started extending tree"); let started_at = Instant::now(); let storage = Storage::new(&self.db, &self.hasher, self.recovered_version, false); - let patch = storage.extend_during_recovery(entries); + let patch = storage.extend_during_linear_recovery(entries); + tracing::debug!("Finished processing keys; took {:?}", started_at.elapsed()); + + let started_at = Instant::now(); + self.db.apply_patch(patch); + tracing::debug!("Finished persisting to DB; took {:?}", started_at.elapsed()); + } + + /// Extends a tree with a chunk of entries. Unlike [`Self::extend_linear()`], entries may be + /// ordered in any way you like. + #[tracing::instrument( + level = "debug", + skip_all, + fields( + recovered_version = self.recovered_version, + entries.len = entries.len(), + ), + )] + pub fn extend_random(&mut self, entries: Vec) { + tracing::debug!("Started extending tree"); + + let started_at = Instant::now(); + let storage = Storage::new(&self.db, &self.hasher, self.recovered_version, false); + let patch = storage.extend_during_random_recovery(entries); tracing::debug!("Finished processing keys; took {:?}", started_at.elapsed()); let started_at = Instant::now(); @@ -262,7 +285,7 @@ mod tests { value: ValueHash::repeat_byte(1), leaf_index: 1, }; - recovery.extend(vec![recovery_entry]); + recovery.extend_linear(vec![recovery_entry]); let tree = recovery.finalize(); assert_eq!(tree.latest_version(), Some(42)); diff --git a/core/lib/merkle_tree/src/storage/mod.rs b/core/lib/merkle_tree/src/storage/mod.rs index baea778cf93f..c5a56abfca90 100644 --- a/core/lib/merkle_tree/src/storage/mod.rs +++ b/core/lib/merkle_tree/src/storage/mod.rs @@ -321,7 +321,10 @@ impl<'a, DB: Database + ?Sized> Storage<'a, DB> { Some(self.updater.load_greatest_key(self.db)?.0.full_key) } - pub fn 
extend_during_recovery(mut self, recovery_entries: Vec) -> PatchSet { + pub fn extend_during_linear_recovery( + mut self, + recovery_entries: Vec, + ) -> PatchSet { let (mut prev_key, mut prev_nibbles) = match self.updater.load_greatest_key(self.db) { Some((leaf, nibbles)) => (Some(leaf.full_key), nibbles), None => (None, Nibbles::EMPTY), @@ -353,6 +356,29 @@ impl<'a, DB: Database + ?Sized> Storage<'a, DB> { patch } + pub fn extend_during_random_recovery( + mut self, + recovery_entries: Vec, + ) -> PatchSet { + let load_nodes_latency = BLOCK_TIMINGS.load_nodes.start(); + let sorted_keys = SortedKeys::new(recovery_entries.iter().map(|entry| entry.key)); + let parent_nibbles = self.updater.load_ancestors(&sorted_keys, self.db); + let load_nodes_latency = load_nodes_latency.observe(); + tracing::debug!("Load stage took {load_nodes_latency:?}"); + + let extend_patch_latency = BLOCK_TIMINGS.extend_patch.start(); + for (entry, parent_nibbles) in recovery_entries.into_iter().zip(parent_nibbles) { + self.updater + .insert(entry.key, entry.value, &parent_nibbles, || entry.leaf_index); + self.leaf_count += 1; + } + let extend_patch_latency = extend_patch_latency.observe(); + tracing::debug!("Tree traversal stage took {extend_patch_latency:?}"); + + let (_, patch) = self.finalize(); + patch + } + fn finalize(self) -> (ValueHash, PatchSet) { tracing::debug!( "Finished updating tree; total leaf count: {}, stats: {:?}", diff --git a/core/lib/merkle_tree/src/storage/tests.rs b/core/lib/merkle_tree/src/storage/tests.rs index d00ed4d3e05a..3ed0cbada52f 100644 --- a/core/lib/merkle_tree/src/storage/tests.rs +++ b/core/lib/merkle_tree/src/storage/tests.rs @@ -510,7 +510,7 @@ fn recovery_flattens_node_versions() { leaf_index: i + 1, }); let patch = Storage::new(&PatchSet::default(), &(), recovery_version, false) - .extend_during_recovery(recovery_entries.collect()); + .extend_during_linear_recovery(recovery_entries.collect()); assert_eq!(patch.patches_by_version.len(), 1); let 
(updated_version, patch) = patch.patches_by_version.into_iter().next().unwrap(); assert_eq!(updated_version, recovery_version); @@ -546,7 +546,7 @@ fn test_recovery_with_node_hierarchy(chunk_size: usize) { let mut db = PatchSet::default(); for recovery_chunk in recovery_entries.chunks(chunk_size) { let patch = Storage::new(&db, &(), recovery_version, false) - .extend_during_recovery(recovery_chunk.to_vec()); + .extend_during_linear_recovery(recovery_chunk.to_vec()); db.apply_patch(patch); } assert_eq!(db.updated_version, Some(recovery_version)); @@ -605,7 +605,7 @@ fn test_recovery_with_deep_node_hierarchy(chunk_size: usize) { let mut db = PatchSet::default(); for recovery_chunk in recovery_entries.chunks(chunk_size) { let patch = Storage::new(&db, &(), recovery_version, false) - .extend_during_recovery(recovery_chunk.to_vec()); + .extend_during_linear_recovery(recovery_chunk.to_vec()); db.apply_patch(patch); } let mut patch = db.patches_by_version.remove(&recovery_version).unwrap(); @@ -673,7 +673,7 @@ fn recovery_workflow_with_multiple_stages() { leaf_index: i, }); let patch = Storage::new(&db, &(), recovery_version, false) - .extend_during_recovery(recovery_entries.collect()); + .extend_during_linear_recovery(recovery_entries.collect()); assert_eq!(patch.root(recovery_version).unwrap().leaf_count(), 100); db.apply_patch(patch); @@ -684,7 +684,7 @@ fn recovery_workflow_with_multiple_stages() { }); let patch = Storage::new(&db, &(), recovery_version, false) - .extend_during_recovery(more_recovery_entries.collect()); + .extend_during_linear_recovery(more_recovery_entries.collect()); assert_eq!(patch.root(recovery_version).unwrap().leaf_count(), 200); db.apply_patch(patch); @@ -701,6 +701,7 @@ fn recovery_workflow_with_multiple_stages() { } fn test_recovery_pruning_equivalence( + is_linear: bool, chunk_size: usize, recovery_chunk_size: usize, hasher: &dyn HashTree, @@ -752,13 +753,21 @@ fn test_recovery_pruning_equivalence( }); let mut recovery_entries: Vec<_> = 
recovery_entries.collect(); assert_eq!(recovery_entries.len(), 100); - recovery_entries.sort_unstable_by_key(|entry| entry.key); + if is_linear { + recovery_entries.sort_unstable_by_key(|entry| entry.key); + } else { + recovery_entries.shuffle(&mut rng); + } // Recover the tree. let mut recovered_db = PatchSet::default(); for recovery_chunk in recovery_entries.chunks(recovery_chunk_size) { - let patch = Storage::new(&recovered_db, hasher, recovered_version, false) - .extend_during_recovery(recovery_chunk.to_vec()); + let storage = Storage::new(&recovered_db, hasher, recovered_version, false); + let patch = if is_linear { + storage.extend_during_linear_recovery(recovery_chunk.to_vec()) + } else { + storage.extend_during_random_recovery(recovery_chunk.to_vec()) + }; recovered_db.apply_patch(patch); } let sub_patch = recovered_db @@ -798,25 +807,54 @@ fn test_recovery_pruning_equivalence( } #[test] -fn recovery_pruning_equivalence() { +fn linear_recovery_pruning_equivalence() { + for chunk_size in [3, 5, 7, 11, 21, 42, 99, 100] { + // No chunking during recovery (simple case). + test_recovery_pruning_equivalence(true, chunk_size, 100, &()); + // Recovery is chunked (more complex case). + for recovery_chunk_size in [chunk_size, 1, 6, 19, 50, 73] { + test_recovery_pruning_equivalence(true, chunk_size, recovery_chunk_size, &()); + } + } +} + +#[test] +fn random_recovery_pruning_equivalence() { for chunk_size in [3, 5, 7, 11, 21, 42, 99, 100] { // No chunking during recovery (simple case). - test_recovery_pruning_equivalence(chunk_size, 100, &()); + test_recovery_pruning_equivalence(false, chunk_size, 100, &()); // Recovery is chunked (more complex case). 
for recovery_chunk_size in [chunk_size, 1, 6, 19, 50, 73] { - test_recovery_pruning_equivalence(chunk_size, recovery_chunk_size, &()); + test_recovery_pruning_equivalence(false, chunk_size, recovery_chunk_size, &()); } } } #[test] -fn recovery_pruning_equivalence_with_hashing() { +fn linear_recovery_pruning_equivalence_with_hashing() { for chunk_size in [3, 7, 21, 42, 100] { // No chunking during recovery (simple case). - test_recovery_pruning_equivalence(chunk_size, 100, &Blake2Hasher); + test_recovery_pruning_equivalence(true, chunk_size, 100, &Blake2Hasher); // Recovery is chunked (more complex case). for recovery_chunk_size in [chunk_size, 1, 19, 73] { - test_recovery_pruning_equivalence(chunk_size, recovery_chunk_size, &Blake2Hasher); + test_recovery_pruning_equivalence(true, chunk_size, recovery_chunk_size, &Blake2Hasher); + } + } +} + +#[test] +fn random_recovery_pruning_equivalence_with_hashing() { + for chunk_size in [3, 7, 21, 42, 100] { + // No chunking during recovery (simple case). + test_recovery_pruning_equivalence(false, chunk_size, 100, &Blake2Hasher); + // Recovery is chunked (more complex case). 
+ for recovery_chunk_size in [chunk_size, 1, 19, 73] { + test_recovery_pruning_equivalence( + false, + chunk_size, + recovery_chunk_size, + &Blake2Hasher, + ); } } } diff --git a/core/lib/merkle_tree/tests/integration/recovery.rs b/core/lib/merkle_tree/tests/integration/recovery.rs index fe89dded5c32..9a1cfee95911 100644 --- a/core/lib/merkle_tree/tests/integration/recovery.rs +++ b/core/lib/merkle_tree/tests/integration/recovery.rs @@ -27,7 +27,7 @@ fn recovery_basics() { let recovered_version = 123; let mut recovery = MerkleTreeRecovery::new(PatchSet::default(), recovered_version); - recovery.extend(recovery_entries); + recovery.extend_linear(recovery_entries); assert_eq!(recovery.last_processed_key(), Some(greatest_key)); assert_eq!(recovery.root_hash(), *expected_hash); @@ -36,7 +36,7 @@ fn recovery_basics() { tree.verify_consistency(recovered_version).unwrap(); } -fn test_recovery_in_chunks(mut create_db: impl FnMut() -> DB) { +fn test_recovery_in_chunks(is_linear: bool, mut create_db: impl FnMut() -> DB) { let (kvs, expected_hash) = &*KVS_AND_HASH; let recovery_entries = kvs .iter() @@ -47,15 +47,25 @@ fn test_recovery_in_chunks(mut create_db: impl FnMut() -> DB) leaf_index: i as u64 + 1, }); let mut recovery_entries: Vec<_> = recovery_entries.collect(); - recovery_entries.sort_unstable_by_key(|entry| entry.key); - let greatest_key = recovery_entries[99].key; + if is_linear { + recovery_entries.sort_unstable_by_key(|entry| entry.key); + } + let greatest_key = recovery_entries + .iter() + .map(|entry| entry.key) + .max() + .unwrap(); let recovered_version = 123; for chunk_size in [6, 10, 17, 42] { let mut db = create_db(); let mut recovery = MerkleTreeRecovery::new(&mut db, recovered_version); for (i, chunk) in recovery_entries.chunks(chunk_size).enumerate() { - recovery.extend(chunk.to_vec()); + if is_linear { + recovery.extend_linear(chunk.to_vec()); + } else { + recovery.extend_random(chunk.to_vec()); + } if i % 3 == 1 { recovery = 
MerkleTreeRecovery::new(&mut db, recovered_version); // ^ Simulate recovery interruption and restart @@ -119,8 +129,13 @@ fn test_tree_after_recovery( } #[test] -fn recovery_in_chunks() { - test_recovery_in_chunks(PatchSet::default); +fn linear_recovery_in_chunks() { + test_recovery_in_chunks(true, PatchSet::default); +} + +#[test] +fn random_recovery_in_chunks() { + test_recovery_in_chunks(false, PatchSet::default); } mod rocksdb { @@ -130,10 +145,20 @@ mod rocksdb { use zksync_merkle_tree::RocksDBWrapper; #[test] - fn recovery_in_chunks() { + fn linear_recovery_in_chunks() { + let temp_dir = TempDir::new().unwrap(); + let mut counter = 0; + test_recovery_in_chunks(true, || { + counter += 1; + RocksDBWrapper::new(&temp_dir.path().join(counter.to_string())) + }); + } + + #[test] + fn random_recovery_in_chunks() { let temp_dir = TempDir::new().unwrap(); let mut counter = 0; - test_recovery_in_chunks(|| { + test_recovery_in_chunks(false, || { counter += 1; RocksDBWrapper::new(&temp_dir.path().join(counter.to_string())) }); diff --git a/prover/Cargo.lock b/prover/Cargo.lock index 38ea58ac4366..d100677d7469 100644 --- a/prover/Cargo.lock +++ b/prover/Cargo.lock @@ -6702,10 +6702,8 @@ name = "zksync_config" version = "0.1.0" dependencies = [ "anyhow", - "envy", "serde", "zksync_basic_types", - "zksync_contracts", ] [[package]]