Skip to content

Commit

Permalink
indexer formal restore 1/N: restore packages and move objects
Browse files Browse the repository at this point in the history
  • Loading branch information
gegaowp committed Aug 1, 2024
1 parent 8599d66 commit e6e0d8a
Show file tree
Hide file tree
Showing 9 changed files with 522 additions and 20 deletions.
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion crates/sui-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,18 @@ axum.workspace = true
backoff.workspace = true
bcs.workspace = true
chrono.workspace = true
serde_with.workspace = true
bytes.workspace = true
clap.workspace = true
tap.workspace = true
diesel = { workspace = true, optional = true }
diesel-derive-enum = { workspace = true, optional = true }
futures.workspace = true
indicatif.workspace = true
itertools.workspace = true
jsonrpsee.workspace = true
object_store.workspace = true
prometheus.workspace = true
serde_with.workspace = true
serde.workspace = true
serde_json.workspace = true
rayon.workspace = true
Expand All @@ -35,13 +38,18 @@ url.workspace = true

fastcrypto = { workspace = true, features = ["copy_key"] }
mysten-metrics.workspace = true
sui-archival.workspace = true
sui-config.workspace = true
sui-core.workspace = true
sui-data-ingestion-core.workspace = true
sui-json.workspace = true
sui-json-rpc.workspace = true
sui-json-rpc-api.workspace = true
sui-json-rpc-types.workspace = true
sui-open-rpc.workspace = true
sui-sdk.workspace = true
sui-snapshot.workspace = true
sui-storage.workspace = true
sui-types.workspace = true
sui-package-resolver.workspace = true
sui-protocol-config.workspace = true
Expand Down
7 changes: 3 additions & 4 deletions crates/sui-indexer/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,9 @@ pub mod setup_postgres {
pub async fn setup(
indexer_config: IndexerConfig,
registry: Registry,
indexer_metrics: IndexerMetrics,
) -> Result<(), IndexerError> {
info!("Setting up postgres database connection pool");
let db_url_secret = indexer_config.get_db_url().map_err(|e| {
IndexerError::PgPoolConnectionError(format!(
"Failed parsing database url with error {:?}",
Expand Down Expand Up @@ -291,8 +293,6 @@ pub mod setup_postgres {
})?;
info!("Reset Postgres database complete.");
}
let indexer_metrics = IndexerMetrics::new(&registry);
mysten_metrics::init_metrics(&registry);

let report_cp = blocking_cp.clone();
let report_metrics = indexer_metrics.clone();
Expand Down Expand Up @@ -400,6 +400,7 @@ pub mod setup_mysql {
pub async fn setup(
indexer_config: IndexerConfig,
registry: Registry,
indexer_metrics: IndexerMetrics,
) -> Result<(), IndexerError> {
let db_url_secret = indexer_config.get_db_url().map_err(|e| {
IndexerError::PgPoolConnectionError(format!(
Expand Down Expand Up @@ -433,8 +434,6 @@ pub mod setup_mysql {
)?;
info!("Reset MySQL database complete.");
}
let indexer_metrics = IndexerMetrics::new(&registry);
mysten_metrics::init_metrics(&registry);

let report_cp = blocking_cp.clone();
let report_metrics = indexer_metrics.clone();
Expand Down
7 changes: 7 additions & 0 deletions crates/sui-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub mod indexer;
pub mod indexer_reader;
pub mod metrics;
pub mod models;
pub mod restorer;
pub mod schema;
pub mod store;
pub mod system_package_task;
Expand Down Expand Up @@ -91,6 +92,10 @@ pub struct IndexerConfig {
pub name_service_registry_id: Option<ObjectID>,
#[clap(long)]
pub name_service_reverse_registry_id: Option<ObjectID>,
#[clap(long, global = true)]
pub archive_bucket: String,
#[clap(long, global = true)]
pub formal_snapshot_bucket: String,
}

impl IndexerConfig {
Expand Down Expand Up @@ -147,6 +152,8 @@ impl Default for IndexerConfig {
name_service_package_address: None,
name_service_registry_id: None,
name_service_reverse_registry_id: None,
archive_bucket: "mysten-mainnet-archives".to_string(),
formal_snapshot_bucket: "mysten-mainnet-formal".to_string(),
}
}
}
Expand Down
49 changes: 45 additions & 4 deletions crates/sui-indexer/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::env;

use clap::Parser;
use tracing::info;

use sui_indexer::errors::IndexerError;
use sui_indexer::metrics::start_prometheus_server;
use sui_indexer::metrics::IndexerMetrics;
use sui_indexer::restorer::archives::read_next_checkpoint_after_epoch;
use sui_indexer::restorer::formal_snapshot::IndexerFormalSnapshotRestorer;
use sui_indexer::IndexerConfig;

#[tokio::main]
Expand All @@ -16,13 +21,13 @@ async fn main() -> Result<(), IndexerError> {
.init();

let mut indexer_config = IndexerConfig::parse();
info!("Parsed indexer config: {:#?}", indexer_config);
// TODO: remove. Temporary safeguard to migrate to `rpc_client_url` usage
if indexer_config.rpc_client_url.contains("testnet") {
indexer_config.remote_store_url = Some("https://checkpoints.testnet.sui.io".to_string());
} else if indexer_config.rpc_client_url.contains("mainnet") {
indexer_config.remote_store_url = Some("https://checkpoints.mainnet.sui.io".to_string());
}
info!("Parsed indexer config: {:#?}", indexer_config);
let (_registry_service, registry) = start_prometheus_server(
// NOTE: this parses the input host addr and port number for socket addr,
// so unwrap() is safe here.
Expand All @@ -34,11 +39,47 @@ async fn main() -> Result<(), IndexerError> {
.unwrap(),
indexer_config.rpc_client_url.as_str(),
)?;
#[cfg(feature = "postgres-feature")]
sui_indexer::db::setup_postgres::setup(indexer_config.clone(), registry.clone()).await?;
let indexer_metrics = IndexerMetrics::new(&registry);
mysten_metrics::init_metrics(&registry);

// TODO: condition to trigger formal snapshot restore
let start_epoch = env::var("START_EPOCH")
.expect("START_EPOCH not set")
.parse::<u64>()
.ok();
if let Some(start_epoch) = start_epoch {
let cred_path = env::var("GOOGLE_APPLICATION_CREDENTIALS_PATH")
.expect("GOOGLE_APPLICATION_CREDENTIALS_PATH not set");

let next_checkpoint_after_epoch = read_next_checkpoint_after_epoch(
cred_path.clone(),
Some(indexer_config.archive_bucket.clone()),
start_epoch,
)
.await?;
let base_path_string = env::var("SNAPSHOT_DIR").expect("SNAPSHOT_DIR not set");
let mut formal_restorer = IndexerFormalSnapshotRestorer::new(
cred_path.clone(),
indexer_config.formal_snapshot_bucket.clone(),
base_path_string.clone(),
start_epoch,
next_checkpoint_after_epoch,
indexer_config.clone(),
indexer_metrics.clone(),
)
.await?;
formal_restorer.restore().await?;
}

#[cfg(feature = "postgres-feature")]
sui_indexer::db::setup_postgres::setup(
indexer_config.clone(),
registry.clone(),
indexer_metrics.clone(),
)
.await?;
#[cfg(feature = "mysql-feature")]
#[cfg(not(feature = "postgres-feature"))]
sui_indexer::db::setup_mysql::setup(indexer_config, registry).await?;
sui_indexer::db::setup_mysql::setup(indexer_config, registry, indexer_metrics).await?;
Ok(())
}
43 changes: 43 additions & 0 deletions crates/sui-indexer/src/restorer/archives.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::num::NonZeroUsize;

use prometheus::Registry;
use tracing::info;

use sui_archival::reader::{ArchiveReader, ArchiveReaderMetrics};
use sui_config::node::ArchiveReaderConfig;
use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};

use crate::types::IndexerResult;

pub async fn read_next_checkpoint_after_epoch(
cred_path: String,
archive_bucket: Option<String>,
epoch: u64,
) -> IndexerResult<u64> {
let archive_store_config = ObjectStoreConfig {
object_store: Some(ObjectStoreType::GCS),
bucket: archive_bucket,
google_service_account: Some(cred_path.clone()),
object_store_connection_limit: 50,
no_sign_request: false,
..Default::default()
};
let archive_reader_config = ArchiveReaderConfig {
remote_store_config: archive_store_config,
download_concurrency: NonZeroUsize::new(50).unwrap(),
use_for_pruning_watermark: false,
};
let metrics = ArchiveReaderMetrics::new(&Registry::default());
let archive_reader = ArchiveReader::new(archive_reader_config, &metrics)?;
archive_reader.sync_manifest_once().await?;
let manifest = archive_reader.get_manifest().await?;
let next_checkpoint_after_epoch = manifest.next_checkpoint_after_epoch(epoch);
info!(
"Read from archives: next checkpoint sequence after epoch {} is: {}",
epoch, next_checkpoint_after_epoch
);
Ok(next_checkpoint_after_epoch)
}
Loading

0 comments on commit e6e0d8a

Please sign in to comment.