feat(merkle tree): Allow random-order tree recovery #485

Merged 2 commits on Nov 17, 2023.
29 changes: 22 additions & 7 deletions core/lib/merkle_tree/examples/recovery.rs
@@ -23,6 +23,9 @@ struct Cli {
/// Number of entries per update.
#[arg(name = "ops")]
writes_per_update: usize,
/// Perform random recovery instead of linear recovery.
#[arg(name = "random", long)]
random: bool,
/// Use a no-op hashing function.
#[arg(name = "no-hash", long)]
no_hashing: bool,
@@ -89,17 +92,29 @@ impl Cli {
let started_at = Instant::now();
let recovery_entries = (0..self.writes_per_update)
.map(|_| {
last_key += key_step - Key::from(rng.gen::<u64>());
// ^ Increases the key by a random increment close to `key_step`.
last_leaf_index += 1;
RecoveryEntry {
key: last_key,
value: ValueHash::zero(),
leaf_index: last_leaf_index,
if self.random {
RecoveryEntry {
key: Key::from(rng.gen::<[u8; 32]>()),
value: ValueHash::zero(),
leaf_index: last_leaf_index,
}
} else {
last_key += key_step - Key::from(rng.gen::<u64>());
// ^ Increases the key by a random increment close to `key_step`.
RecoveryEntry {
key: last_key,
value: ValueHash::zero(),
leaf_index: last_leaf_index,
}
}
})
.collect();
recovery.extend(recovery_entries);
if self.random {
recovery.extend_random(recovery_entries);
} else {
recovery.extend_linear(recovery_entries);
}
tracing::info!(
"Updated tree with recovery chunk #{updated_idx} in {:?}",
started_at.elapsed()
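In linear mode, the example advances the key by roughly `key_step` per entry, so keys stay sorted; with `--random`, each key is drawn uniformly from the full 256-bit key space. A minimal self-contained sketch of the two strategies, assuming `primitive_types::U256` as a stand-in for the crate's `Key` alias and `rand` 0.8 for the RNG (the function name and signature are illustrative, not the example's actual code):

```rust
use primitive_types::U256;
use rand::Rng;

fn generate_keys(rng: &mut impl Rng, count: u64, random: bool) -> Vec<U256> {
    // In linear mode, keys are spaced ~`key_step` apart, covering the whole
    // key space by the end of recovery.
    let key_step = U256::MAX / U256::from(count);
    let mut last_key = U256::zero();
    (0..count)
        .map(|_| {
            if random {
                // Random mode: a uniformly random 256-bit key per entry.
                U256::from(rng.gen::<[u8; 32]>())
            } else {
                // Linear mode: strictly increasing keys with some jitter.
                last_key += key_step - U256::from(rng.gen::<u64>());
                last_key
            }
        })
        .collect()
}
```

Random keys land in unrelated subtrees, which is exactly the access pattern `extend_random()` must handle; linear keys always extend the right edge of the tree.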
31 changes: 27 additions & 4 deletions core/lib/merkle_tree/src/recovery.rs
@@ -137,7 +137,7 @@ impl<DB: PruneDatabase, H: HashTree> MerkleTreeRecovery<DB, H> {
storage.greatest_key()
}

/// Extends a tree with a chunk of entries.
/// Extends a tree with a chunk of linearly ordered entries.
///
/// Entries must be ordered by increasing `key`, and the key of the first entry must be greater
/// than [`Self::last_processed_key()`].
@@ -154,12 +154,35 @@
%entries.key_range = entries_key_range(&entries),
),
)]
pub fn extend(&mut self, entries: Vec<RecoveryEntry>) {
pub fn extend_linear(&mut self, entries: Vec<RecoveryEntry>) {
tracing::debug!("Started extending tree");

let started_at = Instant::now();
let storage = Storage::new(&self.db, &self.hasher, self.recovered_version, false);
let patch = storage.extend_during_recovery(entries);
let patch = storage.extend_during_linear_recovery(entries);
tracing::debug!("Finished processing keys; took {:?}", started_at.elapsed());

let started_at = Instant::now();
self.db.apply_patch(patch);
tracing::debug!("Finished persisting to DB; took {:?}", started_at.elapsed());
}

/// Extends a tree with a chunk of entries. Unlike [`Self::extend_linear()`], the entries
/// may be supplied in any order.
#[tracing::instrument(
level = "debug",
skip_all,
fields(
recovered_version = self.recovered_version,
entries.len = entries.len(),
),
)]
pub fn extend_random(&mut self, entries: Vec<RecoveryEntry>) {
tracing::debug!("Started extending tree");

let started_at = Instant::now();
let storage = Storage::new(&self.db, &self.hasher, self.recovered_version, false);
let patch = storage.extend_during_random_recovery(entries);
tracing::debug!("Finished processing keys; took {:?}", started_at.elapsed());

let started_at = Instant::now();
@@ -262,7 +285,7 @@ mod tests {
value: ValueHash::repeat_byte(1),
leaf_index: 1,
};
recovery.extend(vec![recovery_entry]);
recovery.extend_linear(vec![recovery_entry]);
let tree = recovery.finalize();

assert_eq!(tree.latest_version(), Some(42));
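Taken together, the split API is driven as in the following sketch, based on the calls visible in this diff (`MerkleTreeRecovery::new()`, `extend_linear()`, `extend_random()`, `finalize()`, `verify_consistency()`); the import paths, version number, and chunk size are assumptions:

```rust
use zksync_merkle_tree::{
    recovery::{MerkleTreeRecovery, RecoveryEntry},
    PatchSet,
};

fn recover_tree(mut entries: Vec<RecoveryEntry>, random_order: bool) {
    let recovered_version = 123;
    let mut recovery = MerkleTreeRecovery::new(PatchSet::default(), recovered_version);
    if !random_order {
        // `extend_linear()` requires entries sorted by increasing key.
        entries.sort_unstable_by_key(|entry| entry.key);
    }
    for chunk in entries.chunks(1_000) {
        if random_order {
            recovery.extend_random(chunk.to_vec());
        } else {
            recovery.extend_linear(chunk.to_vec());
        }
    }
    let tree = recovery.finalize();
    tree.verify_consistency(recovered_version).unwrap();
}
```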
28 changes: 27 additions & 1 deletion core/lib/merkle_tree/src/storage/mod.rs
@@ -321,7 +321,10 @@ impl<'a, DB: Database + ?Sized> Storage<'a, DB> {
Some(self.updater.load_greatest_key(self.db)?.0.full_key)
}

pub fn extend_during_recovery(mut self, recovery_entries: Vec<RecoveryEntry>) -> PatchSet {
pub fn extend_during_linear_recovery(
mut self,
recovery_entries: Vec<RecoveryEntry>,
) -> PatchSet {
let (mut prev_key, mut prev_nibbles) = match self.updater.load_greatest_key(self.db) {
Some((leaf, nibbles)) => (Some(leaf.full_key), nibbles),
None => (None, Nibbles::EMPTY),
@@ -353,6 +356,29 @@
patch
}

pub fn extend_during_random_recovery(
mut self,
recovery_entries: Vec<RecoveryEntry>,
) -> PatchSet {
let load_nodes_latency = BLOCK_TIMINGS.load_nodes.start();
let sorted_keys = SortedKeys::new(recovery_entries.iter().map(|entry| entry.key));
let parent_nibbles = self.updater.load_ancestors(&sorted_keys, self.db);
let load_nodes_latency = load_nodes_latency.observe();
tracing::debug!("Load stage took {load_nodes_latency:?}");

let extend_patch_latency = BLOCK_TIMINGS.extend_patch.start();
for (entry, parent_nibbles) in recovery_entries.into_iter().zip(parent_nibbles) {
self.updater
.insert(entry.key, entry.value, &parent_nibbles, || entry.leaf_index);
self.leaf_count += 1;
}
let extend_patch_latency = extend_patch_latency.observe();
tracing::debug!("Tree traversal stage took {extend_patch_latency:?}");

let (_, patch) = self.finalize();
patch
}

fn finalize(self) -> (ValueHash, PatchSet) {
tracing::debug!(
"Finished updating tree; total leaf count: {}, stats: {:?}",
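The two storage paths differ in what they read from the DB: linear recovery only needs the greatest key inserted so far, since new entries always extend the tree's right edge, whereas random recovery first loads the ancestors of every key in the chunk via `SortedKeys`, much like a regular tree update. A hedged sketch of what a `SortedKeys`-style helper computes (the crate's actual implementation may differ; `U256` again stands in for `Key`):

```rust
use primitive_types::U256;

// Pairs each key with its position in the input, sorted by key, so that
// ancestor nodes can be loaded in key order (better DB locality) while the
// inserts themselves stay in the caller-supplied order.
fn sorted_keys(keys: impl Iterator<Item = U256>) -> Vec<(usize, U256)> {
    let mut pairs: Vec<_> = keys.enumerate().collect();
    pairs.sort_unstable_by_key(|&(_, key)| key);
    pairs
}
```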
66 changes: 52 additions & 14 deletions core/lib/merkle_tree/src/storage/tests.rs
@@ -510,7 +510,7 @@ fn recovery_flattens_node_versions() {
leaf_index: i + 1,
});
let patch = Storage::new(&PatchSet::default(), &(), recovery_version, false)
.extend_during_recovery(recovery_entries.collect());
.extend_during_linear_recovery(recovery_entries.collect());
assert_eq!(patch.patches_by_version.len(), 1);
let (updated_version, patch) = patch.patches_by_version.into_iter().next().unwrap();
assert_eq!(updated_version, recovery_version);
@@ -546,7 +546,7 @@ fn test_recovery_with_node_hierarchy(chunk_size: usize) {
let mut db = PatchSet::default();
for recovery_chunk in recovery_entries.chunks(chunk_size) {
let patch = Storage::new(&db, &(), recovery_version, false)
.extend_during_recovery(recovery_chunk.to_vec());
.extend_during_linear_recovery(recovery_chunk.to_vec());
db.apply_patch(patch);
}
assert_eq!(db.updated_version, Some(recovery_version));
@@ -605,7 +605,7 @@ fn test_recovery_with_deep_node_hierarchy(chunk_size: usize) {
let mut db = PatchSet::default();
for recovery_chunk in recovery_entries.chunks(chunk_size) {
let patch = Storage::new(&db, &(), recovery_version, false)
.extend_during_recovery(recovery_chunk.to_vec());
.extend_during_linear_recovery(recovery_chunk.to_vec());
db.apply_patch(patch);
}
let mut patch = db.patches_by_version.remove(&recovery_version).unwrap();
@@ -673,7 +673,7 @@ fn recovery_workflow_with_multiple_stages() {
leaf_index: i,
});
let patch = Storage::new(&db, &(), recovery_version, false)
.extend_during_recovery(recovery_entries.collect());
.extend_during_linear_recovery(recovery_entries.collect());
assert_eq!(patch.root(recovery_version).unwrap().leaf_count(), 100);
db.apply_patch(patch);

@@ -684,7 +684,7 @@
});

let patch = Storage::new(&db, &(), recovery_version, false)
.extend_during_recovery(more_recovery_entries.collect());
.extend_during_linear_recovery(more_recovery_entries.collect());
assert_eq!(patch.root(recovery_version).unwrap().leaf_count(), 200);
db.apply_patch(patch);

@@ -701,6 +701,7 @@
}

fn test_recovery_pruning_equivalence(
is_linear: bool,
chunk_size: usize,
recovery_chunk_size: usize,
hasher: &dyn HashTree,
@@ -752,13 +753,21 @@
});
let mut recovery_entries: Vec<_> = recovery_entries.collect();
assert_eq!(recovery_entries.len(), 100);
recovery_entries.sort_unstable_by_key(|entry| entry.key);
if is_linear {
recovery_entries.sort_unstable_by_key(|entry| entry.key);
} else {
recovery_entries.shuffle(&mut rng);
}

// Recover the tree.
let mut recovered_db = PatchSet::default();
for recovery_chunk in recovery_entries.chunks(recovery_chunk_size) {
let patch = Storage::new(&recovered_db, hasher, recovered_version, false)
.extend_during_recovery(recovery_chunk.to_vec());
let storage = Storage::new(&recovered_db, hasher, recovered_version, false);
let patch = if is_linear {
storage.extend_during_linear_recovery(recovery_chunk.to_vec())
} else {
storage.extend_during_random_recovery(recovery_chunk.to_vec())
};
recovered_db.apply_patch(patch);
}
let sub_patch = recovered_db
@@ -798,25 +807,54 @@
}

#[test]
fn recovery_pruning_equivalence() {
fn linear_recovery_pruning_equivalence() {
for chunk_size in [3, 5, 7, 11, 21, 42, 99, 100] {
// No chunking during recovery (simple case).
test_recovery_pruning_equivalence(true, chunk_size, 100, &());
// Recovery is chunked (more complex case).
for recovery_chunk_size in [chunk_size, 1, 6, 19, 50, 73] {
test_recovery_pruning_equivalence(true, chunk_size, recovery_chunk_size, &());
}
}
}

#[test]
fn random_recovery_pruning_equivalence() {
for chunk_size in [3, 5, 7, 11, 21, 42, 99, 100] {
// No chunking during recovery (simple case).
test_recovery_pruning_equivalence(chunk_size, 100, &());
test_recovery_pruning_equivalence(false, chunk_size, 100, &());
// Recovery is chunked (more complex case).
for recovery_chunk_size in [chunk_size, 1, 6, 19, 50, 73] {
test_recovery_pruning_equivalence(chunk_size, recovery_chunk_size, &());
test_recovery_pruning_equivalence(false, chunk_size, recovery_chunk_size, &());
}
}
}

#[test]
fn recovery_pruning_equivalence_with_hashing() {
fn linear_recovery_pruning_equivalence_with_hashing() {
for chunk_size in [3, 7, 21, 42, 100] {
// No chunking during recovery (simple case).
test_recovery_pruning_equivalence(chunk_size, 100, &Blake2Hasher);
test_recovery_pruning_equivalence(true, chunk_size, 100, &Blake2Hasher);
// Recovery is chunked (more complex case).
for recovery_chunk_size in [chunk_size, 1, 19, 73] {
test_recovery_pruning_equivalence(chunk_size, recovery_chunk_size, &Blake2Hasher);
test_recovery_pruning_equivalence(true, chunk_size, recovery_chunk_size, &Blake2Hasher);
}
}
}

#[test]
fn random_recovery_pruning_equivalence_with_hashing() {
for chunk_size in [3, 7, 21, 42, 100] {
// No chunking during recovery (simple case).
test_recovery_pruning_equivalence(false, chunk_size, 100, &Blake2Hasher);
// Recovery is chunked (more complex case).
for recovery_chunk_size in [chunk_size, 1, 19, 73] {
test_recovery_pruning_equivalence(
false,
chunk_size,
recovery_chunk_size,
&Blake2Hasher,
);
}
}
}
43 changes: 34 additions & 9 deletions core/lib/merkle_tree/tests/integration/recovery.rs
@@ -27,7 +27,7 @@ fn recovery_basics() {

let recovered_version = 123;
let mut recovery = MerkleTreeRecovery::new(PatchSet::default(), recovered_version);
recovery.extend(recovery_entries);
recovery.extend_linear(recovery_entries);

assert_eq!(recovery.last_processed_key(), Some(greatest_key));
assert_eq!(recovery.root_hash(), *expected_hash);
@@ -36,7 +36,7 @@
tree.verify_consistency(recovered_version).unwrap();
}

fn test_recovery_in_chunks<DB: PruneDatabase>(mut create_db: impl FnMut() -> DB) {
fn test_recovery_in_chunks<DB: PruneDatabase>(is_linear: bool, mut create_db: impl FnMut() -> DB) {
let (kvs, expected_hash) = &*KVS_AND_HASH;
let recovery_entries = kvs
.iter()
@@ -47,15 +47,25 @@ fn test_recovery_in_chunks<DB: PruneDatabase>(mut create_db: impl FnMut() -> DB)
leaf_index: i as u64 + 1,
});
let mut recovery_entries: Vec<_> = recovery_entries.collect();
recovery_entries.sort_unstable_by_key(|entry| entry.key);
let greatest_key = recovery_entries[99].key;
if is_linear {
recovery_entries.sort_unstable_by_key(|entry| entry.key);
}
let greatest_key = recovery_entries
.iter()
.map(|entry| entry.key)
.max()
.unwrap();

let recovered_version = 123;
for chunk_size in [6, 10, 17, 42] {
let mut db = create_db();
let mut recovery = MerkleTreeRecovery::new(&mut db, recovered_version);
for (i, chunk) in recovery_entries.chunks(chunk_size).enumerate() {
recovery.extend(chunk.to_vec());
if is_linear {
recovery.extend_linear(chunk.to_vec());
} else {
recovery.extend_random(chunk.to_vec());
}
if i % 3 == 1 {
recovery = MerkleTreeRecovery::new(&mut db, recovered_version);
// ^ Simulate recovery interruption and restart
@@ -119,8 +129,13 @@ fn test_tree_after_recovery<DB: Database>(
}

#[test]
fn recovery_in_chunks() {
test_recovery_in_chunks(PatchSet::default);
fn linear_recovery_in_chunks() {
test_recovery_in_chunks(true, PatchSet::default);
}

#[test]
fn random_recovery_in_chunks() {
test_recovery_in_chunks(false, PatchSet::default);
}

mod rocksdb {
@@ -130,10 +145,20 @@ mod rocksdb {
use zksync_merkle_tree::RocksDBWrapper;

#[test]
fn recovery_in_chunks() {
fn linear_recovery_in_chunks() {
let temp_dir = TempDir::new().unwrap();
let mut counter = 0;
test_recovery_in_chunks(true, || {
counter += 1;
RocksDBWrapper::new(&temp_dir.path().join(counter.to_string()))
});
}

#[test]
fn random_recovery_in_chunks() {
let temp_dir = TempDir::new().unwrap();
let mut counter = 0;
test_recovery_in_chunks(|| {
test_recovery_in_chunks(false, || {
counter += 1;
RocksDBWrapper::new(&temp_dir.path().join(counter.to_string()))
});
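For completeness, a sketch of random-order recovery against RocksDB, mirroring the test setup above (`RocksDBWrapper::new()`, `TempDir`); the directory name and version are illustrative, and the import paths are assumptions:

```rust
use tempfile::TempDir;
use zksync_merkle_tree::{
    recovery::{MerkleTreeRecovery, RecoveryEntry},
    RocksDBWrapper,
};

fn recover_on_rocksdb(entries: Vec<RecoveryEntry>) {
    let temp_dir = TempDir::new().unwrap();
    let db = RocksDBWrapper::new(&temp_dir.path().join("recovery"));
    let recovered_version = 123;
    let mut recovery = MerkleTreeRecovery::new(db, recovered_version);
    // Entries can be supplied in any order, in one chunk or several.
    recovery.extend_random(entries);
    let tree = recovery.finalize();
    tree.verify_consistency(recovered_version).unwrap();
}
```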
2 changes: 0 additions & 2 deletions prover/Cargo.lock
