Skip to content

Commit

Permalink
feat(en): Resume incomplete snapshot in snapshot creator in more cases (
Browse files Browse the repository at this point in the history
#2886)

## What ❔

Resumes an incomplete snapshot in the snapshot creator if the creator
config doesn't specify an L1 batch.

## Why ❔

This effectively reverts the relevant changes from
#2256. It makes the
snapshot creator resilient by default without additional setup, at the
cost of parallel creator jobs working incorrectly (unless they all
specify L1 batches).

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
  • Loading branch information
slowli authored Sep 16, 2024
1 parent 438c820 commit f095b4a
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 42 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/bin/snapshots_creator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ futures.workspace = true

[dev-dependencies]
rand.workspace = true
test-casing.workspace = true
42 changes: 22 additions & 20 deletions core/bin/snapshots_creator/src/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,44 +291,46 @@ impl SnapshotCreator {
.get_sealed_l1_batch_number()
.await?;
let sealed_l1_batch_number = sealed_l1_batch_number.context("No L1 batches in Postgres")?;
let requested_l1_batch_number = if let Some(l1_batch_number) = config.l1_batch_number {
let (requested_l1_batch_number, existing_snapshot) = if let Some(l1_batch_number) =
config.l1_batch_number
{
anyhow::ensure!(
l1_batch_number <= sealed_l1_batch_number,
"Requested a snapshot for L1 batch #{l1_batch_number} that doesn't exist in Postgres (latest L1 batch: {sealed_l1_batch_number})"
);
l1_batch_number

let existing_snapshot = master_conn
.snapshots_dal()
.get_snapshot_metadata(l1_batch_number)
.await?;
(l1_batch_number, existing_snapshot)
} else {
// We subtract 1 so that after restore, EN node has at least one L1 batch to fetch.
anyhow::ensure!(
sealed_l1_batch_number != L1BatchNumber(0),
"Cannot create snapshot when only the genesis L1 batch is present in Postgres"
);
sealed_l1_batch_number - 1
};
let requested_l1_batch_number = sealed_l1_batch_number - 1;

let existing_snapshot = master_conn
.snapshots_dal()
.get_snapshot_metadata(requested_l1_batch_number)
.await?;
// Continue creating a pending snapshot if it exists, even if it doesn't correspond to the latest L1 batch.
// OTOH, a completed snapshot does not matter, unless it corresponds to `requested_l1_batch_number` (in which case it doesn't need to be created again).
let existing_snapshot = master_conn
.snapshots_dal()
.get_newest_snapshot_metadata()
.await?
.filter(|snapshot| {
!snapshot.is_complete() || snapshot.l1_batch_number == requested_l1_batch_number
});
(requested_l1_batch_number, existing_snapshot)
};
drop(master_conn);

match existing_snapshot {
Some(snapshot) if snapshot.is_complete() => {
tracing::info!("Snapshot for the requested L1 batch is complete: {snapshot:?}");
Ok(None)
}
Some(snapshot) if config.l1_batch_number.is_some() => {
Ok(Some(SnapshotProgress::from_existing_snapshot(&snapshot)))
}
Some(snapshot) => {
// Unless creating a snapshot for a specific L1 batch is requested, we never continue an existing snapshot, even if it's incomplete.
// This it to make running multiple snapshot creator instances in parallel easier to reason about.
tracing::warn!(
"Snapshot at expected L1 batch #{requested_l1_batch_number} exists, but is incomplete: {snapshot:?}. If you need to resume creating it, \
specify the L1 batch number in the snapshot creator config"
);
Ok(None)
}
Some(snapshot) => Ok(Some(SnapshotProgress::from_existing_snapshot(&snapshot))),
None => {
Self::initialize_snapshot_progress(
config,
Expand Down
99 changes: 77 additions & 22 deletions core/bin/snapshots_creator/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::{
};

use rand::{thread_rng, Rng};
use test_casing::test_casing;
use zksync_config::SnapshotsCreatorConfig;
use zksync_dal::{Connection, CoreDal};
use zksync_object_store::{MockObjectStore, ObjectStore};
Expand Down Expand Up @@ -64,6 +65,15 @@ impl HandleEvent for TestEventListener {
}
}

#[derive(Debug)]
struct UnreachableEventListener;

impl HandleEvent for UnreachableEventListener {
fn on_chunk_started(&self) -> TestBehavior {
unreachable!("should not be reached");
}
}

impl SnapshotCreator {
fn for_tests(blob_store: Arc<dyn ObjectStore>, pool: ConnectionPool<Core>) -> Self {
Self {
Expand All @@ -80,6 +90,13 @@ impl SnapshotCreator {
..self
}
}

fn panic_on_chunk_start(self) -> Self {
Self {
event_listener: Box::new(UnreachableEventListener),
..self
}
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -431,8 +448,9 @@ async fn persisting_snapshot_logs_for_v0_snapshot() {
assert_eq!(actual_logs, expected_outputs.storage_logs);
}

#[test_casing(2, [false, true])]
#[tokio::test]
async fn recovery_workflow() {
async fn recovery_workflow(specify_batch_after_recovery: bool) {
let pool = ConnectionPool::<Core>::test_pool().await;
let mut rng = thread_rng();
let object_store = MockObjectStore::arc();
Expand Down Expand Up @@ -462,29 +480,9 @@ async fn recovery_workflow() {
let actual_deps: HashSet<_> = factory_deps.into_iter().collect();
assert_eq!(actual_deps, expected_outputs.deps);

// Check that the creator does nothing unless it's requested to create a new snapshot.
SnapshotCreator::for_tests(object_store.clone(), pool.clone())
.stop_after_chunk_count(2)
.run(SEQUENTIAL_TEST_CONFIG, MIN_CHUNK_COUNT)
.await
.unwrap();
let snapshot_metadata = conn
.snapshots_dal()
.get_snapshot_metadata(snapshot_l1_batch_number)
.await
.unwrap()
.expect("No snapshot metadata");
assert!(
snapshot_metadata
.storage_logs_filepaths
.iter()
.all(Option::is_none),
"{snapshot_metadata:?}"
);

// Process 2 storage log chunks, then stop.
let recovery_config = SnapshotsCreatorConfig {
l1_batch_number: Some(snapshot_l1_batch_number),
l1_batch_number: specify_batch_after_recovery.then_some(snapshot_l1_batch_number),
..SEQUENTIAL_TEST_CONFIG
};
SnapshotCreator::for_tests(object_store.clone(), pool.clone())
Expand All @@ -510,11 +508,68 @@ async fn recovery_workflow() {

// Process the remaining chunks.
SnapshotCreator::for_tests(object_store.clone(), pool.clone())
.run(recovery_config.clone(), MIN_CHUNK_COUNT)
.await
.unwrap();

assert_storage_logs(&*object_store, snapshot_l1_batch_number, &expected_outputs).await;

// Check that the snapshot is not created anew after it is completed.
SnapshotCreator::for_tests(object_store.clone(), pool.clone())
.panic_on_chunk_start()
.run(recovery_config, MIN_CHUNK_COUNT)
.await
.unwrap();

let snapshot_metadata = conn
.snapshots_dal()
.get_snapshot_metadata(snapshot_l1_batch_number)
.await
.unwrap()
.expect("No snapshot metadata");
assert!(snapshot_metadata.is_complete(), "{snapshot_metadata:#?}");
}

#[tokio::test]
async fn recovery_workflow_with_new_l1_batch() {
let pool = ConnectionPool::<Core>::test_pool().await;
let mut rng = thread_rng();
let object_store = MockObjectStore::arc();
let mut conn = pool.connection().await.unwrap();
let expected_outputs = prepare_postgres(&mut rng, &mut conn, 10).await;

SnapshotCreator::for_tests(object_store.clone(), pool.clone())
.stop_after_chunk_count(2)
.run(SEQUENTIAL_TEST_CONFIG, MIN_CHUNK_COUNT)
.await
.unwrap();

let snapshot_l1_batch_number = L1BatchNumber(8);
let snapshot_metadata = conn
.snapshots_dal()
.get_snapshot_metadata(snapshot_l1_batch_number)
.await
.unwrap()
.expect("No snapshot metadata");
assert!(!snapshot_metadata.is_complete(), "{snapshot_metadata:#?}");

let new_logs = gen_storage_logs(&mut thread_rng(), 50);
create_l1_batch(&mut conn, snapshot_l1_batch_number + 2, &new_logs).await;

// The old snapshot should be completed.
SnapshotCreator::for_tests(object_store.clone(), pool.clone())
.run(SEQUENTIAL_TEST_CONFIG, MIN_CHUNK_COUNT)
.await
.unwrap();
assert_storage_logs(&*object_store, snapshot_l1_batch_number, &expected_outputs).await;

let snapshot_metadata = conn
.snapshots_dal()
.get_snapshot_metadata(snapshot_l1_batch_number)
.await
.unwrap()
.expect("No snapshot metadata");
assert!(snapshot_metadata.is_complete(), "{snapshot_metadata:#?}");
}

#[tokio::test]
Expand Down

0 comments on commit f095b4a

Please sign in to comment.