Skip to content

Commit

Permalink
[Indexer] Add raw_checkpoints table (#19451)
Browse files Browse the repository at this point in the history
## Description 

This PR adds a raw_checkpoints table that store the raw BCS data for
each checkpoint.
This will serve as the KV table for checkpoints.

## Test plan 

CI.
Once I add some code for the reads there will be some test coverage.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
lxfind authored Sep 20, 2024
1 parent 06b1e4d commit 35dacad
Show file tree
Hide file tree
Showing 12 changed files with 101 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE raw_checkpoints;
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE TABLE raw_checkpoints
(
sequence_number BIGINT PRIMARY KEY,
certified_checkpoint BYTEA NOT NULL,
checkpoint_contents BYTEA NOT NULL
);
5 changes: 5 additions & 0 deletions crates/sui-indexer/src/handlers/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ async fn commit_checkpoints<S>(
let packages_batch = packages_batch.into_iter().flatten().collect::<Vec<_>>();
let checkpoint_num = checkpoint_batch.len();
let tx_count = tx_batch.len();
let raw_checkpoints_batch = checkpoint_batch
.iter()
.map(|c| c.into())
.collect::<Vec<_>>();

{
let _step_1_guard = metrics.checkpoint_db_commit_latency_step_1.start_timer();
Expand All @@ -168,6 +172,7 @@ async fn commit_checkpoints<S>(
state.persist_object_history(object_history_changes_batch.clone()),
state.persist_full_objects_history(object_history_changes_batch.clone()),
state.persist_object_versions(object_versions_batch.clone()),
state.persist_raw_checkpoints(raw_checkpoints_batch),
];
if let Some(epoch_data) = epoch.clone() {
persist_tasks.push(state.persist_epoch(epoch_data));
Expand Down
1 change: 1 addition & 0 deletions crates/sui-indexer/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ pub mod events;
pub mod obj_indices;
pub mod objects;
pub mod packages;
pub mod raw_checkpoints;
pub mod transactions;
pub mod tx_indices;
26 changes: 26 additions & 0 deletions crates/sui-indexer/src/models/raw_checkpoints.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::schema::raw_checkpoints;
use crate::types::IndexedCheckpoint;
use diesel::prelude::*;

#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)]
#[diesel(table_name = raw_checkpoints)]
pub struct StoredRawCheckpoint {
pub sequence_number: i64,
/// BCS serialized CertifiedCheckpointSummary
pub certified_checkpoint: Vec<u8>,
/// BCS serialized CheckpointContents
pub checkpoint_contents: Vec<u8>,
}

impl From<&IndexedCheckpoint> for StoredRawCheckpoint {
fn from(c: &IndexedCheckpoint) -> Self {
Self {
sequence_number: c.sequence_number as i64,
certified_checkpoint: bcs::to_bytes(c.certified_checkpoint.as_ref().unwrap()).unwrap(),
checkpoint_contents: bcs::to_bytes(c.checkpoint_contents.as_ref().unwrap()).unwrap(),
}
}
}
4 changes: 3 additions & 1 deletion crates/sui-indexer/src/restorer/formal_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl IndexerFormalSnapshotRestorer {
match object {
LiveObject::Normal(obj) => {
// TODO: placeholder values for df_info and checkpoint_seq_num,
// will clean it up when the columne cleanup is done.
// will clean it up when the column cleanup is done.
let indexed_object =
IndexedObject::from_object(0, obj, None);
move_objects.push(indexed_object);
Expand Down Expand Up @@ -274,6 +274,8 @@ impl IndexerFormalSnapshotRestorer {
.persist_chain_identifier(chain_identifier.into_inner().to_vec())
.await?;
assert!(next_checkpoint_after_epoch > 0);
// FIXME: This is a temporary hack to add a checkpoint watermark.
// Once we have proper watermark tables, we should remove the following code.
let last_cp = IndexedCheckpoint {
sequence_number: next_checkpoint_after_epoch - 1,
..Default::default()
Expand Down
9 changes: 9 additions & 0 deletions crates/sui-indexer/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,14 @@ diesel::table! {
}
}

diesel::table! {
raw_checkpoints (sequence_number) {
sequence_number -> Int8,
certified_checkpoint -> Bytea,
checkpoint_contents -> Bytea,
}
}

diesel::table! {
transactions (tx_sequence_number) {
tx_sequence_number -> Int8,
Expand Down Expand Up @@ -381,6 +389,7 @@ diesel::allow_tables_to_appear_in_same_query!(
packages,
protocol_configs,
pruner_cp_watermark,
raw_checkpoints,
transactions,
tx_affected_addresses,
tx_affected_objects,
Expand Down
7 changes: 7 additions & 0 deletions crates/sui-indexer/src/store/indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::handlers::{EpochToCommit, TransactionObjectChangesToCommit};
use crate::models::display::StoredDisplay;
use crate::models::obj_indices::StoredObjectVersion;
use crate::models::objects::{StoredDeletedObject, StoredObject};
use crate::models::raw_checkpoints::StoredRawCheckpoint;
use crate::types::{
EventIndex, IndexedCheckpoint, IndexedEvent, IndexedPackage, IndexedTransaction, TxIndex,
};
Expand Down Expand Up @@ -106,5 +107,11 @@ pub trait IndexerStore: Clone + Sync + Send + 'static {
) -> Result<u64, IndexerError>;

async fn upload_display(&self, epoch: u64) -> Result<(), IndexerError>;

async fn restore_display(&self, bytes: bytes::Bytes) -> Result<(), IndexerError>;

async fn persist_raw_checkpoints(
&self,
checkpoints: Vec<StoredRawCheckpoint>,
) -> Result<(), IndexerError>;
}
36 changes: 33 additions & 3 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ use crate::schema::{
event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
event_struct_package, events, feature_flags, full_objects_history, objects, objects_history,
objects_snapshot, objects_version, packages, protocol_configs, pruner_cp_watermark,
transactions, tx_affected_addresses, tx_affected_objects, tx_calls_fun, tx_calls_mod,
tx_calls_pkg, tx_changed_objects, tx_digests, tx_input_objects, tx_kinds, tx_recipients,
tx_senders,
raw_checkpoints, transactions, tx_affected_addresses, tx_affected_objects, tx_calls_fun,
tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests, tx_input_objects, tx_kinds,
tx_recipients, tx_senders,
};
use crate::store::transaction_with_retry;
use crate::types::EventIndex;
Expand All @@ -60,6 +60,7 @@ use crate::types::{IndexedCheckpoint, IndexedEvent, IndexedPackage, IndexedTrans
use super::pg_partition_manager::{EpochPartitionData, PgPartitionManager};
use super::{IndexerStore, ObjectsToCommit};

use crate::models::raw_checkpoints::StoredRawCheckpoint;
use diesel::upsert::excluded;
use sui_types::digests::{ChainIdentifier, CheckpointDigest};

Expand Down Expand Up @@ -581,6 +582,28 @@ impl PgIndexerStore {
.await
}

async fn persist_raw_checkpoints_impl(
&self,
raw_checkpoints: &[StoredRawCheckpoint],
) -> Result<(), IndexerError> {
use diesel_async::RunQueryDsl;

transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
async {
diesel::insert_into(raw_checkpoints::table)
.values(raw_checkpoints)
.on_conflict_do_nothing()
.execute(conn)
.await
.map_err(IndexerError::from)
.context("Failed to write to raw_checkpoints table")?;
Ok::<(), IndexerError>(())
}
.scope_boxed()
})
.await
}

async fn persist_checkpoints(
&self,
checkpoints: Vec<IndexedCheckpoint>,
Expand Down Expand Up @@ -2097,6 +2120,13 @@ impl IndexerStore for PgIndexerStore {
.await?;
Ok(())
}

async fn persist_raw_checkpoints(
&self,
checkpoints: Vec<StoredRawCheckpoint>,
) -> Result<(), IndexerError> {
self.persist_raw_checkpoints_impl(&checkpoints).await
}
}

fn make_objects_to_commit(
Expand Down
14 changes: 10 additions & 4 deletions crates/sui-indexer/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use sui_types::dynamic_field::DynamicFieldInfo;
use sui_types::effects::TransactionEffects;
use sui_types::event::SystemEpochInfoEvent;
use sui_types::messages_checkpoint::{
CertifiedCheckpointSummary, CheckpointCommitment, CheckpointDigest, CheckpointSequenceNumber,
EndOfEpochData,
CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointDigest,
CheckpointSequenceNumber, EndOfEpochData,
};
use sui_types::move_package::MovePackage;
use sui_types::object::{Object, Owner};
Expand All @@ -29,6 +29,7 @@ pub type IndexerResult<T> = Result<T, IndexerError>;

#[derive(Debug, Default)]
pub struct IndexedCheckpoint {
// TODO: A lot of fields are now redundant with certified_checkpoint and checkpoint_contents.
pub sequence_number: u64,
pub checkpoint_digest: CheckpointDigest,
pub epoch: u64,
Expand All @@ -48,12 +49,15 @@ pub struct IndexedCheckpoint {
pub end_of_epoch: bool,
pub min_tx_sequence_number: u64,
pub max_tx_sequence_number: u64,
// FIXME: Remove the Default derive and make these fields mandatory.
pub certified_checkpoint: Option<CertifiedCheckpointSummary>,
pub checkpoint_contents: Option<CheckpointContents>,
}

impl IndexedCheckpoint {
pub fn from_sui_checkpoint(
checkpoint: &sui_types::messages_checkpoint::CertifiedCheckpointSummary,
contents: &sui_types::messages_checkpoint::CheckpointContents,
checkpoint: &CertifiedCheckpointSummary,
contents: &CheckpointContents,
successful_tx_num: usize,
) -> Self {
let total_gas_cost = checkpoint.epoch_rolling_gas_cost_summary.computation_cost as i64
Expand Down Expand Up @@ -86,6 +90,8 @@ impl IndexedCheckpoint {
checkpoint_commitments: checkpoint.checkpoint_commitments.clone(),
min_tx_sequence_number,
max_tx_sequence_number,
certified_checkpoint: Some(checkpoint.clone()),
checkpoint_contents: Some(contents.clone()),
}
}
}
Expand Down

0 comments on commit 35dacad

Please sign in to comment.