Commit 5065a0a

cleanup instantiation

wlmyng committed Oct 2, 2024
1 parent 517131a commit 5065a0a
Showing 9 changed files with 131 additions and 187 deletions.
3 changes: 1 addition & 2 deletions crates/sui-graphql-rpc/src/test_infra/cluster.rs
@@ -14,7 +14,6 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use sui_graphql_rpc_client::simple_client::SimpleClient;
use sui_indexer::config::PruningOptions;
pub use sui_indexer::config::RetentionPolicies;
pub use sui_indexer::config::SnapshotLagConfig;
use sui_indexer::errors::IndexerError;
@@ -184,7 +183,7 @@ pub async fn serve_executor(
let (pg_store, pg_handle, _) = start_indexer_writer_for_testing(
db_url,
Some(snapshot_config.clone()),
Some(PruningOptions { epochs_to_keep }),
retention_policies,
Some(data_ingestion_path),
Some(cancellation_token.clone()),
)
176 changes: 100 additions & 76 deletions crates/sui-indexer/src/config.rs
@@ -5,7 +5,6 @@ use crate::db::ConnectionPoolConfig;
use crate::{backfill::BackfillTaskKind, handlers::pruner::PrunableTable};
use clap::{Args, Parser, Subcommand};
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use std::{collections::HashMap, net::SocketAddr, path::PathBuf};
use strum::IntoEnumIterator;
use sui_json_rpc::name_service::NameServiceConfig;
@@ -15,7 +14,6 @@ use url::Url;
/// The primary purpose of objects_history is to serve consistency query.
/// A short retention is sufficient.
const OBJECTS_HISTORY_EPOCHS_TO_KEEP: u64 = 2;
const DEFAULT_EPOCHS_TO_KEEP: u64 = 30;

#[derive(Parser, Clone, Debug)]
#[clap(
@@ -221,66 +219,87 @@ pub struct PruningOptions {
pub pruning_config_path: Option<PathBuf>,
}

/// Default retention policies and overrides for the pruner. Instantiated only if `PruningOptions`
/// is provided on indexer start. Any tables not named in the file will inherit the default
/// retention policy `epochs_to_keep`.
/// Represents the default retention policy and overrides for prunable tables. When `finalize` is
/// called, the `policies` field is updated with the default retention policy for all tables that do
/// not have an override specified. Instantiated only if `PruningOptions` is provided on indexer
/// start.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionPolicies {
/// Default retention policy for all tables.
pub epochs_to_keep: u64,
/// Mapping of retention policies for specific tables which will override the default policy
/// specified by `epochs_to_keep`.
#[serde(default)]
pub overrides: HashMap<PrunableTable, u64>,
/// In Rust, a mapping of all `PrunableTable` variants to their respective retention policies.
/// From a TOML file, a user only needs to explicitly list tables that should override the
/// default retention.
#[serde(default, rename = "overrides")]
pub policies: HashMap<PrunableTable, u64>,
}
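Not part of this diff, but for orientation: because of `rename = "overrides"`, a pruning config file keeps its `[overrides]` table even though the Rust field is now called `policies`. A minimal sketch of that round trip, written as if it lived in this file's test module; the epoch values are illustrative:

    #[test]
    fn sketch_overrides_key_fills_policies_field() {
        // The TOML `[overrides]` table deserializes into the `policies` field;
        // before finalize() runs, only the explicitly listed tables are present.
        let toml_str = r#"
            epochs_to_keep = 30
            [overrides]
            objects_history = 2
        "#;
        let parsed = toml::de::from_str::<RetentionPolicies>(toml_str).unwrap();
        assert_eq!(parsed.epochs_to_keep, 30);
        assert_eq!(parsed.policies.len(), 1);
        assert_eq!(parsed.get(&PrunableTable::ObjectsHistory), Some(2));
    }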

impl RetentionPolicies {
pub fn new(epochs_to_keep: u64, overrides: HashMap<String, u64>) -> Self {
let overrides = overrides
.into_iter()
.filter_map(|(table_name, retention)| {
PrunableTable::from_str(&table_name)
.ok()
.map(|table| (table, retention))
})
.collect();
impl PruningOptions {
/// Load default retention policy and overrides from file.
pub fn load_from_file(&self) -> Option<RetentionPolicies> {
let Some(config_path) = self.pruning_config_path.as_ref() else {
return None;
};

let contents = std::fs::read_to_string(&config_path)
.expect("Failed to read default retention policy and overrides from file");
let retention_with_overrides = toml::de::from_str::<RetentionPolicies>(&contents)
.expect("Failed to parse into RetentionPolicies struct");

let default_retention = retention_with_overrides.epochs_to_keep;

assert!(
default_retention > 0,
"Default retention must be greater than 0"
);
assert!(
retention_with_overrides
.policies
.values()
.all(|&policy| policy > 0),
"All retention overrides must be greater than 0"
);

Some(retention_with_overrides)
}
}

impl RetentionPolicies {
/// Create a new `RetentionPolicies` with the specified default retention and overrides. Call
/// `finalize()` on the instance to update the `policies` field with the default retention
/// policy for all tables that do not have an override specified.
pub fn new(epochs_to_keep: u64, overrides: HashMap<PrunableTable, u64>) -> Self {
Self {
epochs_to_keep,
overrides,
policies: overrides,
}
}

/// Create a new `RetentionPolicies` with only the default retention specified and the default
/// override for `objects_history`.
pub fn new_with_default_retention_only(epochs_to_keep: u64) -> Self {
/// override for `objects_history`. Call `finalize()` on the instance to update the `policies`
/// field with the default retention policy for all tables that do not have an override
/// specified.
pub fn new_with_default_retention_only_for_testing(epochs_to_keep: u64) -> Self {
let mut overrides = HashMap::new();
overrides.insert(
PrunableTable::ObjectsHistory,
OBJECTS_HISTORY_EPOCHS_TO_KEEP,
);

Self {
epochs_to_keep,
overrides,
}
Self::new(epochs_to_keep, overrides)
}

/// Create a new `RetentionPolicies` with only the overrides specified and a default retention
/// of 30 days.
pub fn new_with_overrides_only(overrides: HashMap<String, u64>) -> Self {
let overrides = overrides
.into_iter()
.filter_map(|(table_name, retention)| {
PrunableTable::from_str(&table_name)
.ok()
.map(|table| (table, retention))
})
.collect();
Self {
overrides,
epochs_to_keep: DEFAULT_EPOCHS_TO_KEEP,
/// Updates the `policies` field with the default retention policy for all tables that do not
/// have an override specified.
pub fn finalize(mut self) -> Self {
for table in PrunableTable::iter() {
self.policies.entry(table).or_insert(self.epochs_to_keep);
}
self
}

pub fn get(&self, table: &PrunableTable) -> Option<u64> {
self.policies.get(table).copied()
}
}
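The programmatic path mirrors the file-based one; a minimal sketch of `new()` followed by `finalize()`, again written as a hypothetical test in this module with illustrative epoch values:

    #[test]
    fn sketch_new_then_finalize() {
        // Only objects_history is overridden up front; finalize() fills every
        // other prunable table with the default retention of 30 epochs.
        let mut overrides = HashMap::new();
        overrides.insert(PrunableTable::ObjectsHistory, 2);
        let policies = RetentionPolicies::new(30, overrides).finalize();
        assert_eq!(policies.get(&PrunableTable::ObjectsHistory), Some(2));
        assert_eq!(policies.get(&PrunableTable::Transactions), Some(30));
        assert_eq!(policies.policies.len(), PrunableTable::iter().count());
    }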

@@ -359,42 +378,12 @@ impl Default for RestoreConfig {
}
}

/// Load default retention policy and overrides from file, and then compile into a map that sets a
/// retention policy for each prunable table.
pub fn finalize_retention_policies(config_path: PathBuf) -> HashMap<PrunableTable, u64> {
let contents = std::fs::read_to_string(&config_path)
.expect("Failed to read default retention policy and overrides from file");
let policies = toml::de::from_str::<RetentionPolicies>(&contents)
.expect("Failed to parse into RetentionPolicies struct");

assert!(
policies.epochs_to_keep > 0,
"Default retention must be greater than 0"
);
assert!(
policies.overrides.values().all(|&retention| retention > 0),
"All retention overrides must be greater than 0"
);

let mut merged_policies = HashMap::new();

for table in PrunableTable::iter() {
let retention = policies
.overrides
.get(&table)
.copied()
.unwrap_or(policies.epochs_to_keep);

merged_policies.insert(table, retention);
}

merged_policies
}

#[cfg(test)]
mod test {
use super::*;
use std::io::Write;
use tap::Pipe;
use tempfile::NamedTempFile;

fn parse_args<'a, T>(args: impl IntoIterator<Item = &'a str>) -> Result<T, clap::error::Error>
where
@@ -471,12 +460,47 @@ mod test {

let opts = toml::de::from_str::<RetentionPolicies>(toml_str).unwrap();
assert_eq!(opts.epochs_to_keep, 5);
assert_eq!(opts.policies.get(&PrunableTable::ObjectsHistory), Some(&10));
assert_eq!(opts.policies.get(&PrunableTable::Transactions), Some(&20));
assert_eq!(opts.policies.len(), 2);
}

#[test]
fn test_pruning_options_from_file() {
let mut temp_file = NamedTempFile::new().unwrap();
let toml_content = r#"
epochs_to_keep = 5
[overrides]
objects_history = 10
transactions = 20
"#;
temp_file.write_all(toml_content.as_bytes()).unwrap();
let temp_path: PathBuf = temp_file.path().to_path_buf();
let pruning_options = PruningOptions {
pruning_config_path: Some(temp_path.clone()),
};
let mut retention_policies = pruning_options.load_from_file().unwrap();

// Assert the parsed values
assert_eq!(retention_policies.epochs_to_keep, 5);
assert_eq!(
retention_policies.get(&PrunableTable::ObjectsHistory),
Some(10)
);
assert_eq!(
retention_policies.get(&PrunableTable::Transactions),
Some(20)
);
assert_eq!(retention_policies.policies.len(), 2);

retention_policies = retention_policies.finalize();

assert!(
retention_policies.policies.len() > 2,
"Expected more than 2 policies, but got {}",
retention_policies.policies.len()
);
assert_eq!(opts.overrides.get(&PrunableTable::ObjectsHistory), Some(&10));
assert_eq!(opts.overrides.get(&PrunableTable::Transactions), Some(&20));
assert_eq!(opts.overrides.len(), 2);
}

#[test]
19 changes: 8 additions & 11 deletions crates/sui-indexer/src/db.rs
@@ -3,15 +3,17 @@

use crate::database::Connection;
use crate::errors::IndexerError;
use crate::handlers::pruner::PrunableTable;
use clap::Args;
use diesel::migration::{Migration, MigrationSource, MigrationVersion};
use diesel::pg::Pg;
use diesel::prelude::QueryableByName;
use diesel::table;
use diesel::QueryDsl;
use diesel_migrations::{embed_migrations, EmbeddedMigrations};
use std::collections::{BTreeSet, HashMap, HashSet};
use std::collections::{BTreeSet, HashSet};
use std::time::Duration;
use strum::IntoEnumIterator;
use tracing::info;

table! {
@@ -135,11 +137,8 @@ async fn check_db_migration_consistency_impl(
)))
}

/// Checks that the tables named in `RetentionPolicies.overrides`, if any, actually exist in the db.
pub async fn check_retention_policy_overrides_valid(
conn: &mut Connection<'_>,
overrides: &HashMap<String, u64>,
) -> Result<(), IndexerError> {
/// Check that prunable tables exist in the database.
pub async fn check_prunable_tables_valid(conn: &mut Connection<'_>) -> Result<(), IndexerError> {
info!("Starting compatibility check");

use diesel_async::RunQueryDsl;
@@ -172,12 +171,10 @@ pub async fn check_retention_policy_overrides_valid(
.await
.map_err(|e| IndexerError::DbMigrationError(format!("Failed to fetch tables: {e}")))?;

// Check that each key in overrides exists in result
let map_keys: HashSet<&String> = overrides.keys().collect();
let existing_tables: HashSet<_> = result.into_iter().map(|t| t.table_name).collect();
let parent_tables_from_db: HashSet<_> = result.into_iter().map(|t| t.table_name).collect();

for key in map_keys {
if !existing_tables.contains(key) {
for key in PrunableTable::iter() {
if !parent_tables_from_db.contains(key.as_ref()) {
return Err(IndexerError::GenericError(format!(
"Invalid retention policy override provided for table {}: does not exist in the database",
key
34 changes: 8 additions & 26 deletions crates/sui-indexer/src/handlers/pruner.rs
@@ -9,20 +9,14 @@ use crate::{metrics::IndexerMetrics, store::IndexerStore, types::IndexerResult};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;
use strum::IntoEnumIterator;
use strum_macros;
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

pub struct Pruner {
pub store: PgIndexerStore,
pub partition_manager: PgPartitionManager,
// TODO: (wlmyng) we can remove this once pruner consults `retention_policies` for all tables to
// prune.
pub default_retention: u64,
/// Retention policies for each prunable table. If a table is not listed here, it will not be
/// pruned.
pub retention_policies: HashMap<PrunableTable, u64>,
pub retention_policies: RetentionPolicies,
pub metrics: IndexerMetrics,
}

@@ -73,32 +67,19 @@ pub enum PrunableTable {
}

impl Pruner {
/// Instantiates a pruner with default retention and overrides. Pruner will merge these into a
/// list of policies for each prunable table.
/// Instantiates a pruner with default retention and overrides. Pruner will finalize the
/// retention policies so there is a value for every prunable table.
pub fn new(
store: PgIndexerStore,
retention_policies: RetentionPolicies,
metrics: IndexerMetrics,
) -> Result<Self, IndexerError> {
let partition_manager = PgPartitionManager::new(store.pool())?;
let mut merged_policies = HashMap::new();

// Iterate through the prunable table variants
for table in PrunableTable::iter() {
let retention = retention_policies
.overrides
.get(&table)
.copied()
.unwrap_or(retention_policies.epochs_to_keep);

merged_policies.insert(table, retention);
}

Ok(Self {
store,
partition_manager,
default_retention: retention_policies.epochs_to_keep,
retention_policies: merged_policies,
retention_policies: retention_policies.finalize(),
metrics,
})
}
Expand All @@ -107,7 +88,7 @@ impl Pruner {
/// table is not prunable.
fn epochs_to_keep(&self, table_name: &str) -> Option<u64> {
if let Ok(variant) = table_name.parse::<PrunableTable>() {
self.retention_policies.get(&variant).copied()
self.retention_policies.get(&variant)
} else {
None
}
@@ -168,8 +149,9 @@ impl Pruner {
// TODO: (wlmyng) Once we have the watermarks table, we can iterate through each row
// returned from `watermarks`, look it up against `retention_policies`, and process them
// independently. This also means that pruning overrides will only apply for
// epoch-partitioned tables.
let prune_to_epoch = last_seen_max_epoch.saturating_sub(self.default_retention - 1);
// epoch-partitioned tables right now.
let prune_to_epoch =
last_seen_max_epoch.saturating_sub(self.retention_policies.epochs_to_keep - 1);
let prune_start_epoch = next_prune_epoch.unwrap_or(min_epoch);
for epoch in prune_start_epoch..prune_to_epoch {
if cancel.is_cancelled() {
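For reference, the retention arithmetic in the hunk above works out as follows. The helper and the numbers below are an illustrative sketch only, not code from this commit:

    // Everything strictly below last_seen_max_epoch - (epochs_to_keep - 1) is
    // pruned, so exactly `epochs_to_keep` epochs survive.
    fn prune_to_epoch(last_seen_max_epoch: u64, epochs_to_keep: u64) -> u64 {
        last_seen_max_epoch.saturating_sub(epochs_to_keep - 1)
    }

    #[test]
    fn sketch_prune_to_epoch() {
        // With the default retention of 30 epochs and a max epoch of 100,
        // epochs [min_epoch, 71) are pruned and epochs 71..=100 remain.
        assert_eq!(prune_to_epoch(100, 30), 71);
    }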
7 changes: 1 addition & 6 deletions crates/sui-indexer/src/indexer.rs
@@ -89,12 +89,7 @@ impl Indexer {
.await?;

if let Some(retention_policies) = retention_policies {
info!(
"Starting indexer pruner with epochs to keep: {}",
retention_policies.epochs_to_keep
);
assert!(epochs_to_keep > 0, "Epochs to keep must be positive");
let pruner = Pruner::new(store.clone(), epochs_to_keep, metrics.clone())?;
let pruner = Pruner::new(store.clone(), retention_policies, metrics.clone())?;
let cancel_clone = cancel.clone();
spawn_monitored_task!(pruner.start(cancel_clone));
}