Skip to content

Commit

Permalink
indexer: restore objects from formal snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
gegaowp committed Sep 9, 2024
1 parent 072fcfb commit 809a50d
Show file tree
Hide file tree
Showing 11 changed files with 550 additions and 75 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions crates/sui-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@ anyhow.workspace = true
async-trait.workspace = true
axum.workspace = true
backoff.workspace = true
bb8 = "0.8.5"
bcs.workspace = true
bytes.workspace = true
chrono.workspace = true
clap = { workspace = true, features = ["env"] }
csv.workspace = true
diesel = { workspace = true, features = ["postgres", "chrono", "r2d2", "serde_json"] }
diesel-async = { workspace = true, features = ["bb8", "postgres", "async-connection-wrapper"] }
bb8 = "0.8.5"
diesel = { workspace = true, features = ["postgres", "chrono", "r2d2", "serde_json"] }
futures.workspace = true
hex.workspace = true
indicatif.workspace = true
itertools.workspace = true
jsonrpsee.workspace = true
object_store.workspace = true
Expand All @@ -41,13 +42,16 @@ url.workspace = true
fastcrypto = { workspace = true, features = ["copy_key"] }
mysten-metrics.workspace = true
sui-config.workspace = true
sui-archival.workspace = true
sui-core.workspace = true
sui-data-ingestion-core.workspace = true
sui-json.workspace = true
sui-json-rpc.workspace = true
sui-json-rpc-api.workspace = true
sui-json-rpc-types.workspace = true
sui-open-rpc.workspace = true
sui-sdk.workspace = true
sui-snapshot.workspace = true
sui-storage.workspace = true
sui-types.workspace = true
sui-package-resolver.workspace = true
Expand Down
30 changes: 27 additions & 3 deletions crates/sui-indexer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ pub enum Command {
#[command(flatten)]
pruning_options: PruningOptions,
#[command(flatten)]
restore_config: RestoreConfig,
upload_options: UploadOptions,
},
JsonRpcService(JsonRpcConfig),
ResetDatabase {
Expand All @@ -162,6 +162,8 @@ pub enum Command {
},
/// Run through the migration scripts.
RunMigrations,
/// Restore the database from formal snapshots.
Restore(RestoreConfig),
}

#[derive(Args, Default, Debug, Clone)]
Expand Down Expand Up @@ -201,11 +203,33 @@ impl Default for SnapshotLagConfig {
}

