[indexer][watermarks][1/n] Modify PruningOptions to point to a toml file of epochs_to_keep and optional per-table overrides #19637

Open · wants to merge 1 commit into base `main`
3 changes: 3 additions & 0 deletions Cargo.lock


6 changes: 3 additions & 3 deletions crates/sui-graphql-rpc/src/test_infra/cluster.rs
@@ -14,7 +14,7 @@
 use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::Duration;
 use sui_graphql_rpc_client::simple_client::SimpleClient;
-use sui_indexer::config::PruningOptions;
+pub use sui_indexer::config::RetentionConfig;
 pub use sui_indexer::config::SnapshotLagConfig;
 use sui_indexer::errors::IndexerError;
 use sui_indexer::store::PgIndexerStore;
@@ -151,7 +151,7 @@
 pub async fn serve_executor(
     executor: Arc<dyn RestStateReader + Send + Sync>,
     snapshot_config: Option<SnapshotLagConfig>,
-    epochs_to_keep: Option<u64>,
+    retention_config: Option<RetentionConfig>,
     data_ingestion_path: PathBuf,
 ) -> ExecutorCluster {
     let database = TempDb::new().unwrap();
@@ -184,7 +184,7 @@
     let (pg_store, pg_handle, _) = start_indexer_writer_for_testing(
         db_url,
         Some(snapshot_config.clone()),
-        Some(PruningOptions { epochs_to_keep }),
+        retention_config,
         Some(data_ingestion_path),
         Some(cancellation_token.clone()),
     )
3 changes: 3 additions & 0 deletions crates/sui-indexer/Cargo.toml
@@ -31,11 +31,14 @@
 regex.workspace = true
 serde.workspace = true
 serde_json.workspace = true
 serde_with.workspace = true
+strum.workspace = true
+strum_macros.workspace = true
 tap.workspace = true
 tempfile.workspace = true
 thiserror.workspace = true
 tokio = { workspace = true, features = ["full"] }
 tokio-util = { workspace = true, features = ["rt"] }
+toml.workspace = true
 tracing.workspace = true
 url.workspace = true
9 changes: 5 additions & 4 deletions crates/sui-indexer/README.md
@@ -32,7 +32,7 @@

 Start a local network using the `sui` binary:
 ```sh
 cargo run --bin sui -- start --with-faucet --force-regenesis
 ```

 If you want to run a local network with the indexer enabled (note that `libpq` is required), you can run the following command after following the steps in the next section to set up an indexer DB:

@@ -65,11 +65,12 @@
 ```
 cargo run --bin sui-indexer -- --db-url "<DATABASE_URL>" --rpc-client-url "https://fullnode.devnet.sui.io:443" --rpc-server-worker
 ```
-More flags info can be found in this [file](https://github.com/MystenLabs/sui/blob/main/crates/sui-indexer/src/lib.rs#L83-L123).
+More information about the available flags can be found in this [file](src/main.rs#L41).

 ### DB reset
-Run this command under `sui/crates/sui-indexer`, which will wipe DB; In case of schema changes in `.sql` files, this will also update corresponding `schema.rs` file.
+When making db-related changes, you may find yourself having to run migrations and reset databases often. The commands below invoke these actions.
 ```sh
 diesel database reset --database-url="<DATABASE_URL>"
+cargo run --bin sui-indexer -- --database-url "<DATABASE_URL>" reset-database --force
 ```

## Steps to run locally (TiDB)
226 changes: 222 additions & 4 deletions crates/sui-indexer/src/config.rs
@@ -1,14 +1,20 @@
 // Copyright (c) Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0

-use crate::backfill::BackfillTaskKind;
 use crate::db::ConnectionPoolConfig;
+use crate::{backfill::BackfillTaskKind, handlers::pruner::PrunableTable};
 use clap::{Args, Parser, Subcommand};
-use std::{net::SocketAddr, path::PathBuf};
+use serde::{Deserialize, Serialize};
+use std::{collections::HashMap, net::SocketAddr, path::PathBuf};
+use strum::IntoEnumIterator;
 use sui_json_rpc::name_service::NameServiceConfig;
 use sui_types::base_types::{ObjectID, SuiAddress};
 use url::Url;

+/// The primary purpose of objects_history is to serve consistency queries.
+/// A short retention is sufficient.
+const OBJECTS_HISTORY_EPOCHS_TO_KEEP: u64 = 2;

#[derive(Parser, Clone, Debug)]
#[clap(
name = "Sui indexer",
@@ -208,8 +214,93 @@ pub enum Command {

 #[derive(Args, Default, Debug, Clone)]
 pub struct PruningOptions {
-    #[arg(long, env = "EPOCHS_TO_KEEP")]
-    pub epochs_to_keep: Option<u64>,
+    /// Path to TOML file containing configuration for retention policies.
+    #[arg(long)]
+    pub pruning_config_path: Option<PathBuf>,
 }

/// Represents the default retention policy and overrides for prunable tables. Instantiated only if
/// `PruningOptions` is provided on indexer start.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionConfig {
    /// Default retention policy for all tables.
    pub epochs_to_keep: u64,
    /// A map of tables to their respective retention policies that will override the default.
    /// Prunable tables not named here will use the default retention policy.
    #[serde(default)]
    pub overrides: HashMap<PrunableTable, u64>,
}

impl PruningOptions {
    /// Load default retention policy and overrides from file.
    pub fn load_from_file(&self) -> Option<RetentionConfig> {
        let config_path = self.pruning_config_path.as_ref()?;

        let contents = std::fs::read_to_string(config_path)
            .expect("Failed to read default retention policy and overrides from file");
        let retention_with_overrides = toml::de::from_str::<RetentionConfig>(&contents)
            .expect("Failed to parse into RetentionConfig struct");

        let default_retention = retention_with_overrides.epochs_to_keep;

        assert!(
            default_retention > 0,
            "Default retention must be greater than 0"
        );
        assert!(
            retention_with_overrides
                .overrides
                .values()
                .all(|&policy| policy > 0),
            "All retention overrides must be greater than 0"
        );

        Some(retention_with_overrides)
    }
}
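For illustration, a retention policy file of the shape `load_from_file` parses might look like the following (a hypothetical sketch: the epoch counts are arbitrary, and the table names are snake_case `PrunableTable` variants as exercised in the tests below):

```toml
# Default retention, in epochs, applied to every prunable table.
epochs_to_keep = 30

# Optional per-table overrides; any prunable table not listed here falls back
# to the default above (objects_history additionally has a shorter built-in
# default of 2 epochs when no override is given).
[overrides]
objects_history = 2
transactions = 90
```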

impl RetentionConfig {
    /// Create a new `RetentionConfig` with the specified default retention and overrides. Call
    /// `retention_policies()` on the instance to produce the full mapping, which applies the
    /// default retention policy to all tables that do not have an override specified.
    pub fn new(epochs_to_keep: u64, overrides: HashMap<PrunableTable, u64>) -> Self {
        Self {
            epochs_to_keep,
            overrides,
        }
    }

    pub fn new_with_default_retention_only_for_testing(epochs_to_keep: u64) -> Self {
        let mut overrides = HashMap::new();
        overrides.insert(
            PrunableTable::ObjectsHistory,
            OBJECTS_HISTORY_EPOCHS_TO_KEEP,
        );

        Self::new(epochs_to_keep, overrides)
    }

    /// Consumes this struct to produce a full mapping of every prunable table and its retention
    /// policy. By default, every prunable table will have the default retention policy from
    /// `epochs_to_keep`. Some tables like `objects_history` will observe a different default
    /// retention policy. These default values are overridden by any entries in `overrides`.
    pub fn retention_policies(self) -> HashMap<PrunableTable, u64> {
        let RetentionConfig {
            epochs_to_keep,
            mut overrides,
        } = self;

        for table in PrunableTable::iter() {
            let default_retention = match table {
                PrunableTable::ObjectsHistory => OBJECTS_HISTORY_EPOCHS_TO_KEEP,
                _ => epochs_to_keep,
            };

            overrides.entry(table).or_insert(default_retention);
        }

        overrides
    }
}
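The merge performed by `retention_policies` can be sketched as a standalone program (a simplified illustration, not the crate's code: `Table` is a three-variant stub standing in for the real `PrunableTable` enum, and the epoch values are arbitrary):

```rust
use std::collections::HashMap;

// Illustration only: a stub standing in for the indexer's `PrunableTable`
// enum, which has many more variants in the real crate.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Table {
    ObjectsHistory,
    Transactions,
    Events,
}

const OBJECTS_HISTORY_EPOCHS_TO_KEEP: u64 = 2;

// Mirrors the merge in `RetentionConfig::retention_policies`: every table
// gets a default (objects_history gets its shorter one), and any explicit
// override replaces that default.
fn retention_policies(epochs_to_keep: u64, mut overrides: HashMap<Table, u64>) -> HashMap<Table, u64> {
    for table in [Table::ObjectsHistory, Table::Transactions, Table::Events] {
        let default_retention = match table {
            Table::ObjectsHistory => OBJECTS_HISTORY_EPOCHS_TO_KEEP,
            _ => epochs_to_keep,
        };
        overrides.entry(table).or_insert(default_retention);
    }
    overrides
}

fn main() {
    let policies = retention_policies(5, HashMap::from([(Table::Transactions, 20)]));
    assert_eq!(policies[&Table::Transactions], 20); // explicit override wins
    assert_eq!(policies[&Table::ObjectsHistory], 2); // table-specific default
    assert_eq!(policies[&Table::Events], 5); // global default
}
```

Using `entry(...).or_insert(...)` means explicit overrides always win while every table is still guaranteed an entry in the resulting map.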

#[derive(Args, Debug, Clone)]
@@ -290,7 +381,9 @@ impl Default for RestoreConfig {
#[cfg(test)]
mod test {
    use super::*;
    use std::io::Write;
    use tap::Pipe;
    use tempfile::NamedTempFile;

    fn parse_args<'a, T>(args: impl IntoIterator<Item = &'a str>) -> Result<T, clap::error::Error>
    where
@@ -354,4 +447,129 @@ mod test {
        // fullnode rpc url must be present
        parse_args::<JsonRpcConfig>([]).unwrap_err();
    }

    #[test]
    fn pruning_options_with_objects_history_override() {
        let mut temp_file = NamedTempFile::new().unwrap();
        let toml_content = r#"
            epochs_to_keep = 5
            [overrides]
            objects_history = 10
            transactions = 20
        "#;
        temp_file.write_all(toml_content.as_bytes()).unwrap();
        let temp_path: PathBuf = temp_file.path().to_path_buf();
        let pruning_options = PruningOptions {
            pruning_config_path: Some(temp_path.clone()),
        };
        let retention_config = pruning_options.load_from_file().unwrap();

        // Assert the parsed values
        assert_eq!(retention_config.epochs_to_keep, 5);
        assert_eq!(
            retention_config
                .overrides
                .get(&PrunableTable::ObjectsHistory)
                .copied(),
            Some(10)
        );
        assert_eq!(
            retention_config
                .overrides
                .get(&PrunableTable::Transactions)
                .copied(),
            Some(20)
        );
        assert_eq!(retention_config.overrides.len(), 2);

        let retention_policies = retention_config.retention_policies();

        for table in PrunableTable::iter() {
            let Some(retention) = retention_policies.get(&table).copied() else {
                panic!("Expected a retention policy for table {:?}", table);
            };

            match table {
                PrunableTable::ObjectsHistory => assert_eq!(retention, 10),
                PrunableTable::Transactions => assert_eq!(retention, 20),
                _ => assert_eq!(retention, 5),
            };
        }
    }

    #[test]
    fn pruning_options_no_objects_history_override() {
        let mut temp_file = NamedTempFile::new().unwrap();
        let toml_content = r#"
            epochs_to_keep = 5
            [overrides]
            tx_affected_addresses = 10
            transactions = 20
        "#;
        temp_file.write_all(toml_content.as_bytes()).unwrap();
        let temp_path: PathBuf = temp_file.path().to_path_buf();
        let pruning_options = PruningOptions {
            pruning_config_path: Some(temp_path.clone()),
        };
        let retention_config = pruning_options.load_from_file().unwrap();

        // Assert the parsed values
        assert_eq!(retention_config.epochs_to_keep, 5);
        assert_eq!(
            retention_config
                .overrides
                .get(&PrunableTable::TxAffectedAddresses)
                .copied(),
            Some(10)
        );
        assert_eq!(
            retention_config
                .overrides
                .get(&PrunableTable::Transactions)
                .copied(),
            Some(20)
        );
        assert_eq!(retention_config.overrides.len(), 2);

        let retention_policies = retention_config.retention_policies();

        for table in PrunableTable::iter() {
            let Some(retention) = retention_policies.get(&table).copied() else {
                panic!("Expected a retention policy for table {:?}", table);
            };

            match table {
                PrunableTable::ObjectsHistory => {
                    assert_eq!(retention, OBJECTS_HISTORY_EPOCHS_TO_KEEP)
                }
                PrunableTable::TxAffectedAddresses => assert_eq!(retention, 10),
                PrunableTable::Transactions => assert_eq!(retention, 20),
                _ => assert_eq!(retention, 5),
            };
        }
    }

    #[test]
    fn test_invalid_pruning_config_file() {
        let toml_str = r#"
            epochs_to_keep = 5
            [overrides]
            objects_history = 10
            transactions = 20
            invalid_table = 30
        "#;

        let result = toml::from_str::<RetentionConfig>(toml_str);
        assert!(result.is_err(), "Expected an error, but parsing succeeded");

        if let Err(e) = result {
            assert!(
                e.to_string().contains("unknown variant `invalid_table`"),
                "Error message doesn't mention the invalid table"
            );
        }
    }
}