diff --git a/Cargo.lock b/Cargo.lock
index 423e16e677663..75f4cd94ef67a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -13659,6 +13659,8 @@ dependencies = [
  "serde_json",
  "serde_with 3.9.0",
  "simulacrum",
+ "strum 0.24.1",
+ "strum_macros 0.24.3",
  "sui-archival",
  "sui-config",
  "sui-core",
@@ -13687,6 +13689,7 @@ dependencies = [
  "tokio",
  "tokio-stream",
  "tokio-util 0.7.10 (registry+https://github.com/rust-lang/crates.io-index)",
+ "toml 0.7.4",
  "tracing",
  "url",
 ]
diff --git a/crates/sui-graphql-rpc/src/test_infra/cluster.rs b/crates/sui-graphql-rpc/src/test_infra/cluster.rs
index 64f4f74c2ed42..f7a92a406ebc5 100644
--- a/crates/sui-graphql-rpc/src/test_infra/cluster.rs
+++ b/crates/sui-graphql-rpc/src/test_infra/cluster.rs
@@ -14,6 +14,7 @@ use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::Duration;
 use sui_graphql_rpc_client::simple_client::SimpleClient;
+pub use sui_indexer::config::RetentionPolicies;
 pub use sui_indexer::config::SnapshotLagConfig;
 use sui_indexer::errors::IndexerError;
 use sui_indexer::store::indexer_store::IndexerStore;
@@ -151,7 +152,7 @@ pub async fn start_network_cluster() -> NetworkCluster {
 pub async fn serve_executor(
     executor: Arc,
     snapshot_config: Option<SnapshotLagConfig>,
-    epochs_to_keep: Option<u64>,
+    retention_policies: Option<RetentionPolicies>,
     data_ingestion_path: PathBuf,
 ) -> ExecutorCluster {
     let database = TempDb::new().unwrap();
@@ -181,7 +182,7 @@ pub async fn serve_executor(
     let (pg_store, pg_handle) = start_test_indexer_impl(
         db_url,
         format!("http://{}", executor_server_url),
-        ReaderWriterConfig::writer_mode(snapshot_config.clone(), epochs_to_keep),
+        ReaderWriterConfig::writer_mode(snapshot_config.clone(), retention_policies),
         Some(data_ingestion_path),
         cancellation_token.clone(),
     )
diff --git a/crates/sui-indexer/Cargo.toml b/crates/sui-indexer/Cargo.toml
index 0ec76e6b2ff9a..1f4f76f415a3e 100644
--- a/crates/sui-indexer/Cargo.toml
+++ b/crates/sui-indexer/Cargo.toml
@@ -31,11 +31,14 @@ regex.workspace = true
 serde.workspace = true
 serde_json.workspace = true
 serde_with.workspace = true
+strum.workspace = true
+strum_macros.workspace = true
 tap.workspace = true
 tempfile.workspace = true
 thiserror.workspace = true
 tokio = { workspace = true, features = ["full"] }
 tokio-util = { workspace = true, features = ["rt"] }
+toml.workspace = true
 tracing.workspace = true
 url.workspace = true
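Note: with this change, test callers of `serve_executor` pass a full `RetentionPolicies` instead of a bare epoch count. A minimal sketch of the new call shape, where `executor` and `data_ingestion_path` are assumed test fixtures:

    // Sketch only: keep 5 epochs by default; objects_history gets the short
    // built-in override supplied by new_with_default_retention_only.
    let cluster = serve_executor(
        executor,
        Some(SnapshotLagConfig::default()),
        Some(RetentionPolicies::new_with_default_retention_only(5)),
        data_ingestion_path,
    )
    .await;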
diff --git a/crates/sui-indexer/src/config.rs b/crates/sui-indexer/src/config.rs
index 9e42c57da65ea..f934bbf2b6533 100644
--- a/crates/sui-indexer/src/config.rs
+++ b/crates/sui-indexer/src/config.rs
@@ -1,14 +1,21 @@
 // Copyright (c) Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0

-use crate::backfill::BackfillTaskKind;
 use crate::db::ConnectionPoolConfig;
+use crate::{backfill::BackfillTaskKind, handlers::pruner::PrunableTable};
 use clap::{Args, Parser, Subcommand};
-use std::{net::SocketAddr, path::PathBuf};
+use serde::{Deserialize, Serialize};
+use std::str::FromStr;
+use std::{collections::HashMap, net::SocketAddr, path::PathBuf};
 use sui_json_rpc::name_service::NameServiceConfig;
 use sui_types::base_types::{ObjectID, SuiAddress};
 use url::Url;

+/// The primary purpose of objects_history is to serve consistency queries.
+/// A short retention is sufficient.
+const OBJECTS_HISTORY_EPOCHS_TO_KEEP: u64 = 2;
+const DEFAULT_EPOCHS_TO_KEEP: u64 = 30;
+
 #[derive(Parser, Clone, Debug)]
 #[clap(
     name = "Sui indexer",
@@ -208,8 +215,97 @@ pub enum Command {
 #[derive(Args, Default, Debug, Clone)]
 pub struct PruningOptions {
-    #[arg(long, env = "EPOCHS_TO_KEEP")]
-    pub epochs_to_keep: Option<u64>,
+    /// Path to TOML file containing configuration for retention policies.
+    #[arg(long)]
+    pub config: Option<PathBuf>,
+}
+
+/// Default retention policies and overrides for the pruner. Instantiated only if `PruningOptions`
+/// is provided on indexer start. Any tables not named in the file will inherit the default
+/// retention policy `epochs_to_keep`.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RetentionPolicies {
+    /// Default retention policy for all tables.
+    pub epochs_to_keep: u64,
+    /// Mapping of retention policies for specific tables which will override the default policy
+    /// specified by `epochs_to_keep`.
+    #[serde(default)]
+    pub overrides: HashMap<PrunableTable, u64>,
+}
+
+impl RetentionPolicies {
+    pub fn new(epochs_to_keep: u64, overrides: HashMap<String, u64>) -> Self {
+        let overrides = overrides
+            .into_iter()
+            .filter_map(|(table_name, retention)| {
+                PrunableTable::from_str(&table_name)
+                    .ok()
+                    .map(|table| (table, retention))
+            })
+            .collect();
+
+        Self {
+            epochs_to_keep,
+            overrides,
+        }
+    }
+
+    /// Create a new `RetentionPolicies` with only the default retention specified and the default
+    /// override for `objects_history`.
+    pub fn new_with_default_retention_only(epochs_to_keep: u64) -> Self {
+        let mut overrides = HashMap::new();
+        overrides.insert(
+            PrunableTable::ObjectsHistory,
+            OBJECTS_HISTORY_EPOCHS_TO_KEEP,
+        );
+
+        Self {
+            epochs_to_keep,
+            overrides,
+        }
+    }
+
+    /// Create a new `RetentionPolicies` with only the overrides specified and a default retention
+    /// of 30 epochs.
+    pub fn new_with_overrides_only(overrides: HashMap<String, u64>) -> Self {
+        let overrides = overrides
+            .into_iter()
+            .filter_map(|(table_name, retention)| {
+                PrunableTable::from_str(&table_name)
+                    .ok()
+                    .map(|table| (table, retention))
+            })
+            .collect();
+        Self {
+            overrides,
+            epochs_to_keep: DEFAULT_EPOCHS_TO_KEEP,
+        }
+    }
+}
+
+impl PruningOptions {
+    /// If a path to the retention policies config has been provided, attempt to parse and return
+    /// it. Otherwise, pruning is not enabled for the indexer, and `None` is returned.
+    pub fn retention_policies(self) -> Option<RetentionPolicies> {
+        let Some(path) = self.config else {
+            return None;
+        };
+
+        let contents =
+            std::fs::read_to_string(&path).expect("Failed to read retention policies file");
+        let policies = toml::de::from_str::<RetentionPolicies>(&contents)
+            .expect("Failed to parse into RetentionPolicies struct");
+
+        assert!(
+            policies.epochs_to_keep > 0,
+            "Default retention must be greater than 0"
+        );
+        assert!(
+            policies.overrides.values().all(|&retention| retention > 0),
+            "All retention overrides must be greater than 0"
+        );
+        Some(policies)
+    }
 }

 #[derive(Args, Debug, Clone)]
@@ -354,4 +450,46 @@ mod test {
         // fullnode rpc url must be present
         parse_args::([]).unwrap_err();
     }
+
+    #[test]
+    fn test_valid_pruning_config_file() {
+        let toml_str = r#"
+        epochs_to_keep = 5
+
+        [overrides]
+        objects_history = 10
+        transactions = 20
+        "#;
+
+        let opts = toml::de::from_str::<RetentionPolicies>(toml_str).unwrap();
+        assert_eq!(opts.epochs_to_keep, 5);
+        assert_eq!(
+            opts.overrides.get(&PrunableTable::ObjectsHistory),
+            Some(&10)
+        );
+        assert_eq!(opts.overrides.get(&PrunableTable::Transactions), Some(&20));
+        assert_eq!(opts.overrides.len(), 2);
+    }
+
+    #[test]
+    fn test_invalid_pruning_config_file() {
+        let toml_str = r#"
+        epochs_to_keep = 5
+
+        [overrides]
+        objects_history = 10
+        transactions = 20
+        invalid_table = 30
+        "#;
+
+        let result = toml::from_str::<RetentionPolicies>(toml_str);
+        assert!(result.is_err(), "Expected an error, but parsing succeeded");
+
+        if let Err(e) = result {
+            assert!(
+                e.to_string().contains("unknown variant `invalid_table`"),
+                "Error message doesn't mention the invalid table"
+            );
+        }
+    }
 }
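Note: end to end, the pruner configuration is a TOML file shaped like the one in the tests above, handed to the indexer via `--config`. A sketch of loading one by hand (the path below is hypothetical):

    use sui_indexer::config::PruningOptions;

    let pruning = PruningOptions {
        config: Some("/etc/sui-indexer/retention.toml".into()),
    };
    // Returns None when no --config was given, i.e. pruning stays disabled.
    let policies = pruning.retention_policies();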
diff --git a/crates/sui-indexer/src/db.rs b/crates/sui-indexer/src/db.rs
index 1abd5d31a956e..191c09d5e0f0d 100644
--- a/crates/sui-indexer/src/db.rs
+++ b/crates/sui-indexer/src/db.rs
@@ -6,10 +6,11 @@ use crate::errors::IndexerError;
 use clap::Args;
 use diesel::migration::{Migration, MigrationSource, MigrationVersion};
 use diesel::pg::Pg;
+use diesel::prelude::QueryableByName;
 use diesel::table;
 use diesel::QueryDsl;
 use diesel_migrations::{embed_migrations, EmbeddedMigrations};
-use std::collections::BTreeSet;
+use std::collections::{BTreeSet, HashMap, HashSet};
 use std::time::Duration;
 use tracing::info;
@@ -134,6 +135,60 @@ async fn check_db_migration_consistency_impl(
     )))
 }

+/// Checks that the tables named in `RetentionPolicies.overrides`, if any, actually exist in the db.
+pub async fn check_retention_policy_overrides_valid(
+    conn: &mut Connection<'_>,
+    overrides: &HashMap<String, u64>,
+) -> Result<(), IndexerError> {
+    info!("Starting retention policy override validation");
+
+    use diesel_async::RunQueryDsl;
+
+    let select_parent_tables = r#"
+        SELECT c.relname AS table_name
+        FROM pg_class c
+        JOIN pg_namespace n ON n.oid = c.relnamespace
+        LEFT JOIN pg_partitioned_table pt ON pt.partrelid = c.oid
+        WHERE c.relkind IN ('r', 'p') -- 'r' for regular tables, 'p' for partitioned tables
+          AND n.nspname = 'public'
+          AND (
+              pt.partrelid IS NOT NULL -- This is a partitioned (parent) table
+              OR NOT EXISTS ( -- This is not a partition (child table)
+                  SELECT 1
+                  FROM pg_inherits i
+                  WHERE i.inhrelid = c.oid
+              )
+          );
+    "#;
+
+    #[derive(QueryableByName)]
+    struct TableName {
+        #[diesel(sql_type = diesel::sql_types::Text)]
+        table_name: String,
+    }
+
+    let result: Vec<TableName> = diesel::sql_query(select_parent_tables)
+        .load(conn)
+        .await
+        .map_err(|e| IndexerError::DbMigrationError(format!("Failed to fetch tables: {e}")))?;
+
+    // Check that each key in overrides exists in result
+    let map_keys: HashSet<&String> = overrides.keys().collect();
+    let existing_tables: HashSet<_> = result.into_iter().map(|t| t.table_name).collect();
+
+    for key in map_keys {
+        if !existing_tables.contains(key) {
+            return Err(IndexerError::GenericError(format!(
+                "Invalid retention policy override provided for table {}: does not exist in the database",
+                key
+            )));
+        }
+    }
+
+    info!("Retention policy override validation passed");
+    Ok(())
+}
+
 pub use setup_postgres::{reset_database, run_migrations};

 pub mod setup_postgres {
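Note: a rough usage sketch, mirroring the main.rs wiring later in this diff; `pool` is an assumed, already-established `ConnectionPool`:

    use std::collections::HashMap;

    let mut overrides = HashMap::new();
    overrides.insert("objects_history".to_string(), 2u64);
    check_retention_policy_overrides_valid(&mut pool.get().await?, &overrides).await?;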
diff --git a/crates/sui-indexer/src/handlers/pruner.rs b/crates/sui-indexer/src/handlers/pruner.rs
index cdbe274376f46..6ad4664f664cd 100644
--- a/crates/sui-indexer/src/handlers/pruner.rs
+++ b/crates/sui-indexer/src/handlers/pruner.rs
@@ -1,42 +1,118 @@
 // Copyright (c) Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0

-use std::collections::HashMap;
-use std::time::Duration;
-use tokio_util::sync::CancellationToken;
-use tracing::{error, info};
-
+use crate::config::RetentionPolicies;
 use crate::errors::IndexerError;
 use crate::store::pg_partition_manager::PgPartitionManager;
 use crate::store::PgIndexerStore;
 use crate::{metrics::IndexerMetrics, store::IndexerStore, types::IndexerResult};
-
-/// The primary purpose of objects_history is to serve consistency query.
-/// A short retention is sufficient.
-const OBJECTS_HISTORY_EPOCHS_TO_KEEP: u64 = 2;
+use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
+use std::time::Duration;
+use strum::IntoEnumIterator;
+use strum_macros;
+use tokio_util::sync::CancellationToken;
+use tracing::{error, info};

 pub struct Pruner {
     pub store: PgIndexerStore,
     pub partition_manager: PgPartitionManager,
-    pub epochs_to_keep: u64,
+    // TODO: (wlmyng) we can remove this once pruner consults `retention_policies` for all tables
+    // to prune.
+    pub default_retention: u64,
+    /// Retention policies for each prunable table. If a table is not listed here, it will not be
+    /// pruned.
+    pub retention_policies: HashMap<PrunableTable, u64>,
     pub metrics: IndexerMetrics,
 }

+/// Enum representing tables that the pruner is allowed to prune. The pruner will ignore any table
+/// that is not listed here.
+#[derive(
+    Debug,
+    Eq,
+    PartialEq,
+    strum_macros::Display,
+    strum_macros::EnumString,
+    strum_macros::EnumIter,
+    strum_macros::AsRefStr,
+    Hash,
+    Serialize,
+    Deserialize,
+    Clone,
+)]
+#[strum(serialize_all = "snake_case")]
+#[serde(rename_all = "snake_case")]
+pub enum PrunableTable {
+    ObjectsHistory,
+    Transactions,
+    Events,
+
+    EventEmitPackage,
+    EventEmitModule,
+    EventSenders,
+    EventStructInstantiation,
+    EventStructModule,
+    EventStructName,
+    EventStructPackage,
+
+    TxAffectedAddresses,
+    TxAffectedObjects,
+    TxCallsPkg,
+    TxCallsMod,
+    TxCallsFun,
+    TxChangedObjects,
+    TxDigests,
+    TxInputObjects,
+    TxKinds,
+    TxRecipients,
+    TxSenders,
+
+    Checkpoints,
+    PrunerCpWatermark,
+}
+
 impl Pruner {
+    /// Instantiates a pruner with default retention and overrides. Pruner will merge these into a
+    /// list of policies for each prunable table.
     pub fn new(
         store: PgIndexerStore,
-        epochs_to_keep: u64,
+        retention_policies: RetentionPolicies,
         metrics: IndexerMetrics,
     ) -> Result<Self, IndexerError> {
         let partition_manager = PgPartitionManager::new(store.pool())?;
+        let mut merged_policies = HashMap::new();
+
+        // Iterate through the prunable table variants
+        for table in PrunableTable::iter() {
+            let retention = retention_policies
+                .overrides
+                .get(&table)
+                .copied()
+                .unwrap_or(retention_policies.epochs_to_keep);
+
+            merged_policies.insert(table, retention);
+        }
+
         Ok(Self {
             store,
             partition_manager,
-            epochs_to_keep,
+            default_retention: retention_policies.epochs_to_keep,
+            retention_policies: merged_policies,
             metrics,
         })
     }

+    /// Given a table name, return the number of epochs to keep for that table. Return `None` if
+    /// the table is not prunable.
+    fn epochs_to_keep(&self, table_name: &str) -> Option<u64> {
+        if let Ok(variant) = table_name.parse::<PrunableTable>() {
+            self.retention_policies.get(&variant).copied()
+        } else {
+            None
+        }
+    }
+
     pub async fn start(&self, cancel: CancellationToken) -> IndexerResult<()> {
         let mut last_seen_max_epoch = 0;
         // The first epoch that has not yet been pruned.
@@ -63,35 +139,37 @@ impl Pruner {
             .collect();

         for (table_name, (min_partition, max_partition)) in &table_partitions {
-            if last_seen_max_epoch != *max_partition {
-                error!(
-                    "Epochs are out of sync for table {}: max_epoch={}, max_partition={}",
-                    table_name, last_seen_max_epoch, max_partition
-                );
-            }
+            if let Some(epochs_to_keep) = self.epochs_to_keep(table_name) {
+                if last_seen_max_epoch != *max_partition {
+                    error!(
+                        "Epochs are out of sync for table {}: max_epoch={}, max_partition={}",
+                        table_name, last_seen_max_epoch, max_partition
+                    );
+                }

-            let epochs_to_keep = if table_name == "objects_history" {
-                OBJECTS_HISTORY_EPOCHS_TO_KEEP
-            } else {
-                self.epochs_to_keep
-            };
-            for epoch in *min_partition..last_seen_max_epoch.saturating_sub(epochs_to_keep - 1)
-            {
-                if cancel.is_cancelled() {
-                    info!("Pruner task cancelled.");
-                    return Ok(());
+                for epoch in
+                    *min_partition..last_seen_max_epoch.saturating_sub(epochs_to_keep - 1)
+                {
+                    if cancel.is_cancelled() {
+                        info!("Pruner task cancelled.");
+                        return Ok(());
+                    }
+                    self.partition_manager
+                        .drop_table_partition(table_name.clone(), epoch)
+                        .await?;
+                    info!(
+                        "Batch dropped table partition {} epoch {}",
+                        table_name, epoch
+                    );
                 }
-                self.partition_manager
-                    .drop_table_partition(table_name.clone(), epoch)
-                    .await?;
-                info!(
-                    "Batch dropped table partition {} epoch {}",
-                    table_name, epoch
-                );
             }
         }

-        let prune_to_epoch = last_seen_max_epoch.saturating_sub(self.epochs_to_keep - 1);
+        // TODO: (wlmyng) Once we have the watermarks table, we can iterate through each row
+        // returned from `watermarks`, look it up against `retention_policies`, and process them
+        // independently. This also means that pruning overrides will only apply for
+        // epoch-partitioned tables.
+        let prune_to_epoch = last_seen_max_epoch.saturating_sub(self.default_retention - 1);
         let prune_start_epoch = next_prune_epoch.unwrap_or(min_epoch);
         for epoch in prune_start_epoch..prune_to_epoch {
             if cancel.is_cancelled() {
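Note: because `PrunableTable` derives `EnumString` and `Display` with `serialize_all = "snake_case"`, table names coming back from the database round-trip cleanly, and unknown names simply fail to parse (so the pruner skips them). A small sketch:

    use std::str::FromStr;
    use sui_indexer::handlers::pruner::PrunableTable;

    let t = PrunableTable::from_str("objects_history").unwrap();
    assert_eq!(t, PrunableTable::ObjectsHistory);
    assert_eq!(t.to_string(), "objects_history");
    assert!(PrunableTable::from_str("not_a_table").is_err());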
positive" + ); + + let pruner = Pruner::new(store.clone(), retention_policies, metrics.clone())?; spawn_monitored_task!(pruner.start(CancellationToken::new())); } diff --git a/crates/sui-indexer/src/main.rs b/crates/sui-indexer/src/main.rs index c23b1d74f37ce..2d9f11262c1ba 100644 --- a/crates/sui-indexer/src/main.rs +++ b/crates/sui-indexer/src/main.rs @@ -5,7 +5,10 @@ use clap::Parser; use sui_indexer::backfill::backfill_runner::BackfillRunner; use sui_indexer::config::{Command, UploadOptions}; use sui_indexer::database::ConnectionPool; -use sui_indexer::db::{check_db_migration_consistency, reset_database, run_migrations}; +use sui_indexer::db::{ + check_db_migration_consistency, check_retention_policy_overrides_valid, reset_database, + run_migrations, +}; use sui_indexer::indexer::Indexer; use sui_indexer::metrics::{ spawn_connection_pool_metric_collector, start_prometheus_server, IndexerMetrics, @@ -45,6 +48,19 @@ async fn main() -> anyhow::Result<()> { } => { // Make sure to run all migrations on startup, and also serve as a compatibility check. run_migrations(pool.dedicated_connection().await?).await?; + let retention_policies = pruning_options.retention_policies(); + if let Some(retention_policies) = &retention_policies { + check_retention_policy_overrides_valid( + &mut pool.get().await?, + &retention_policies + .overrides + .iter() + .map(|(k, v)| (k.to_string(), *v)) + .collect(), + ) + .await?; + } + let store = PgIndexerStore::new(pool, upload_options, indexer_metrics.clone()); Indexer::start_writer_with_config( @@ -52,7 +68,7 @@ async fn main() -> anyhow::Result<()> { store, indexer_metrics, snapshot_config, - pruning_options, + retention_policies, CancellationToken::new(), ) .await?; diff --git a/crates/sui-indexer/src/test_utils.rs b/crates/sui-indexer/src/test_utils.rs index 98d4d3a04aff9..050bbb121a04e 100644 --- a/crates/sui-indexer/src/test_utils.rs +++ b/crates/sui-indexer/src/test_utils.rs @@ -9,7 +9,7 @@ use std::path::PathBuf; use std::time::Duration; use sui_json_rpc_types::SuiTransactionBlockResponse; -use crate::config::{IngestionConfig, PruningOptions, SnapshotLagConfig, UploadOptions}; +use crate::config::{IngestionConfig, RetentionPolicies, SnapshotLagConfig, UploadOptions}; use crate::database::Connection; use crate::database::ConnectionPool; use crate::db::ConnectionPoolConfig; @@ -24,7 +24,7 @@ pub enum ReaderWriterConfig { }, Writer { snapshot_config: SnapshotLagConfig, - pruning_options: PruningOptions, + retention_policies: Option, }, } @@ -39,11 +39,11 @@ impl ReaderWriterConfig { /// to keep. pub fn writer_mode( snapshot_config: Option, - epochs_to_keep: Option, + retention_policies: Option, ) -> Self { Self::Writer { snapshot_config: snapshot_config.unwrap_or_default(), - pruning_options: PruningOptions { epochs_to_keep }, + retention_policies, } } } @@ -118,7 +118,7 @@ pub async fn start_test_indexer_impl( } ReaderWriterConfig::Writer { snapshot_config, - pruning_options, + retention_policies, } => { let connection = Connection::dedicated(&db_url.parse().unwrap()) .await @@ -135,7 +135,7 @@ pub async fn start_test_indexer_impl( store_clone, indexer_metrics, snapshot_config, - pruning_options, + retention_policies, cancel, ) .await diff --git a/crates/sui-transactional-test-runner/src/args.rs b/crates/sui-transactional-test-runner/src/args.rs index 2d73a54ea99d9..eb4749aa163b0 100644 --- a/crates/sui-transactional-test-runner/src/args.rs +++ b/crates/sui-transactional-test-runner/src/args.rs @@ -1,8 +1,10 @@ // Copyright (c) Mysten Labs, Inc. 
diff --git a/crates/sui-transactional-test-runner/src/args.rs b/crates/sui-transactional-test-runner/src/args.rs
index 2d73a54ea99d9..eb4749aa163b0 100644
--- a/crates/sui-transactional-test-runner/src/args.rs
+++ b/crates/sui-transactional-test-runner/src/args.rs
@@ -1,8 +1,10 @@
 // Copyright (c) Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0

+use std::str::FromStr;
+
 use crate::test_adapter::{FakeID, SuiTestAdapter};
-use anyhow::{bail, ensure};
+use anyhow::{bail, ensure, Context};
 use clap;
 use clap::{Args, Parser};
 use move_command_line_common::parser::{parse_u256, parse_u64};
@@ -70,6 +72,8 @@ pub struct SuiInitArgs {
     /// the indexer.
     #[clap(long = "epochs-to-keep")]
     pub epochs_to_keep: Option<u64>,
+    /// Per-table `table_name=N` retention overrides for the indexer's pruner.
+    #[clap(long = "retention-policy-overrides", value_parser = parse_key_val::<String, u64>)]
+    pub retention_policy_overrides: Option<Vec<(String, u64)>>,
 }

 #[derive(Debug, clap::Parser)]
@@ -607,3 +611,25 @@ fn parse_policy(x: &str) -> anyhow::Result
         _ => bail!("Invalid upgrade policy {x}. Policy must be one of 'compatible', 'additive', or 'dep_only'")
     })
 }
+
+fn parse_key_val<T, U>(s: &str) -> anyhow::Result<(T, U)>
+where
+    T: FromStr,
+    T::Err: std::error::Error + Send + Sync + 'static,
+    U: FromStr,
+    U::Err: std::error::Error + Send + Sync + 'static,
+{
+    let pos = s
+        .find('=')
+        .with_context(|| format!("Invalid KEY=value: no `=` found in `{s}`"))?;
+
+    let key = s[..pos]
+        .parse()
+        .with_context(|| format!("Failed to parse key from `{}`", &s[..pos]))?;
+
+    let value = s[pos + 1..]
+        .parse()
+        .with_context(|| format!("Failed to parse value from `{}`", &s[pos + 1..]))?;
+
+    Ok((key, value))
+}
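Note: `parse_key_val` splits a `KEY=value` argument on the first `=` and parses both halves. For example:

    let (table, epochs): (String, u64) = parse_key_val("objects_history=2").unwrap();
    assert_eq!((table.as_str(), epochs), ("objects_history", 2));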
diff --git a/crates/sui-transactional-test-runner/src/test_adapter.rs b/crates/sui-transactional-test-runner/src/test_adapter.rs
index 7b5bd91626593..da53af365dff2 100644
--- a/crates/sui-transactional-test-runner/src/test_adapter.rs
+++ b/crates/sui-transactional-test-runner/src/test_adapter.rs
@@ -39,6 +39,7 @@ use move_transactional_test_runner::{
 use move_vm_runtime::session::SerializedReturnValues;
 use once_cell::sync::Lazy;
 use rand::{rngs::StdRng, Rng, SeedableRng};
+use std::collections::HashMap;
 use std::fmt::{self, Write};
 use std::time::Duration;
 use std::{
@@ -50,7 +51,7 @@ use sui_core::authority::test_authority_builder::TestAuthorityBuilder;
 use sui_core::authority::AuthorityState;
 use sui_framework::DEFAULT_FRAMEWORK_PATH;
 use sui_graphql_rpc::test_infra::cluster::ExecutorCluster;
-use sui_graphql_rpc::test_infra::cluster::{serve_executor, SnapshotLagConfig};
+use sui_graphql_rpc::test_infra::cluster::{serve_executor, RetentionPolicies, SnapshotLagConfig};
 use sui_json_rpc_api::QUERY_MAX_RESULT_LIMIT;
 use sui_json_rpc_types::{DevInspectResults, SuiExecutionStatus, SuiTransactionBlockEffectsAPI};
 use sui_protocol_config::{Chain, ProtocolConfig};
@@ -246,6 +247,7 @@ impl<'a> MoveTestAdapter<'a> for SuiTestAdapter {
             snapshot_config,
             flavor,
             epochs_to_keep,
+            retention_policy_overrides,
         ) = match task_opt.map(|t| t.command) {
             Some((
                 InitCommand { named_addresses },
@@ -261,6 +263,7 @@ impl<'a> MoveTestAdapter<'a> for SuiTestAdapter {
                     snapshot_config,
                     flavor,
                     epochs_to_keep,
+                    retention_policy_overrides,
                 },
             )) => {
                 let map = verify_and_create_named_address_mapping(named_addresses).unwrap();
@@ -300,6 +303,7 @@ impl<'a> MoveTestAdapter<'a> for SuiTestAdapter {
                     snapshot_config,
                     flavor,
                     epochs_to_keep,
+                    retention_policy_overrides,
                 )
             }
             None => {
@@ -315,6 +319,7 @@ impl<'a> MoveTestAdapter<'a> for SuiTestAdapter {
                     SnapshotLagConfig::default(),
                     None,
                     None,
+                    None,
                 )
             }
         };
@@ -330,6 +335,23 @@ impl<'a> MoveTestAdapter<'a> for SuiTestAdapter {
             },
             cluster,
         ) = if is_simulator {
+            // Map retention_policy_overrides from Vec<(String, u64)> to a HashMap.
+            let retention_policy_overrides = retention_policy_overrides
+                .map(|overrides| overrides.into_iter().collect::<HashMap<_, _>>());
+
+            let retention_policies = match (epochs_to_keep, retention_policy_overrides) {
+                (Some(default_retention), Some(overrides)) => {
+                    Some(RetentionPolicies::new(default_retention, overrides))
+                }
+                (Some(epochs_to_keep), None) => Some(
+                    RetentionPolicies::new_with_default_retention_only(epochs_to_keep),
+                ),
+                (None, Some(overrides)) => {
+                    Some(RetentionPolicies::new_with_overrides_only(overrides))
+                }
+                (None, None) => None,
+            };
+
             init_sim_executor(
                 rng,
                 account_names,
@@ -338,7 +360,7 @@ impl<'a> MoveTestAdapter<'a> for SuiTestAdapter {
                 custom_validator_account,
                 reference_gas_price,
                 snapshot_config,
-                epochs_to_keep,
+                retention_policies,
             )
             .await
         } else {
@@ -2116,7 +2138,7 @@ async fn init_sim_executor(
     custom_validator_account: bool,
     reference_gas_price: Option<u64>,
     snapshot_config: SnapshotLagConfig,
-    epochs_to_keep: Option<u64>,
+    retention_policies: Option<RetentionPolicies>,
 ) -> (
     Box,
     AccountSetup,
@@ -2188,7 +2210,7 @@ async fn init_sim_executor(
     let cluster = serve_executor(
         Arc::new(read_replica),
         Some(snapshot_config),
-        epochs_to_keep,
+        retention_policies,
         data_ingestion_path,
     )
     .await;
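Note: taken together, a transactional test can now override retention per table from its init header. A hypothetical example (flag names from `SuiInitArgs` above; exact header syntax depends on the test harness):

    //# init --simulator --epochs-to-keep 4 --retention-policy-overrides transactions=2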