Skip to content

Commit

Permalink
PruningOptions -> RetentionPolicies
Browse files Browse the repository at this point in the history
Make life easier: require `epochs_to_keep`, with optional per-table overrides; otherwise, no retention policy is applied at all.

Strumming the pruning config: the pruner.rs file defines the prunable tables. When the TOML file is parsed into the Rust config, it will warn if it tries to name any variants not supported by the indexer code. Additionally, there's also a check to make sure that the prunable tables actually exist in the db.
  • Loading branch information
wlmyng committed Oct 2, 2024
1 parent b22cb24 commit 5cdcd79
Show file tree
Hide file tree
Showing 11 changed files with 409 additions and 63 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions crates/sui-graphql-rpc/src/test_infra/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use sui_graphql_rpc_client::simple_client::SimpleClient;
pub use sui_indexer::config::RetentionPolicies;
pub use sui_indexer::config::SnapshotLagConfig;
use sui_indexer::errors::IndexerError;
use sui_indexer::store::indexer_store::IndexerStore;
Expand Down Expand Up @@ -151,7 +152,7 @@ pub async fn start_network_cluster() -> NetworkCluster {
pub async fn serve_executor(
executor: Arc<dyn RestStateReader + Send + Sync>,
snapshot_config: Option<SnapshotLagConfig>,
epochs_to_keep: Option<u64>,
retention_policies: Option<RetentionPolicies>,
data_ingestion_path: PathBuf,
) -> ExecutorCluster {
let database = TempDb::new().unwrap();
Expand Down Expand Up @@ -181,7 +182,7 @@ pub async fn serve_executor(
let (pg_store, pg_handle) = start_test_indexer_impl(
db_url,
format!("http://{}", executor_server_url),
ReaderWriterConfig::writer_mode(snapshot_config.clone(), epochs_to_keep),
ReaderWriterConfig::writer_mode(snapshot_config.clone(), retention_policies),
Some(data_ingestion_path),
cancellation_token.clone(),
)
Expand Down
3 changes: 3 additions & 0 deletions crates/sui-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ regex.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
strum.workspace = true
strum_macros.workspace = true
tap.workspace = true
tempfile.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["full"] }
tokio-util = { workspace = true, features = ["rt"] }
toml.workspace = true
tracing.workspace = true
url.workspace = true

Expand Down
146 changes: 142 additions & 4 deletions crates/sui-indexer/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::backfill::BackfillTaskKind;
use crate::db::ConnectionPoolConfig;
use crate::{backfill::BackfillTaskKind, handlers::pruner::PrunableTable};
use clap::{Args, Parser, Subcommand};
use std::{net::SocketAddr, path::PathBuf};
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use std::{collections::HashMap, net::SocketAddr, path::PathBuf};
use sui_json_rpc::name_service::NameServiceConfig;
use sui_types::base_types::{ObjectID, SuiAddress};
use url::Url;

/// The primary purpose of objects_history is to serve consistency queries.
/// A short retention is sufficient for that.
const OBJECTS_HISTORY_EPOCHS_TO_KEEP: u64 = 2;
const DEFAULT_EPOCHS_TO_KEEP: u64 = 30;

#[derive(Parser, Clone, Debug)]
#[clap(
name = "Sui indexer",
Expand Down Expand Up @@ -208,8 +215,97 @@ pub enum Command {

/// CLI options controlling pruning. Pruning is enabled only when a retention-policy
/// TOML file is supplied via `--config` (see `PruningOptions::retention_policies`).
#[derive(Args, Default, Debug, Clone)]
pub struct PruningOptions {
// NOTE(review): this view is a rendered diff — the `epochs_to_keep` flag below appears
// to be the *removed* side of the hunk, superseded by the `--config` TOML file.
// Confirm against the applied commit that only `config` remains.
#[arg(long, env = "EPOCHS_TO_KEEP")]
pub epochs_to_keep: Option<u64>,
/// Path to TOML file containing configuration for retention policies.
#[arg(long)]
pub config: Option<PathBuf>,
}

/// Default retention policies and overrides for the pruner. Instantiated only if `PruningOptions`
/// is provided on indexer start. Any tables not named in the file will inherit the default
/// retention policy `epochs_to_keep`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionPolicies {
/// Default retention policy for all tables, in epochs.
pub epochs_to_keep: u64,
/// Mapping of retention policies for specific tables which will override the default policy
/// specified by `epochs_to_keep`.
///
/// `#[serde(default)]` makes the `[overrides]` section optional in the TOML file — an
/// absent section deserializes to an empty map. Keys deserialize through `PrunableTable`,
/// so a TOML file naming an unknown table fails to parse with an "unknown variant" error.
#[serde(default)]
pub overrides: HashMap<PrunableTable, u64>,
}

impl RetentionPolicies {
pub fn new(epochs_to_keep: u64, overrides: HashMap<String, u64>) -> Self {
let overrides = overrides
.into_iter()
.filter_map(|(table_name, retention)| {
PrunableTable::from_str(&table_name)
.ok()
.map(|table| (table, retention))
})
.collect();

Self {
epochs_to_keep,
overrides,
}
}

/// Create a new `RetentionPolicies` with only the default retention specified and the default
/// override for `objects_history`.
pub fn new_with_default_retention_only(epochs_to_keep: u64) -> Self {
let mut overrides = HashMap::new();
overrides.insert(
PrunableTable::ObjectsHistory,
OBJECTS_HISTORY_EPOCHS_TO_KEEP,
);

Self {
epochs_to_keep,
overrides,
}
}

/// Create a new `RetentionPolicies` with only the overrides specified and a default retention
/// of 30 days.
pub fn new_with_overrides_only(overrides: HashMap<String, u64>) -> Self {
let overrides = overrides
.into_iter()
.filter_map(|(table_name, retention)| {
PrunableTable::from_str(&table_name)
.ok()
.map(|table| (table, retention))
})
.collect();
Self {
overrides,
epochs_to_keep: DEFAULT_EPOCHS_TO_KEEP,
}
}
}

impl PruningOptions {
    /// Load retention policies from the configured TOML file, if a path was provided.
    /// Returns `None` when no config path is set, meaning pruning is disabled entirely.
    ///
    /// Panics (via `expect`/`assert!`) if the file cannot be read or parsed, or if any
    /// retention value — default or override — is zero. This runs at indexer startup,
    /// where failing fast on a bad config is the desired behavior.
    pub fn retention_policies(self) -> Option<RetentionPolicies> {
        let path = self.config?;

        let contents =
            std::fs::read_to_string(&path).expect("Failed to read retention policies file");
        let policies: RetentionPolicies =
            toml::de::from_str(&contents).expect("Failed to parse into RetentionPolicies struct");

        assert!(
            policies.epochs_to_keep > 0,
            "Default retention must be greater than 0"
        );
        assert!(
            policies
                .overrides
                .values()
                .all(|&retention| retention > 0),
            "All retention overrides must be greater than 0"
        );

        Some(policies)
    }
}

#[derive(Args, Debug, Clone)]
Expand Down Expand Up @@ -354,4 +450,46 @@ mod test {
// fullnode rpc url must be present
parse_args::<JsonRpcConfig>([]).unwrap_err();
}

#[test]
fn test_valid_pruning_config_file() {
    let toml_str = r#"
epochs_to_keep = 5
[overrides]
objects_history = 10
transactions = 20
"#;

    // A well-formed file yields the default retention plus exactly the named overrides.
    let policies = toml::de::from_str::<RetentionPolicies>(toml_str).unwrap();

    assert_eq!(policies.epochs_to_keep, 5);
    assert_eq!(policies.overrides.len(), 2);
    assert_eq!(
        policies.overrides.get(&PrunableTable::ObjectsHistory),
        Some(&10)
    );
    assert_eq!(
        policies.overrides.get(&PrunableTable::Transactions),
        Some(&20)
    );
}

#[test]
fn test_invalid_pruning_config_file() {
    let toml_str = r#"
epochs_to_keep = 5
[overrides]
objects_history = 10
transactions = 20
invalid_table = 30
"#;

    // An override naming a table the indexer does not recognize must fail to parse,
    // and the error should identify the offending key.
    match toml::de::from_str::<RetentionPolicies>(toml_str) {
        Ok(_) => panic!("Expected an error, but parsing succeeded"),
        Err(e) => assert!(
            e.to_string().contains("unknown variant `invalid_table`"),
            "Error message doesn't mention the invalid table"
        ),
    }
}
}
57 changes: 56 additions & 1 deletion crates/sui-indexer/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ use crate::errors::IndexerError;
use clap::Args;
use diesel::migration::{Migration, MigrationSource, MigrationVersion};
use diesel::pg::Pg;
use diesel::prelude::QueryableByName;
use diesel::table;
use diesel::QueryDsl;
use diesel_migrations::{embed_migrations, EmbeddedMigrations};
use std::collections::BTreeSet;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::time::Duration;
use tracing::info;

Expand Down Expand Up @@ -134,6 +135,60 @@ async fn check_db_migration_consistency_impl(
)))
}