#[derive(Args, Debug, Clone, Default)]
pub struct RestoreConfig {
pub struct UploadOptions {
#[arg(long, env = "GCS_DISPLAY_BUCKET")]
pub gcs_display_bucket: Option<String>,
#[arg(long, env = "GCS_CRED_PATH")]
pub gcs_cred_path: Option<String>,
}

// CLI / environment options carried by the `Restore` command.
// No attribute below declares a default value.
// Plain `//` comments are used on purpose: with clap derive, `///` comments
// would be picked up as help text.
#[derive(Args, Debug, Clone)]
pub struct RestoreConfig {
// Read from `--gcs-archive-bucket` or GCS_ARCHIVE_BUCKET.
// Presumably the bucket holding checkpoint archives — confirm against the restorer.
#[arg(long, env = "GCS_ARCHIVE_BUCKET")]
pub gcs_archive_bucket: String,
// Read from `--gcs-cred-path` or GCS_CRED_PATH.
// Presumably the path to the GCS credentials file — confirm against the restorer.
#[arg(long, env = "GCS_CRED_PATH")]
pub gcs_cred_path: String,
// Read from `--gcs-display-bucket` or GCS_DISPLAY_BUCKET. The `Restore` arm in
// main.rs copies this value into `UploadOptions::gcs_display_bucket`.
#[arg(long, env = "GCS_DISPLAY_BUCKET")]
pub gcs_display_bucket: Option<String>,
pub gcs_display_bucket: String,
// Read from `--gcs-snapshot-dir` or GCS_SNAPSHOT_DIR.
#[arg(long, env = "GCS_SNAPSHOT_DIR")]
pub gcs_snapshot_dir: String,
// Read from `--gcs-snapshot-bucket` or GCS_SNAPSHOT_BUCKET.
#[arg(long, env = "GCS_SNAPSHOT_BUCKET")]
pub gcs_snapshot_bucket: String,
// Read from `--start-epoch` or START_EPOCH.
// Presumably the epoch whose formal snapshot is restored — TODO confirm.
#[arg(long, env = "START_EPOCH")]
pub start_epoch: u64,
// Read from `--s3-endpoint` or S3_ENDPOINT.
#[arg(long, env = "S3_ENDPOINT")]
pub s3_endpoint: String,
// NOTE(review): unlike the fields above there is no `long` here, so clap derive
// treats this as a positional argument (still fillable from
// OBJECT_STORE_CONCURRENT_LIMIT) — confirm this is intended.
#[arg(env = "OBJECT_STORE_CONCURRENT_LIMIT")]
pub object_store_concurrent_limit: usize,
// NOTE(review): same as above — no `long`, so this is positional (still fillable
// from OBJECT_STORE_MAX_TIMEOUT_SECS). Confirm this is intended.
#[arg(env = "OBJECT_STORE_MAX_TIMEOUT_SECS")]
pub object_store_max_timeout_secs: u64,
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions crates/sui-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub mod indexer;
pub mod indexer_reader;
pub mod metrics;
pub mod models;
pub mod restorer;
pub mod schema;
pub mod store;
pub mod system_package_task;
Expand Down
23 changes: 19 additions & 4 deletions crates/sui-indexer/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use clap::Parser;
use sui_indexer::config::Command;
use sui_indexer::config::{Command, UploadOptions};
use sui_indexer::database::ConnectionPool;
use sui_indexer::db::{
check_db_migration_consistency, new_connection_pool, reset_database, run_migrations,
Expand All @@ -15,6 +14,7 @@ use tracing::warn;
use sui_indexer::metrics::{
spawn_connection_pool_metric_collector, start_prometheus_server, IndexerMetrics,
};
use sui_indexer::restorer::formal_snapshot::IndexerFormalSnapshotRestorer;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand Down Expand Up @@ -44,15 +44,15 @@ async fn main() -> anyhow::Result<()> {
ingestion_config,
snapshot_config,
pruning_options,
restore_config,
upload_options,
} => {
// Make sure to run all migrations on startup, and also serve as a compatibility check.
run_migrations(pool.dedicated_connection().await?).await?;

let store = PgIndexerStore::new(
connection_pool,
pool,
restore_config,
upload_options,
indexer_metrics.clone(),
);

Expand Down Expand Up @@ -83,6 +83,21 @@ async fn main() -> anyhow::Result<()> {
Command::RunMigrations => {
run_migrations(pool.dedicated_connection().await?).await?;
}
Command::Restore(restore_config) => {
    // `PgIndexerStore::new` takes `UploadOptions`, so derive them from the
    // restore configuration.
    let upload_options = UploadOptions {
        gcs_display_bucket: Some(restore_config.gcs_display_bucket.clone()),
        // The credentials path must come from `gcs_cred_path`; the snapshot
        // bucket name is not a credentials file.
        gcs_cred_path: Some(restore_config.gcs_cred_path.clone()),
    };
    let store = PgIndexerStore::new(
        connection_pool,
        pool,
        upload_options,
        indexer_metrics.clone(),
    );
    // Build the restorer from the store and the full restore config, then run it;
    // errors from either step propagate out of `main`.
    let mut formal_restorer =
        IndexerFormalSnapshotRestorer::new(store, restore_config).await?;
    formal_restorer.restore().await?;
}
}

Ok(())
Expand Down
43 changes: 43 additions & 0 deletions crates/sui-indexer/src/restorer/archives.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::num::NonZeroUsize;

use prometheus::Registry;
use tracing::info;

use sui_archival::reader::{ArchiveReader, ArchiveReaderMetrics};
use sui_config::node::ArchiveReaderConfig;
use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};

use crate::types::IndexerResult;

/// Returns the next checkpoint sequence number after `epoch`, as reported by the
/// archive manifest.
///
/// Builds a GCS-backed [`ArchiveReader`] for `archive_bucket`, using `cred_path`
/// as the store's `google_service_account` setting, syncs the manifest once and
/// then queries it. The result is also logged at `info` level.
///
/// # Errors
///
/// Propagates any error from constructing the reader, syncing the manifest, or
/// reading it back.
pub async fn read_next_checkpoint_after_epoch(
    cred_path: String,
    archive_bucket: Option<String>,
    epoch: u64,
) -> IndexerResult<u64> {
    let archive_store_config = ObjectStoreConfig {
        object_store: Some(ObjectStoreType::GCS),
        bucket: archive_bucket,
        // `cred_path` is owned and not used again, so move it instead of cloning.
        google_service_account: Some(cred_path),
        object_store_connection_limit: 50,
        no_sign_request: false,
        ..Default::default()
    };
    let archive_reader_config = ArchiveReaderConfig {
        remote_store_config: archive_store_config,
        download_concurrency: NonZeroUsize::new(50).expect("50 is non-zero"),
        use_for_pruning_watermark: false,
    };
    // A throwaway registry: the reader's metrics are not exported from here.
    let metrics = ArchiveReaderMetrics::new(&Registry::default());
    let archive_reader = ArchiveReader::new(archive_reader_config, &metrics)?;
    // The manifest must be synced before it can be read.
    archive_reader.sync_manifest_once().await?;
    let manifest = archive_reader.get_manifest().await?;
    let next_checkpoint_after_epoch = manifest.next_checkpoint_after_epoch(epoch);
    info!(
        "Read from archives: next checkpoint sequence after epoch {} is: {}",
        epoch, next_checkpoint_after_epoch
    );
    Ok(next_checkpoint_after_epoch)
}
Loading

0 comments on commit 809a50d

Please sign in to comment.