Skip to content

Commit

Permalink
refactor(object-store): Refactor object store to fit into node framew…
Browse files Browse the repository at this point in the history
…ork (#2138)

## What ❔

Refactors `ObjectStore` and `ObjectStoreFactory` to fit them into the node
framework:

- Consistently uses `Arc<dyn ObjectStore>` in DI
- Clearly documents `ObjectStoreFactory` as one possible `ObjectStore`
producer
- Exposes GCS, file-based and mock object stores directly and removes the
ability to create mock object stores from `ObjectStoreFactory`
- Refactors retries as `ObjectStore` "middleware"

## Why ❔

Currently, object store APIs don't fit into the node framework well,
leading to suboptimal DevEx.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
  • Loading branch information
slowli authored Jun 5, 2024
1 parent 351e13d commit 5c03964
Show file tree
Hide file tree
Showing 37 changed files with 512 additions and 452 deletions.
2 changes: 1 addition & 1 deletion core/bin/block_reverter/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ async fn main() -> anyhow::Result<()> {
block_reverter.enable_rolling_back_snapshot_objects(
ObjectStoreFactory::new(object_store_config.0)
.create_store()
.await,
.await?,
);
}
}
Expand Down
6 changes: 3 additions & 3 deletions core/bin/external_node/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,16 @@ pub(crate) async fn ensure_storage_initialized(

tracing::warn!("Proceeding with snapshot recovery. This is an experimental feature; use at your own risk");
let object_store_config = snapshot_recovery_object_store_config()?;
let blob_store = ObjectStoreFactory::new(object_store_config)
let object_store = ObjectStoreFactory::new(object_store_config)
.create_store()
.await;
.await?;

let config = SnapshotsApplierConfig::default();
let mut snapshots_applier_task = SnapshotsApplierTask::new(
config,
pool,
Box::new(main_node_client.for_component("snapshot_recovery")),
blob_store,
object_store,
);
if let Some(snapshot_l1_batch) = recovery_config.snapshot_l1_batch_override {
tracing::info!(
Expand Down
2 changes: 1 addition & 1 deletion core/bin/snapshots_creator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async fn main() -> anyhow::Result<()> {
SnapshotsObjectStoreConfig::from_env().context("SnapshotsObjectStoreConfig::from_env()")?;
let blob_store = ObjectStoreFactory::new(object_store_config.0)
.create_store()
.await;
.await?;

let database_secrets = DatabaseSecrets::from_env().context("DatabaseSecrets")?;
let creator_config =
Expand Down
38 changes: 13 additions & 25 deletions core/bin/snapshots_creator/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::{

use rand::{thread_rng, Rng};
use zksync_dal::{Connection, CoreDal};
use zksync_object_store::ObjectStore;
use zksync_object_store::{MockObjectStore, ObjectStore};
use zksync_types::{
block::{L1BatchHeader, L1BatchTreeData, L2BlockHeader},
snapshots::{
Expand Down Expand Up @@ -257,8 +257,7 @@ async fn prepare_postgres(
async fn persisting_snapshot_metadata() {
let pool = ConnectionPool::<Core>::test_pool().await;
let mut rng = thread_rng();
let object_store_factory = ObjectStoreFactory::mock();
let object_store = object_store_factory.create_store().await;
let object_store = MockObjectStore::arc();

// Insert some data to Postgres.
let mut conn = pool.connection().await.unwrap();
Expand Down Expand Up @@ -306,18 +305,16 @@ async fn persisting_snapshot_metadata() {
async fn persisting_snapshot_factory_deps() {
let pool = ConnectionPool::<Core>::test_pool().await;
let mut rng = thread_rng();
let object_store_factory = ObjectStoreFactory::mock();
let object_store = object_store_factory.create_store().await;
let object_store = MockObjectStore::arc();
let mut conn = pool.connection().await.unwrap();
let expected_outputs = prepare_postgres(&mut rng, &mut conn, 10).await;

SnapshotCreator::for_tests(object_store, pool.clone())
SnapshotCreator::for_tests(object_store.clone(), pool.clone())
.run(TEST_CONFIG, MIN_CHUNK_COUNT)
.await
.unwrap();
let snapshot_l1_batch_number = L1BatchNumber(8);

let object_store = object_store_factory.create_store().await;
let SnapshotFactoryDependencies { factory_deps } =
object_store.get(snapshot_l1_batch_number).await.unwrap();
let actual_deps: HashSet<_> = factory_deps.into_iter().collect();
Expand All @@ -328,18 +325,16 @@ async fn persisting_snapshot_factory_deps() {
async fn persisting_snapshot_logs() {
let pool = ConnectionPool::<Core>::test_pool().await;
let mut rng = thread_rng();
let object_store_factory = ObjectStoreFactory::mock();
let object_store = object_store_factory.create_store().await;
let object_store = MockObjectStore::arc();
let mut conn = pool.connection().await.unwrap();
let expected_outputs = prepare_postgres(&mut rng, &mut conn, 10).await;

SnapshotCreator::for_tests(object_store, pool.clone())
SnapshotCreator::for_tests(object_store.clone(), pool.clone())
.run(TEST_CONFIG, MIN_CHUNK_COUNT)
.await
.unwrap();
let snapshot_l1_batch_number = L1BatchNumber(8);

let object_store = object_store_factory.create_store().await;
assert_storage_logs(&*object_store, snapshot_l1_batch_number, &expected_outputs).await;
}

Expand All @@ -364,12 +359,11 @@ async fn assert_storage_logs(
async fn recovery_workflow() {
let pool = ConnectionPool::<Core>::test_pool().await;
let mut rng = thread_rng();
let object_store_factory = ObjectStoreFactory::mock();
let object_store = object_store_factory.create_store().await;
let object_store = MockObjectStore::arc();
let mut conn = pool.connection().await.unwrap();
let expected_outputs = prepare_postgres(&mut rng, &mut conn, 10).await;

SnapshotCreator::for_tests(object_store, pool.clone())
SnapshotCreator::for_tests(object_store.clone(), pool.clone())
.stop_after_chunk_count(0)
.run(SEQUENTIAL_TEST_CONFIG, MIN_CHUNK_COUNT)
.await
Expand All @@ -387,14 +381,13 @@ async fn recovery_workflow() {
.iter()
.all(Option::is_none));

let object_store = object_store_factory.create_store().await;
let SnapshotFactoryDependencies { factory_deps } =
object_store.get(snapshot_l1_batch_number).await.unwrap();
let actual_deps: HashSet<_> = factory_deps.into_iter().collect();
assert_eq!(actual_deps, expected_outputs.deps);

// Process 2 storage log chunks, then stop.
SnapshotCreator::for_tests(object_store, pool.clone())
SnapshotCreator::for_tests(object_store.clone(), pool.clone())
.stop_after_chunk_count(2)
.run(SEQUENTIAL_TEST_CONFIG, MIN_CHUNK_COUNT)
.await
Expand All @@ -416,26 +409,23 @@ async fn recovery_workflow() {
);

// Process the remaining chunks.
let object_store = object_store_factory.create_store().await;
SnapshotCreator::for_tests(object_store, pool.clone())
SnapshotCreator::for_tests(object_store.clone(), pool.clone())
.run(SEQUENTIAL_TEST_CONFIG, MIN_CHUNK_COUNT)
.await
.unwrap();

let object_store = object_store_factory.create_store().await;
assert_storage_logs(&*object_store, snapshot_l1_batch_number, &expected_outputs).await;
}

#[tokio::test]
async fn recovery_workflow_with_varying_chunk_size() {
let pool = ConnectionPool::<Core>::test_pool().await;
let mut rng = thread_rng();
let object_store_factory = ObjectStoreFactory::mock();
let object_store = object_store_factory.create_store().await;
let object_store = MockObjectStore::arc();
let mut conn = pool.connection().await.unwrap();
let expected_outputs = prepare_postgres(&mut rng, &mut conn, 10).await;

SnapshotCreator::for_tests(object_store, pool.clone())
SnapshotCreator::for_tests(object_store.clone(), pool.clone())
.stop_after_chunk_count(2)
.run(SEQUENTIAL_TEST_CONFIG, MIN_CHUNK_COUNT)
.await
Expand All @@ -461,12 +451,10 @@ async fn recovery_workflow_with_varying_chunk_size() {
storage_logs_chunk_size: 1, // << should be ignored
..SEQUENTIAL_TEST_CONFIG
};
let object_store = object_store_factory.create_store().await;
SnapshotCreator::for_tests(object_store, pool.clone())
SnapshotCreator::for_tests(object_store.clone(), pool.clone())
.run(config_with_other_size, MIN_CHUNK_COUNT)
.await
.unwrap();

let object_store = object_store_factory.create_store().await;
assert_storage_logs(&*object_store, snapshot_l1_batch_number, &expected_outputs).await;
}
10 changes: 6 additions & 4 deletions core/lib/object_store/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
This crate provides the object storage abstraction that allows getting, putting and removing binary blobs. The following
implementations are available:

- File-based storage saving blobs as separate files in the local filesystem
- GCS-based storage
- File-based store saving blobs as separate files in the local filesystem
- GCS-based store
- Mock in-memory store

These implementations are not exposed externally. Instead, a store trait object can be constructed based on the
[configuration], which can be provided explicitly or constructed from the environment.
Normally, these implementations are not used directly. Instead, a store trait object can be constructed based on the
[configuration], which can be provided explicitly or constructed from the environment. This trait object is what should
be used for dependency injection.

Besides the lower-level storage abstraction, the crate provides high-level typesafe methods to store (de)serializable
objects. Prefer using these methods whenever possible.
Expand Down
110 changes: 110 additions & 0 deletions core/lib/object_store/src/factory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use std::sync::Arc;

use anyhow::Context as _;
use tokio::sync::OnceCell;
use zksync_config::configs::object_store::{ObjectStoreConfig, ObjectStoreMode};

use crate::{
file::FileBackedObjectStore,
gcs::{GoogleCloudStore, GoogleCloudStoreAuthMode},
raw::{ObjectStore, ObjectStoreError},
retries::StoreWithRetries,
};

/// Factory of [`ObjectStore`]s that caches the store instance once it's created. Used mainly for legacy reasons.
///
/// Please do not use this factory in dependency injection; rely on `Arc<dyn ObjectStore>` instead. This allows
/// injecting mock store implementations, decorating an object store with middleware etc.
#[derive(Debug)]
pub struct ObjectStoreFactory {
    // Configuration used to instantiate the store on the first `create_store()` call.
    config: ObjectStoreConfig,
    // Lazily initialized store; shared (via `Arc` cloning) by all subsequent `create_store()` callers.
    store: OnceCell<Arc<dyn ObjectStore>>,
}

impl ObjectStoreFactory {
    /// Creates an object store factory based on the provided `config`.
    ///
    /// No store is instantiated here; instantiation is deferred to the first
    /// [`create_store()`](Self::create_store) call.
    pub fn new(config: ObjectStoreConfig) -> Self {
        Self {
            config,
            store: OnceCell::new(),
        }
    }

    /// Creates an [`ObjectStore`] or returns a cached store if one was created previously.
    ///
    /// # Errors
    ///
    /// Returns an error if store initialization fails (e.g., because of incorrect configuration).
    pub async fn create_store(&self) -> anyhow::Result<Arc<dyn ObjectStore>> {
        self.store
            .get_or_try_init(|| async {
                Self::create_from_config(&self.config)
                    .await
                    .with_context(|| {
                        // The failing operation is creating the *store*, not the factory itself.
                        format!(
                            "failed creating object store with configuration {:?}",
                            self.config
                        )
                    })
            })
            .await
            .cloned()
    }

    /// Instantiates a fresh store for `config`. Every supported mode is wrapped into
    /// the [`StoreWithRetries`] middleware with the configured number of retries.
    async fn create_from_config(
        config: &ObjectStoreConfig,
    ) -> Result<Arc<dyn ObjectStore>, ObjectStoreError> {
        match &config.mode {
            ObjectStoreMode::GCS { bucket_base_url } => {
                tracing::trace!(
                    "Initialized GoogleCloudStorage Object store without credential file"
                );
                let store = StoreWithRetries::try_new(config.max_retries, || {
                    GoogleCloudStore::new(
                        GoogleCloudStoreAuthMode::Authenticated,
                        bucket_base_url.clone(),
                    )
                })
                .await?;
                Ok(Arc::new(store))
            }
            ObjectStoreMode::GCSWithCredentialFile {
                bucket_base_url,
                gcs_credential_file_path,
            } => {
                tracing::trace!("Initialized GoogleCloudStorage Object store with credential file");
                let store = StoreWithRetries::try_new(config.max_retries, || {
                    GoogleCloudStore::new(
                        GoogleCloudStoreAuthMode::AuthenticatedWithCredentialFile(
                            gcs_credential_file_path.clone(),
                        ),
                        bucket_base_url.clone(),
                    )
                })
                .await?;
                Ok(Arc::new(store))
            }
            ObjectStoreMode::FileBacked {
                file_backed_base_path,
            } => {
                tracing::trace!("Initialized FileBacked Object store");
                let store = StoreWithRetries::try_new(config.max_retries, || {
                    FileBackedObjectStore::new(file_backed_base_path.clone())
                })
                .await?;
                Ok(Arc::new(store))
            }
            ObjectStoreMode::GCSAnonymousReadOnly { bucket_base_url } => {
                tracing::trace!("Initialized GoogleCloudStoragePublicReadOnly store");
                let store = StoreWithRetries::try_new(config.max_retries, || {
                    GoogleCloudStore::new(
                        GoogleCloudStoreAuthMode::Anonymous,
                        bucket_base_url.clone(),
                    )
                })
                .await?;
                Ok(Arc::new(store))
            }
        }
    }
}
8 changes: 7 additions & 1 deletion core/lib/object_store/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,18 @@ impl From<io::Error> for ObjectStoreError {
}
}

/// [`ObjectStore`] implementation storing objects as files in a local filesystem. Mostly useful for local testing.
#[derive(Debug)]
pub(crate) struct FileBackedObjectStore {
pub struct FileBackedObjectStore {
base_dir: String,
}

impl FileBackedObjectStore {
/// Creates a new file-backed store with its root at the specified path.
///
/// # Errors
///
/// Propagates I/O errors.
pub async fn new(base_dir: String) -> Result<Self, ObjectStoreError> {
for bucket in &[
Bucket::ProverJobs,
Expand Down
Loading

0 comments on commit 5c03964

Please sign in to comment.