/// Checks that the tables named in `RetentionPolicies.overrides`, if any, actually exist in the db.
///
/// Returns `Ok(())` when every key of `overrides` matches a table in the `public` schema,
/// `IndexerError::GenericError` naming the first missing table otherwise, or
/// `IndexerError::DbMigrationError` if the catalog query itself fails.
pub async fn check_retention_policy_overrides_valid(
    conn: &mut Connection<'_>,
    overrides: &HashMap<String, u64>,
) -> Result<(), IndexerError> {
    info!("Starting compatibility check");

    use diesel_async::RunQueryDsl;

    // Select "parent" tables only: regular tables and partitioned (parent) tables, but
    // not child partitions — an override should name the logical table, not a partition.
    let select_parent_tables = r#"
    SELECT c.relname AS table_name
    FROM pg_class c
    JOIN pg_namespace n ON n.oid = c.relnamespace
    LEFT JOIN pg_partitioned_table pt ON pt.partrelid = c.oid
    WHERE c.relkind IN ('r', 'p') -- 'r' for regular tables, 'p' for partitioned tables
    AND n.nspname = 'public'
    AND (
        pt.partrelid IS NOT NULL -- This is a partitioned (parent) table
        OR NOT EXISTS ( -- This is not a partition (child table)
            SELECT 1
            FROM pg_inherits i
            WHERE i.inhrelid = c.oid
        )
    );
    "#;

    // Row type for the raw catalog query above.
    #[derive(QueryableByName)]
    struct TableName {
        #[diesel(sql_type = diesel::sql_types::Text)]
        table_name: String,
    }

    let result: Vec<TableName> = diesel::sql_query(select_parent_tables)
        .load(conn)
        .await
        .map_err(|e| IndexerError::DbMigrationError(format!("Failed to fetch tables: {e}")))?;

    // Set of tables present in the db, for O(1) membership checks below.
    let existing_tables: HashSet<_> = result.into_iter().map(|t| t.table_name).collect();

    // Iterate the override keys directly — collecting them into an intermediate
    // HashSet first (as before) was a needless allocation (clippy: needless_collect).
    for key in overrides.keys() {
        if !existing_tables.contains(key) {
            return Err(IndexerError::GenericError(format!(
                "Invalid retention policy override provided for table {}: does not exist in the database",
                key
            )));
        }
    }

    info!("Compatibility check passed");
    Ok(())
}

pub use setup_postgres::{reset_database, run_migrations};

pub mod setup_postgres {
Expand Down
Loading

0 comments on commit 5cdcd79

Please sign in to comment.