From 5065a0a49485b70b925b46a29ca4a93d19d8a626 Mon Sep 17 00:00:00 2001 From: Will Yang Date: Wed, 2 Oct 2024 13:24:32 -0700 Subject: [PATCH] cleanup instantiation --- .../sui-graphql-rpc/src/test_infra/cluster.rs | 3 +- crates/sui-indexer/src/config.rs | 176 ++++++++++-------- crates/sui-indexer/src/db.rs | 19 +- crates/sui-indexer/src/handlers/pruner.rs | 34 +--- crates/sui-indexer/src/indexer.rs | 7 +- crates/sui-indexer/src/main.rs | 17 +- crates/sui-indexer/src/test_utils.rs | 9 +- .../sui-transactional-test-runner/src/args.rs | 28 +-- .../src/test_adapter.rs | 25 +-- 9 files changed, 131 insertions(+), 187 deletions(-) diff --git a/crates/sui-graphql-rpc/src/test_infra/cluster.rs b/crates/sui-graphql-rpc/src/test_infra/cluster.rs index 0891c0ce42a52..84d182b4be58b 100644 --- a/crates/sui-graphql-rpc/src/test_infra/cluster.rs +++ b/crates/sui-graphql-rpc/src/test_infra/cluster.rs @@ -14,7 +14,6 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use sui_graphql_rpc_client::simple_client::SimpleClient; -use sui_indexer::config::PruningOptions; pub use sui_indexer::config::RetentionPolicies; pub use sui_indexer::config::SnapshotLagConfig; use sui_indexer::errors::IndexerError; @@ -184,7 +183,7 @@ pub async fn serve_executor( let (pg_store, pg_handle, _) = start_indexer_writer_for_testing( db_url, Some(snapshot_config.clone()), - Some(PruningOptions { epochs_to_keep }), + retention_policies, Some(data_ingestion_path), Some(cancellation_token.clone()), ) diff --git a/crates/sui-indexer/src/config.rs b/crates/sui-indexer/src/config.rs index 2390788ea3779..2691d9cf25a63 100644 --- a/crates/sui-indexer/src/config.rs +++ b/crates/sui-indexer/src/config.rs @@ -5,7 +5,6 @@ use crate::db::ConnectionPoolConfig; use crate::{backfill::BackfillTaskKind, handlers::pruner::PrunableTable}; use clap::{Args, Parser, Subcommand}; use serde::{Deserialize, Serialize}; -use std::str::FromStr; use std::{collections::HashMap, net::SocketAddr, path::PathBuf}; use strum::IntoEnumIterator; use sui_json_rpc::name_service::NameServiceConfig; @@ -15,7 +14,6 @@ use url::Url; /// The primary purpose of objects_history is to serve consistency query. /// A short retention is sufficient. const OBJECTS_HISTORY_EPOCHS_TO_KEEP: u64 = 2; -const DEFAULT_EPOCHS_TO_KEEP: u64 = 30; #[derive(Parser, Clone, Debug)] #[clap( @@ -221,66 +219,87 @@ pub struct PruningOptions { pub pruning_config_path: Option, } -/// Default retention policies and overrides for the pruner. Instantiated only if `PruningOptions` -/// is provided on indexer start. Any tables not named in the file will inherit the default -/// retention policy `epochs_to_keep`. +/// Represents the default retention policy and overrides for prunable tables. When `finalize` is +/// called, the `policies` field is updated with the default retention policy for all tables that do +/// not have an override specified. Instantiated only if `PruningOptions` is provided on indexer +/// start. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RetentionPolicies { /// Default retention policy for all tables. pub epochs_to_keep: u64, - /// Mapping of retention policies for specific tables which will override the default policy - /// specified by `epochs_to_keep`. - #[serde(default)] - pub overrides: HashMap, + /// In Rust, a mapping of all `PrunableTable` variants to their respective retention policies. + /// From a TOML file, a user only needs to explicitly list tables that should override the + /// default retention. + #[serde(default, rename = "overrides")] + pub policies: HashMap, } -impl RetentionPolicies { - pub fn new(epochs_to_keep: u64, overrides: HashMap) -> Self { - let overrides = overrides - .into_iter() - .filter_map(|(table_name, retention)| { - PrunableTable::from_str(&table_name) - .ok() - .map(|table| (table, retention)) - }) - .collect(); +impl PruningOptions { + /// Load default retention policy and overrides from file. + pub fn load_from_file(&self) -> Option { + let Some(config_path) = self.pruning_config_path.as_ref() else { + return None; + }; + + let contents = std::fs::read_to_string(&config_path) + .expect("Failed to read default retention policy and overrides from file"); + let retention_with_overrides = toml::de::from_str::(&contents) + .expect("Failed to parse into RetentionPolicies struct"); + + let default_retention = retention_with_overrides.epochs_to_keep; + + assert!( + default_retention > 0, + "Default retention must be greater than 0" + ); + assert!( + retention_with_overrides + .policies + .values() + .all(|&policy| policy > 0), + "All retention overrides must be greater than 0" + ); + + Some(retention_with_overrides) + } +} +impl RetentionPolicies { + /// Create a new `RetentionPolicies` with the specified default retention and overrides. Call + /// `finalize()` on the instance to update the `policies` field with the default retention + /// policy for all tables that do not have an override specified. + pub fn new(epochs_to_keep: u64, overrides: HashMap) -> Self { Self { epochs_to_keep, - overrides, + policies: overrides, } } /// Create a new `RetentionPolicies` with only the default retention specified and the default - /// override for `objects_history`. - pub fn new_with_default_retention_only(epochs_to_keep: u64) -> Self { + /// override for `objects_history`. Call `finalize()` on the instance to update the `policies` + /// field with the default retention policy for all tables that do not have an override + /// specified. + pub fn new_with_default_retention_only_for_testing(epochs_to_keep: u64) -> Self { let mut overrides = HashMap::new(); overrides.insert( PrunableTable::ObjectsHistory, OBJECTS_HISTORY_EPOCHS_TO_KEEP, ); - Self { - epochs_to_keep, - overrides, - } + Self::new(epochs_to_keep, overrides) } - /// Create a new `RetentionPolicies` with only the overrides specified and a default retention - /// of 30 days. - pub fn new_with_overrides_only(overrides: HashMap) -> Self { - let overrides = overrides - .into_iter() - .filter_map(|(table_name, retention)| { - PrunableTable::from_str(&table_name) - .ok() - .map(|table| (table, retention)) - }) - .collect(); - Self { - overrides, - epochs_to_keep: DEFAULT_EPOCHS_TO_KEEP, + /// Updates the `policies` field with the default retention policy for all tables that do not + /// have an override specified. + pub fn finalize(mut self) -> Self { + for table in PrunableTable::iter() { + self.policies.entry(table).or_insert(self.epochs_to_keep); } + self + } + + pub fn get(&self, table: &PrunableTable) -> Option { + self.policies.get(table).copied() } } @@ -359,42 +378,12 @@ impl Default for RestoreConfig { } } -/// Load default retention policy and overrides from file, and then compile into a map that sets a -/// retention policy for each pruanble table. -pub fn finalize_retention_policies(config_path: PathBuf) -> HashMap { - let contents = std::fs::read_to_string(&config_path) - .expect("Failed to read default retention policy and overrides from file"); - let policies = toml::de::from_str::(&contents) - .expect("Failed to parse into RetentionPolicies struct"); - - assert!( - policies.epochs_to_keep > 0, - "Default retention must be greater than 0" - ); - assert!( - policies.overrides.values().all(|&retention| retention > 0), - "All retention overrides must be greater than 0" - ); - - let mut merged_policies = HashMap::new(); - - for table in PrunableTable::iter() { - let retention = policies - .overrides - .get(&table) - .copied() - .unwrap_or(policies.epochs_to_keep); - - merged_policies.insert(table, retention); - } - - merged_policies -} - #[cfg(test)] mod test { use super::*; + use std::io::Write; use tap::Pipe; + use tempfile::NamedTempFile; fn parse_args<'a, T>(args: impl IntoIterator) -> Result where @@ -471,12 +460,47 @@ mod test { let opts = toml::de::from_str::(toml_str).unwrap(); assert_eq!(opts.epochs_to_keep, 5); + assert_eq!(opts.policies.get(&PrunableTable::ObjectsHistory), Some(&10)); + assert_eq!(opts.policies.get(&PrunableTable::Transactions), Some(&20)); + assert_eq!(opts.policies.len(), 2); + } + + #[test] + fn test_pruning_options_from_file() { + let mut temp_file = NamedTempFile::new().unwrap(); + let toml_content = r#" + epochs_to_keep = 5 + + [overrides] + objects_history = 10 + transactions = 20 + "#; + temp_file.write_all(toml_content.as_bytes()).unwrap(); + let temp_path: PathBuf = temp_file.path().to_path_buf(); + let pruning_options = PruningOptions { + pruning_config_path: Some(temp_path.clone()), + }; + let mut retention_policies = pruning_options.load_from_file().unwrap(); + + // Assert the parsed values + assert_eq!(retention_policies.epochs_to_keep, 5); + assert_eq!( + retention_policies.get(&PrunableTable::ObjectsHistory), + Some(10) + ); assert_eq!( - opts.overrides.get(&PrunableTable::ObjectsHistory), - Some(&10) + retention_policies.get(&PrunableTable::Transactions), + Some(20) + ); + assert_eq!(retention_policies.policies.len(), 2); + + retention_policies = retention_policies.finalize(); + + assert!( + retention_policies.policies.len() > 2, + "Expected more than 2 policies, but got {}", + retention_policies.policies.len() ); - assert_eq!(opts.overrides.get(&PrunableTable::Transactions), Some(&20)); - assert_eq!(opts.overrides.len(), 2); } #[test] diff --git a/crates/sui-indexer/src/db.rs b/crates/sui-indexer/src/db.rs index 191c09d5e0f0d..9937b61ce2655 100644 --- a/crates/sui-indexer/src/db.rs +++ b/crates/sui-indexer/src/db.rs @@ -3,6 +3,7 @@ use crate::database::Connection; use crate::errors::IndexerError; +use crate::handlers::pruner::PrunableTable; use clap::Args; use diesel::migration::{Migration, MigrationSource, MigrationVersion}; use diesel::pg::Pg; @@ -10,8 +11,9 @@ use diesel::prelude::QueryableByName; use diesel::table; use diesel::QueryDsl; use diesel_migrations::{embed_migrations, EmbeddedMigrations}; -use std::collections::{BTreeSet, HashMap, HashSet}; +use std::collections::{BTreeSet, HashSet}; use std::time::Duration; +use strum::IntoEnumIterator; use tracing::info; table! { @@ -135,11 +137,8 @@ async fn check_db_migration_consistency_impl( ))) } -/// Checks that the tables named in `RetentionPolicies.overrides`, if any, actually exist in the db. -pub async fn check_retention_policy_overrides_valid( - conn: &mut Connection<'_>, - overrides: &HashMap, -) -> Result<(), IndexerError> { +/// Check that prunable tables exist in the database. +pub async fn check_prunable_tables_valid(conn: &mut Connection<'_>) -> Result<(), IndexerError> { info!("Starting compatibility check"); use diesel_async::RunQueryDsl; @@ -172,12 +171,10 @@ pub async fn check_retention_policy_overrides_valid( .await .map_err(|e| IndexerError::DbMigrationError(format!("Failed to fetch tables: {e}")))?; - // Check that each key in overrides exists in result - let map_keys: HashSet<&String> = overrides.keys().collect(); - let existing_tables: HashSet<_> = result.into_iter().map(|t| t.table_name).collect(); + let parent_tables_from_db: HashSet<_> = result.into_iter().map(|t| t.table_name).collect(); - for key in map_keys { - if !existing_tables.contains(key) { + for key in PrunableTable::iter() { + if !parent_tables_from_db.contains(key.as_ref()) { return Err(IndexerError::GenericError(format!( "Invalid retention policy override provided for table {}: does not exist in the database", key diff --git a/crates/sui-indexer/src/handlers/pruner.rs b/crates/sui-indexer/src/handlers/pruner.rs index 6ad4664f664cd..9f196a6d4f028 100644 --- a/crates/sui-indexer/src/handlers/pruner.rs +++ b/crates/sui-indexer/src/handlers/pruner.rs @@ -9,7 +9,6 @@ use crate::{metrics::IndexerMetrics, store::IndexerStore, types::IndexerResult}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::time::Duration; -use strum::IntoEnumIterator; use strum_macros; use tokio_util::sync::CancellationToken; use tracing::{error, info}; @@ -17,12 +16,7 @@ use tracing::{error, info}; pub struct Pruner { pub store: PgIndexerStore, pub partition_manager: PgPartitionManager, - // TODO: (wlmyng) we can remove this once pruner consults `retention_policies` for all tables to - // prune. - pub default_retention: u64, - /// Retention policies for each prunable table. If a table is not listed here, it will not be - /// pruned. - pub retention_policies: HashMap, + pub retention_policies: RetentionPolicies, pub metrics: IndexerMetrics, } @@ -73,32 +67,19 @@ pub enum PrunableTable { } impl Pruner { - /// Instantiates a pruner with default retention and overrides. Pruner will merge these into a - /// list of policies for each prunable table. + /// Instantiates a pruner with default retention and overrides. Pruner will finalize the + /// retention policies so there is a value for every prunable table. pub fn new( store: PgIndexerStore, retention_policies: RetentionPolicies, metrics: IndexerMetrics, ) -> Result { let partition_manager = PgPartitionManager::new(store.pool())?; - let mut merged_policies = HashMap::new(); - - // Iterate through the prunable table variants - for table in PrunableTable::iter() { - let retention = retention_policies - .overrides - .get(&table) - .copied() - .unwrap_or(retention_policies.epochs_to_keep); - - merged_policies.insert(table, retention); - } Ok(Self { store, partition_manager, - default_retention: retention_policies.epochs_to_keep, - retention_policies: merged_policies, + retention_policies: retention_policies.finalize(), metrics, }) } @@ -107,7 +88,7 @@ impl Pruner { /// table is not prunable. fn epochs_to_keep(&self, table_name: &str) -> Option { if let Ok(variant) = table_name.parse::() { - self.retention_policies.get(&variant).copied() + self.retention_policies.get(&variant) } else { None } @@ -168,8 +149,9 @@ impl Pruner { // TODO: (wlmyng) Once we have the watermarks table, we can iterate through each row // returned from `watermarks`, look it up against `retention_policies`, and process them // independently. This also means that pruning overrides will only apply for - // epoch-partitioned tables. - let prune_to_epoch = last_seen_max_epoch.saturating_sub(self.default_retention - 1); + // epoch-partitioned tables right now. + let prune_to_epoch = + last_seen_max_epoch.saturating_sub(self.retention_policies.epochs_to_keep - 1); let prune_start_epoch = next_prune_epoch.unwrap_or(min_epoch); for epoch in prune_start_epoch..prune_to_epoch { if cancel.is_cancelled() { diff --git a/crates/sui-indexer/src/indexer.rs b/crates/sui-indexer/src/indexer.rs index f55398cefa526..f9f1d357364c9 100644 --- a/crates/sui-indexer/src/indexer.rs +++ b/crates/sui-indexer/src/indexer.rs @@ -89,12 +89,7 @@ impl Indexer { .await?; if let Some(retention_policies) = retention_policies { - info!( - "Starting indexer pruner with epochs to keep: {}", - retention_policies.epochs_to_keep - ); - assert!(epochs_to_keep > 0, "Epochs to keep must be positive"); - let pruner = Pruner::new(store.clone(), epochs_to_keep, metrics.clone())?; + let pruner = Pruner::new(store.clone(), retention_policies, metrics.clone())?; let cancel_clone = cancel.clone(); spawn_monitored_task!(pruner.start(cancel_clone)); } diff --git a/crates/sui-indexer/src/main.rs b/crates/sui-indexer/src/main.rs index eafcee0490128..e5364bf00a136 100644 --- a/crates/sui-indexer/src/main.rs +++ b/crates/sui-indexer/src/main.rs @@ -6,8 +6,7 @@ use sui_indexer::backfill::backfill_runner::BackfillRunner; use sui_indexer::config::{Command, UploadOptions}; use sui_indexer::database::ConnectionPool; use sui_indexer::db::{ - check_db_migration_consistency, check_retention_policy_overrides_valid, reset_database, - run_migrations, + check_db_migration_consistency, check_prunable_tables_valid, reset_database, run_migrations, }; use sui_indexer::indexer::Indexer; use sui_indexer::metrics::{ @@ -48,17 +47,9 @@ async fn main() -> anyhow::Result<()> { } => { // Make sure to run all migrations on startup, and also serve as a compatibility check. run_migrations(pool.dedicated_connection().await?).await?; - let retention_policies = pruning_options.retention_policies(); - if let Some(retention_policies) = &retention_policies { - check_retention_policy_overrides_valid( - &mut pool.get().await?, - &retention_policies - .overrides - .iter() - .map(|(k, v)| (k.to_string(), *v)) - .collect(), - ) - .await?; + let retention_policies = pruning_options.load_from_file(); + if retention_policies.is_some() { + check_prunable_tables_valid(&mut pool.get().await?).await?; } let store = PgIndexerStore::new(pool, upload_options, indexer_metrics.clone()); diff --git a/crates/sui-indexer/src/test_utils.rs b/crates/sui-indexer/src/test_utils.rs index 87e335b6325a5..454d1e08b7415 100644 --- a/crates/sui-indexer/src/test_utils.rs +++ b/crates/sui-indexer/src/test_utils.rs @@ -62,11 +62,11 @@ pub async fn start_indexer_jsonrpc_for_testing( } /// Wrapper over `Indexer::start_writer_with_config` to make it easier to configure an indexer -/// writer for testing. +/// writer for testing. ` pub async fn start_indexer_writer_for_testing( db_url: String, snapshot_config: Option, - pruning_options: Option, + retention_policies: Option, data_ingestion_path: Option, cancel: Option, ) -> ( @@ -76,7 +76,6 @@ pub async fn start_indexer_writer_for_testing( ) { let token = cancel.unwrap_or_else(|| CancellationToken::new()); let snapshot_config = snapshot_config.unwrap_or_default(); - let pruning_options = pruning_options.unwrap_or_default(); // Reduce the connection pool size to 10 for testing to prevent maxing out let pool_config = ConnectionPoolConfig { @@ -120,10 +119,10 @@ pub async fn start_indexer_writer_for_testing( indexer_metrics, snapshot_config, retention_policies, - cancel, + token_clone, ) .await - }); + }) }; (store, handle, token) diff --git a/crates/sui-transactional-test-runner/src/args.rs b/crates/sui-transactional-test-runner/src/args.rs index eb4749aa163b0..2d73a54ea99d9 100644 --- a/crates/sui-transactional-test-runner/src/args.rs +++ b/crates/sui-transactional-test-runner/src/args.rs @@ -1,10 +1,8 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::str::FromStr; - use crate::test_adapter::{FakeID, SuiTestAdapter}; -use anyhow::{bail, ensure, Context}; +use anyhow::{bail, ensure}; use clap; use clap::{Args, Parser}; use move_command_line_common::parser::{parse_u256, parse_u64}; @@ -72,8 +70,6 @@ pub struct SuiInitArgs { /// the indexer. #[clap(long = "epochs-to-keep")] pub epochs_to_keep: Option, - #[clap(long = "retention-policy-overrides", value_parser = parse_key_val::)] - pub retention_policy_overrides: Option>, } #[derive(Debug, clap::Parser)] @@ -611,25 +607,3 @@ fn parse_policy(x: &str) -> anyhow::Result { _ => bail!("Invalid upgrade policy {x}. Policy must be one of 'compatible', 'additive', or 'dep_only'") }) } - -fn parse_key_val(s: &str) -> anyhow::Result<(T, U)> -where - T: FromStr, - T::Err: std::error::Error + Send + Sync + 'static, - U: FromStr, - U::Err: std::error::Error + Send + Sync + 'static, -{ - let pos = s - .find('=') - .with_context(|| format!("Invalid KEY=value: no `=` found in `{s}`"))?; - - let key = s[..pos] - .parse() - .with_context(|| format!("Failed to parse key from `{}`", &s[..pos]))?; - - let value = s[pos + 1..] - .parse() - .with_context(|| format!("Failed to parse value from `{}`", &s[pos + 1..]))?; - - Ok((key, value)) -} diff --git a/crates/sui-transactional-test-runner/src/test_adapter.rs b/crates/sui-transactional-test-runner/src/test_adapter.rs index da53af365dff2..00a7dd7803cf4 100644 --- a/crates/sui-transactional-test-runner/src/test_adapter.rs +++ b/crates/sui-transactional-test-runner/src/test_adapter.rs @@ -39,7 +39,6 @@ use move_transactional_test_runner::{ use move_vm_runtime::session::SerializedReturnValues; use once_cell::sync::Lazy; use rand::{rngs::StdRng, Rng, SeedableRng}; -use std::collections::HashMap; use std::fmt::{self, Write}; use std::time::Duration; use std::{ @@ -247,7 +246,6 @@ impl<'a> MoveTestAdapter<'a> for SuiTestAdapter { snapshot_config, flavor, epochs_to_keep, - retention_policy_overrides, ) = match task_opt.map(|t| t.command) { Some(( InitCommand { named_addresses }, @@ -263,7 +261,6 @@ impl<'a> MoveTestAdapter<'a> for SuiTestAdapter { snapshot_config, flavor, epochs_to_keep, - retention_policy_overrides, }, )) => { let map = verify_and_create_named_address_mapping(named_addresses).unwrap(); @@ -303,7 +300,6 @@ impl<'a> MoveTestAdapter<'a> for SuiTestAdapter { snapshot_config, flavor, epochs_to_keep, - retention_policy_overrides, ) } None => { @@ -319,7 +315,6 @@ impl<'a> MoveTestAdapter<'a> for SuiTestAdapter { SnapshotLagConfig::default(), None, None, - None, ) } }; @@ -335,22 +330,10 @@ impl<'a> MoveTestAdapter<'a> for SuiTestAdapter { }, cluster, ) = if is_simulator { - // map retention_policy_overrides from Vec<(String, u64)> to a HashMap - let retention_policy_overrides = retention_policy_overrides - .map(|overrides| overrides.into_iter().collect::>()); - - let retention_policies = match (epochs_to_keep, retention_policy_overrides) { - (Some(default_retention), Some(overrides)) => { - Some(RetentionPolicies::new(default_retention, overrides)) - } - (Some(epochs_to_keep), None) => Some( - RetentionPolicies::new_with_default_retention_only(epochs_to_keep), - ), - (None, Some(overrides)) => { - Some(RetentionPolicies::new_with_overrides_only(overrides)) - } - (None, None) => None, - }; + // TODO: (wlmyng) as of right now, we can't test per-table overrides until the pruner is + // updated + let retention_policies = + epochs_to_keep.map(RetentionPolicies::new_with_default_retention_only_for_testing); init_sim_executor( rng,