Committer pushes one update through store
wlmyng committed Oct 2, 2024
1 parent 5cdcd79 commit 7fdf941
Showing 12 changed files with 262 additions and 4 deletions.
8 changes: 7 additions & 1 deletion crates/sui-indexer/README.md
@@ -32,7 +32,7 @@ See the [docs](https://docs.sui.io/guides/developer/getting-started/local-network

Start a local network using the `sui` binary:
```sh
cargo run --bin sui -- start --with-faucet --force-regenesis
```

If you want to run a local network with the indexer enabled (note that `libpq` is required), you can run the following command after following the steps in the next section to set up an indexer DB:
@@ -124,3 +124,9 @@ Note that you need an existing database for this to work. Using the DATABASE_URL
# Change the RPC_CLIENT_URL to http://0.0.0.0:9000 to run indexer against local validator & fullnode
cargo run --bin sui-indexer --features mysql-feature --no-default-features -- --db-url "<DATABASE_URL>" --rpc-client-url "https://fullnode.devnet.sui.io:443" --fullnode-sync-worker --reset-db
```

### Extending the indexer

To add a new table, run `diesel migration generate your_table_name`, and modify the newly created `up.sql` and `down.sql` files.

Apply the migration with `diesel migration run`, then run `./scripts/generate_indexer_schema.sh` to update the `schema.rs` file.
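
For reference, the full workflow might look like the following sketch, assuming the commands are run from the crate directory that contains the `migrations` folder and the `scripts/generate_indexer_schema.sh` script (the table name and file contents are placeholders):

```sh
# Generate an empty migration (creates up.sql and down.sql under migrations/).
diesel migration generate your_table_name

# Edit the generated files, e.g.
#   migrations/<timestamp>_your_table_name/up.sql   -> CREATE TABLE your_table_name (...);
#   migrations/<timestamp>_your_table_name/down.sql -> DROP TABLE IF EXISTS your_table_name;

# Apply the migration against your database, then regenerate schema.rs.
diesel migration run
./scripts/generate_indexer_schema.sh
```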
@@ -0,0 +1 @@
DROP TABLE IF EXISTS watermarks;
@@ -0,0 +1,32 @@
CREATE TABLE watermarks
(
-- The table governed by this watermark, e.g. `epochs`, `checkpoints`, `transactions`.
entity TEXT NOT NULL,
-- Inclusive upper bound epoch this entity has data for. The committer updates this field. The
-- pruner uses this field for per-entity epoch-level retention; it is mostly useful for pruning
-- unpartitioned tables.
epoch_hi BIGINT NOT NULL,
-- Inclusive lower bound epoch this entity has data for. Pruner updates this field, and uses
-- this field in tandem with `epoch_hi` for per-entity epoch-level retention. This is mostly
-- useful for pruning unpartitioned tables.
epoch_lo BIGINT NOT NULL,
-- Inclusive upper bound checkpoint this entity has data for. Committer updates this field. All
-- data of this entity in the checkpoint must be persisted before advancing this watermark. The
-- committer or ingestion task refers to this on disaster recovery.
checkpoint_hi BIGINT NOT NULL,
-- Inclusive high watermark that the committer advances. For `checkpoints`, this represents the
-- checkpoint sequence number; for `transactions`, the transaction sequence number; and so on.
reader_hi BIGINT NOT NULL,
-- Inclusive low watermark that the pruner advances. Data before this watermark is considered
-- pruned by a reader. The underlying data may still exist in the db instance.
reader_lo BIGINT NOT NULL,
-- Updated to the database's current timestamp when the pruner sees that some data needs to be
-- dropped. The pruner uses this column to decide whether to prune immediately, or to wait long
-- enough for all in-flight reads to complete or time out before acting on an updated watermark.
timestamp_ms BIGINT NOT NULL,
-- Column used by the pruner to track its true progress. Data at and below this watermark has
-- been truly pruned from the db, and should no longer exist. When recovering from a crash, the
-- pruner will consult this column to determine where to continue.
pruned_lo BIGINT,
PRIMARY KEY (entity)
);
36 changes: 34 additions & 2 deletions crates/sui-indexer/src/handlers/committer.rs
@@ -10,9 +10,11 @@ use tracing::{error, info};

use sui_types::messages_checkpoint::CheckpointSequenceNumber;

use crate::handlers::pruner::PrunableTable;
use crate::metrics::IndexerMetrics;
use crate::store::IndexerStore;
use crate::types::IndexerResult;
use strum::IntoEnumIterator;

use super::{CheckpointDataToCommit, EpochToCommit};

@@ -57,6 +59,8 @@ where
batch.push(checkpoint);
next_checkpoint_sequence_number += 1;
let epoch_number_option = epoch.as_ref().map(|epoch| epoch.new_epoch.epoch);
// The batch will consist of contiguous checkpoints and at most one epoch boundary at
// the end.
if batch.len() == checkpoint_commit_batch_size || epoch.is_some() {
commit_checkpoints(&state, batch, epoch, &metrics).await;
batch = vec![];
@@ -79,6 +83,10 @@ where
Ok(())
}

/// Writes indexed checkpoint data to the database, and then updates watermark upper bounds and
/// metrics. Expects `indexed_checkpoint_batch` to be non-empty and to contain contiguous checkpoints.
/// There can be at most one epoch boundary at the end. If an epoch boundary is detected,
/// epoch-partitioned tables must be advanced.
// Unwrap: Caller needs to make sure indexed_checkpoint_batch is not empty
#[instrument(skip_all, fields(
first = indexed_checkpoint_batch.first().as_ref().unwrap().checkpoint.sequence_number,
@@ -130,7 +138,14 @@ async fn commit_checkpoints<S>(
}

let first_checkpoint_seq = checkpoint_batch.first().as_ref().unwrap().sequence_number;
let last_checkpoint_seq = checkpoint_batch.last().as_ref().unwrap().sequence_number;
let (epoch_id, last_checkpoint_seq, last_tx_seq) = {
let checkpoint = checkpoint_batch.last().unwrap();
(
checkpoint.epoch,
checkpoint.sequence_number,
checkpoint.max_tx_sequence_number,
)
};

let guard = metrics.checkpoint_db_commit_latency.start_timer();
let tx_batch = tx_batch.into_iter().flatten().collect::<Vec<_>>();
@@ -189,7 +204,8 @@

let is_epoch_end = epoch.is_some();

// handle partitioning on epoch boundary
// On an epoch boundary, we need to modify the existing partitions' upper bound, and introduce a
// new partition for the upcoming epoch's incoming data.
if let Some(epoch_data) = epoch {
state
.advance_epoch(epoch_data)
@@ -212,6 +228,22 @@
})
.expect("Persisting data into DB should not fail.");

state
.update_watermarks_upper_bound(
PrunableTable::iter().collect(),
epoch_id,
last_checkpoint_seq,
last_tx_seq,
)
.await
.tap_err(|e| {
error!(
"Failed to update watermark upper bound with error: {}",
e.to_string()
);
})
.expect("Updating watermark upper bound in DB should not fail.");

if is_epoch_end {
// The epoch has advanced so we update the configs for the new protocol version, if it has changed.
let chain_id = state
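
The batching rule in the committer's loop above can be summarized in a small standalone sketch; the type and function names below are illustrative, not the indexer's actual ones:

```rust
// Minimal sketch of the committer's batching policy: checkpoints arrive in order, and a
// batch is flushed either when it reaches the configured size or as soon as it ends on an
// epoch boundary, so every flushed batch is contiguous and contains at most one epoch
// boundary, at its end.
struct Checkpoint {
    sequence_number: u64,
    ends_epoch: bool,
}

fn batch_checkpoints(
    stream: impl Iterator<Item = Checkpoint>,
    batch_size: usize,
    mut commit: impl FnMut(Vec<Checkpoint>),
) {
    let mut batch: Vec<Checkpoint> = Vec::new();
    for checkpoint in stream {
        // Checkpoints must stay contiguous within a batch.
        if let Some(prev) = batch.last() {
            assert_eq!(prev.sequence_number + 1, checkpoint.sequence_number);
        }
        let ends_epoch = checkpoint.ends_epoch;
        batch.push(checkpoint);
        if batch.len() == batch_size || ends_epoch {
            commit(std::mem::take(&mut batch));
        }
    }
    // A trailing partial batch is simply carried over; the real committer runs in a
    // long-lived loop, so no terminal flush is shown here.
}
```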
32 changes: 32 additions & 0 deletions crates/sui-indexer/src/handlers/pruner.rs
@@ -72,6 +72,38 @@ pub enum PrunableTable {
PrunerCpWatermark,
}

impl PrunableTable {
/// Given a committer's report of the latest written checkpoint and tx, return the value that
/// corresponds to the variant's unit to be used by readers.
pub fn map_to_reader_unit(&self, cp: u64, tx: u64) -> u64 {
match self {
PrunableTable::ObjectsHistory => cp,
PrunableTable::Checkpoints => cp,
PrunableTable::PrunerCpWatermark => cp,
PrunableTable::Events => tx,
PrunableTable::EventEmitPackage => tx,
PrunableTable::EventEmitModule => tx,
PrunableTable::EventSenders => tx,
PrunableTable::EventStructInstantiation => tx,
PrunableTable::EventStructModule => tx,
PrunableTable::EventStructName => tx,
PrunableTable::EventStructPackage => tx,
PrunableTable::Transactions => tx,
PrunableTable::TxAffectedAddresses => tx,
PrunableTable::TxAffectedObjects => tx,
PrunableTable::TxCallsPkg => tx,
PrunableTable::TxCallsMod => tx,
PrunableTable::TxCallsFun => tx,
PrunableTable::TxChangedObjects => tx,
PrunableTable::TxDigests => tx,
PrunableTable::TxInputObjects => tx,
PrunableTable::TxKinds => tx,
PrunableTable::TxRecipients => tx,
PrunableTable::TxSenders => tx,
}
}
}

impl Pruner {
/// Instantiates a pruner with default retention and overrides. Pruner will merge these into a
/// list of policies for each prunable table.
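
A quick illustration of the mapping above, with hypothetical sequence numbers (the import path is assumed to be visible from the caller): checkpoint-tracked tables take the checkpoint sequence number, while transaction-indexed tables take the transaction sequence number.

```rust
use sui_indexer::handlers::pruner::PrunableTable;

fn main() {
    // Hypothetical values reported by the committer after a batch commit.
    let (cp, tx) = (1_000u64, 52_314u64);

    // Checkpoint-tracked entities advance their reader watermark in checkpoints...
    assert_eq!(PrunableTable::Checkpoints.map_to_reader_unit(cp, tx), cp);
    assert_eq!(PrunableTable::ObjectsHistory.map_to_reader_unit(cp, tx), cp);

    // ...while transaction-indexed entities advance it in transaction sequence numbers.
    assert_eq!(PrunableTable::Transactions.map_to_reader_unit(cp, tx), tx);
    assert_eq!(PrunableTable::Events.map_to_reader_unit(cp, tx), tx);
}
```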
2 changes: 2 additions & 0 deletions crates/sui-indexer/src/indexer.rs
@@ -114,6 +114,8 @@ impl Indexer {

let mut exit_senders = vec![];
let mut executors = vec![];
// Ingestion task watermarks are snapshotted once on indexer startup based on the
// corresponding watermark table before being handed off to the ingestion task.
let progress_store = ShimIndexerProgressStore::new(vec![
("primary".to_string(), primary_watermark),
("object_snapshot".to_string(), object_snapshot_watermark),
8 changes: 8 additions & 0 deletions crates/sui-indexer/src/metrics.rs
@@ -125,6 +125,7 @@ pub struct IndexerMetrics {
pub checkpoint_db_commit_latency_tx_indices_chunks: Histogram,
pub checkpoint_db_commit_latency_checkpoints: Histogram,
pub checkpoint_db_commit_latency_epoch: Histogram,
pub checkpoint_db_commit_latency_watermarks: Histogram,
pub thousand_transaction_avg_db_commit_latency: Histogram,
pub object_db_commit_latency: Histogram,
pub object_mutation_db_commit_latency: Histogram,
@@ -536,6 +537,13 @@ impl IndexerMetrics {
registry,
)
.unwrap(),
checkpoint_db_commit_latency_watermarks: register_histogram_with_registry!(
"checkpoint_db_commit_latency_watermarks",
"Time spent committing watermarks",
DATA_INGESTION_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
tokio_blocking_task_wait_latency: register_histogram_with_registry!(
"tokio_blocking_task_wait_latency",
"Time spent to wait for tokio blocking task pool",
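
The new histogram is meant to be used with the same timer-guard pattern as the other `checkpoint_db_commit_latency_*` metrics. A minimal sketch with the `prometheus` crate (the bucket values here are placeholders rather than the crate's `DATA_INGESTION_LATENCY_SEC_BUCKETS`):

```rust
use prometheus::{register_histogram_with_registry, Registry};

fn main() {
    let registry = Registry::new();
    let watermarks_latency = register_histogram_with_registry!(
        "checkpoint_db_commit_latency_watermarks",
        "Time spent committing watermarks",
        vec![0.1, 0.5, 1.0, 2.5, 5.0, 10.0], // placeholder buckets
        registry
    )
    .unwrap();

    // The guard records the elapsed seconds into the histogram when it is dropped,
    // mirroring how the committer wraps its database writes.
    let _guard = watermarks_latency.start_timer();
    // ... perform the watermarks upsert here ...
}
```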
1 change: 1 addition & 0 deletions crates/sui-indexer/src/models/mod.rs
@@ -12,3 +12,4 @@ pub mod packages;
pub mod raw_checkpoints;
pub mod transactions;
pub mod tx_indices;
pub mod watermarks;
49 changes: 49 additions & 0 deletions crates/sui-indexer/src/models/watermarks.rs
@@ -0,0 +1,49 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::schema::watermarks::{self};
use diesel::prelude::*;

/// Represents a row in the `watermarks` table.
#[derive(Queryable, Insertable, Default, QueryableByName)]
#[diesel(table_name = watermarks, primary_key(entity))]
pub struct StoredWatermark {
/// The name of the table or group of tables governed by this watermark, e.g. `epochs`,
/// `checkpoints`, `transactions`.
pub entity: String,
/// Upper bound of the epoch range, to enable a per-entity epoch-level retention policy. The
/// committer advances this along with `checkpoint_hi` and `reader_hi`.
pub epoch_hi: i64,
/// Lower bound of the epoch range, to enable a per-entity epoch-level retention policy. The
/// pruner advances this.
pub epoch_lo: i64,
pub checkpoint_hi: i64,
/// The inclusive high watermark that the committer advances.
pub reader_hi: i64,
/// The inclusive low watermark that the pruner advances. Data before this watermark is
/// considered pruned.
pub reader_lo: i64,
/// The pruner sets this, and uses it to decide whether to prune immediately, or to wait long
/// enough for all in-flight reads to complete or time out before acting on an updated watermark.
pub timestamp_ms: i64,
/// Pruner updates this, and uses this when recovering from a crash to determine where to
/// continue pruning. Represents the latest watermark pruned, inclusive.
pub pruned_lo: Option<i64>,
}

impl StoredWatermark {
pub fn from_upper_bound_update(
entity: &str,
epoch_hi: u64,
checkpoint_hi: u64,
reader_hi: u64,
) -> Self {
StoredWatermark {
entity: entity.to_string(),
epoch_hi: epoch_hi as i64,
checkpoint_hi: checkpoint_hi as i64,
reader_hi: reader_hi as i64,
..StoredWatermark::default()
}
}
}
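
A committer-side update only fills in the upper-bound fields; everything the pruner owns falls back to the derived `Default`. A small usage sketch with hypothetical values (import path assumed):

```rust
use sui_indexer::models::watermarks::StoredWatermark;

fn main() {
    // Hypothetical committer report: epoch 10, checkpoint 1_000, reader unit 52_314
    // (a transaction sequence number, for a tx-indexed entity).
    let w = StoredWatermark::from_upper_bound_update("transactions", 10, 1_000, 52_314);

    assert_eq!(w.entity, "transactions");
    assert_eq!(w.epoch_hi, 10);
    assert_eq!(w.checkpoint_hi, 1_000);
    assert_eq!(w.reader_hi, 52_314);
    // Pruner-owned fields come from Default: epoch_lo, reader_lo, and timestamp_ms are 0,
    // and pruned_lo is None.
    assert_eq!(w.pruned_lo, None);
}
```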
14 changes: 14 additions & 0 deletions crates/sui-indexer/src/schema.rs
@@ -368,6 +368,19 @@ diesel::table! {
}
}

diesel::table! {
watermarks (entity) {
entity -> Text,
epoch_hi -> Int8,
epoch_lo -> Int8,
checkpoint_hi -> Int8,
reader_hi -> Int8,
reader_lo -> Int8,
timestamp_ms -> Int8,
pruned_lo -> Nullable<Int8>,
}
}

diesel::allow_tables_to_appear_in_same_query!(
chain_identifier,
checkpoints,
@@ -403,4 +416,5 @@ diesel::allow_tables_to_appear_in_same_query!(
tx_kinds,
tx_recipients,
tx_senders,
watermarks,
);
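
With the table registered in `schema.rs`, watermark rows can be read like any other diesel-mapped table. A sketch using blocking diesel (connection handling and module paths are assumptions, not part of this commit):

```rust
use diesel::prelude::*;
use sui_indexer::models::watermarks::StoredWatermark;
use sui_indexer::schema::watermarks;

// Fetch the watermark row for a single entity, if one has been written yet.
fn get_watermark(
    conn: &mut PgConnection,
    entity: &str,
) -> QueryResult<Option<StoredWatermark>> {
    watermarks::table
        .filter(watermarks::entity.eq(entity))
        .first::<StoredWatermark>(conn)
        .optional()
}
```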
9 changes: 9 additions & 0 deletions crates/sui-indexer/src/store/indexer_store.rs
@@ -6,6 +6,7 @@ use std::collections::BTreeMap;
use async_trait::async_trait;

use crate::errors::IndexerError;
use crate::handlers::pruner::PrunableTable;
use crate::handlers::{EpochToCommit, TransactionObjectChangesToCommit};
use crate::models::display::StoredDisplay;
use crate::models::obj_indices::StoredObjectVersion;
@@ -114,4 +115,12 @@ pub trait IndexerStore: Clone + Sync + Send + 'static {
&self,
checkpoints: Vec<StoredRawCheckpoint>,
) -> Result<(), IndexerError>;

async fn update_watermarks_upper_bound(
&self,
tables: Vec<PrunableTable>,
epoch: u64,
cp: u64,
tx: u64,
) -> Result<(), IndexerError>;
}
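
The store-side implementation of this method is not part of this commit. Purely as an illustration of how the pieces above fit together, an upsert-based sketch could look roughly like this, using blocking diesel and assuming the enum exposes its entity name via a strum-derived `as_ref()` (both assumptions, not the crate's actual code):

```rust
use diesel::prelude::*;
use diesel::upsert::excluded;
use sui_indexer::handlers::pruner::PrunableTable;
use sui_indexer::models::watermarks::StoredWatermark;
use sui_indexer::schema::watermarks;

// Sketch only: upsert one watermark row per prunable table, advancing the committer-owned
// upper bounds while leaving the pruner-owned columns untouched.
fn update_watermarks_upper_bound(
    conn: &mut PgConnection,
    tables: Vec<PrunableTable>,
    epoch: u64,
    cp: u64,
    tx: u64,
) -> QueryResult<usize> {
    let rows: Vec<StoredWatermark> = tables
        .into_iter()
        .map(|table| {
            StoredWatermark::from_upper_bound_update(
                table.as_ref(), // assumed: the variant maps to its entity/table name
                epoch,
                cp,
                table.map_to_reader_unit(cp, tx),
            )
        })
        .collect();

    diesel::insert_into(watermarks::table)
        .values(&rows)
        .on_conflict(watermarks::entity)
        .do_update()
        .set((
            watermarks::epoch_hi.eq(excluded(watermarks::epoch_hi)),
            watermarks::checkpoint_hi.eq(excluded(watermarks::checkpoint_hi)),
            watermarks::reader_hi.eq(excluded(watermarks::reader_hi)),
        ))
        .execute(conn)
}
```

The committer above passes `PrunableTable::iter().collect()` for `tables`, so every prunable entity gets its upper bounds advanced on each batch commit.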