diff --git a/crates/sui-deepbook-indexer/src/lib.rs b/crates/sui-deepbook-indexer/src/lib.rs index 437140679b0e3..0ae5f0f7c1f0d 100644 --- a/crates/sui-deepbook-indexer/src/lib.rs +++ b/crates/sui-deepbook-indexer/src/lib.rs @@ -26,6 +26,7 @@ pub mod postgres_manager; pub mod schema; pub mod types; +pub mod sui_datasource; pub mod sui_deepbook_indexer; #[derive(Clone)] diff --git a/crates/sui-deepbook-indexer/src/main.rs b/crates/sui-deepbook-indexer/src/main.rs index 9c4ed67c1ac17..1885d569a7851 100644 --- a/crates/sui-deepbook-indexer/src/main.rs +++ b/crates/sui-deepbook-indexer/src/main.rs @@ -12,9 +12,9 @@ use sui_data_ingestion_core::DataIngestionMetrics; use sui_deepbook_indexer::config::IndexerConfig; use sui_deepbook_indexer::metrics::DeepBookIndexerMetrics; use sui_deepbook_indexer::postgres_manager::get_connection_pool; +use sui_deepbook_indexer::sui_datasource::SuiCheckpointDatasource; use sui_deepbook_indexer::sui_deepbook_indexer::{PgDeepbookPersistent, SuiDeepBookDataMapper}; use sui_indexer_builder::indexer_builder::IndexerBuilder; -use sui_indexer_builder::sui_datasource::SuiCheckpointDatasource; use sui_sdk::SuiClientBuilder; use sui_types::base_types::ObjectID; use tracing::info; @@ -76,6 +76,7 @@ async fn main() -> Result<()> { config.checkpoints_path.clone().into(), config.deepbook_genesis_checkpoint, ingestion_metrics.clone(), + indexer_meterics.clone(), ); let indexer = IndexerBuilder::new( "SuiDeepBookIndexer", diff --git a/crates/sui-deepbook-indexer/src/metrics.rs b/crates/sui-deepbook-indexer/src/metrics.rs index 03bd56c14cff3..d34fb69a2fd96 100644 --- a/crates/sui-deepbook-indexer/src/metrics.rs +++ b/crates/sui-deepbook-indexer/src/metrics.rs @@ -1,11 +1,17 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use prometheus::{register_int_counter_with_registry, IntCounter, Registry}; +use prometheus::{ + register_int_counter_vec_with_registry, register_int_counter_with_registry, + register_int_gauge_vec_with_registry, IntCounter, IntCounterVec, IntGaugeVec, Registry, +}; #[derive(Clone, Debug)] pub struct DeepBookIndexerMetrics { pub(crate) total_deepbook_transactions: IntCounter, + pub(crate) tasks_remaining_checkpoints: IntGaugeVec, + pub(crate) tasks_processed_checkpoints: IntCounterVec, + pub(crate) live_task_current_checkpoint: IntGaugeVec, } impl DeepBookIndexerMetrics { @@ -17,6 +23,27 @@ impl DeepBookIndexerMetrics { registry, ) .unwrap(), + tasks_remaining_checkpoints: register_int_gauge_vec_with_registry!( + "bridge_indexer_tasks_remaining_checkpoints", + "The remaining checkpoints for each task", + &["task_name"], + registry, + ) + .unwrap(), + tasks_processed_checkpoints: register_int_counter_vec_with_registry!( + "bridge_indexer_tasks_processed_checkpoints", + "Total processed checkpoints for each task", + &["task_name"], + registry, + ) + .unwrap(), + live_task_current_checkpoint: register_int_gauge_vec_with_registry!( + "bridge_indexer_live_task_current_checkpoint", + "Current checkpoint of live task", + &["task_name"], + registry, + ) + .unwrap(), } } diff --git a/crates/sui-deepbook-indexer/src/sui_datasource.rs b/crates/sui-deepbook-indexer/src/sui_datasource.rs new file mode 100644 index 0000000000000..0cae6e954cbf1 --- /dev/null +++ b/crates/sui-deepbook-indexer/src/sui_datasource.rs @@ -0,0 +1,184 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::Error; +use async_trait::async_trait; +use mysten_metrics::{metered_channel, spawn_monitored_task}; +use prometheus::IntCounterVec; +use prometheus::IntGaugeVec; +use std::path::PathBuf; +use std::sync::Arc; +use sui_data_ingestion_core::{ + DataIngestionMetrics, IndexerExecutor, ProgressStore, ReaderOptions, Worker, WorkerPool, +}; +use sui_indexer_builder::indexer_builder::{DataSender, Datasource}; +use sui_sdk::SuiClient; +use sui_types::base_types::TransactionDigest; +use sui_types::full_checkpoint_content::CheckpointData as SuiCheckpointData; +use sui_types::full_checkpoint_content::CheckpointTransaction; +use sui_types::messages_checkpoint::CheckpointSequenceNumber; +use tokio::sync::oneshot; +use tokio::sync::oneshot::Sender; +use tokio::task::JoinHandle; + +use crate::metrics::DeepBookIndexerMetrics; + +pub struct SuiCheckpointDatasource { + remote_store_url: String, + sui_client: Arc, + concurrency: usize, + checkpoint_path: PathBuf, + genesis_checkpoint: u64, + metrics: DataIngestionMetrics, + indexer_metrics: DeepBookIndexerMetrics, +} +impl SuiCheckpointDatasource { + pub fn new( + remote_store_url: String, + sui_client: Arc, + concurrency: usize, + checkpoint_path: PathBuf, + genesis_checkpoint: u64, + metrics: DataIngestionMetrics, + indexer_metrics: DeepBookIndexerMetrics, + ) -> Self { + SuiCheckpointDatasource { + remote_store_url, + sui_client, + concurrency, + checkpoint_path, + metrics, + indexer_metrics, + genesis_checkpoint, + } + } +} + +#[async_trait] +impl Datasource for SuiCheckpointDatasource { + async fn start_data_retrieval( + &self, + starting_checkpoint: u64, + target_checkpoint: u64, + data_sender: DataSender, + ) -> Result>, Error> { + let (exit_sender, exit_receiver) = oneshot::channel(); + let progress_store = PerTaskInMemProgressStore { + current_checkpoint: starting_checkpoint, + exit_checkpoint: target_checkpoint, + exit_sender: Some(exit_sender), + }; + let mut executor = IndexerExecutor::new(progress_store, 1, self.metrics.clone()); + let worker = IndexerWorker::new(data_sender); + let worker_pool = WorkerPool::new( + worker, + TransactionDigest::random().to_string(), + self.concurrency, + ); + executor.register(worker_pool).await?; + let checkpoint_path = self.checkpoint_path.clone(); + let remote_store_url = self.remote_store_url.clone(); + Ok(spawn_monitored_task!(async { + executor + .run( + checkpoint_path, + Some(remote_store_url), + vec![], // optional remote store access options + ReaderOptions::default(), + exit_receiver, + ) + .await?; + Ok(()) + })) + } + + async fn get_live_task_starting_checkpoint(&self) -> Result { + self.sui_client + .read_api() + .get_latest_checkpoint_sequence_number() + .await + .map_err(|e| anyhow::anyhow!("Failed to get last finalized block id: {:?}", e)) + } + + fn get_genesis_height(&self) -> u64 { + self.genesis_checkpoint + } + + fn get_tasks_remaining_checkpoints_metric(&self) -> &IntGaugeVec { + &self.indexer_metrics.tasks_remaining_checkpoints + } + + fn get_tasks_processed_checkpoints_metric(&self) -> &IntCounterVec { + &self.indexer_metrics.tasks_processed_checkpoints + } + + fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec { + &self.indexer_metrics.live_task_current_checkpoint + } +} + +struct PerTaskInMemProgressStore { + pub current_checkpoint: u64, + pub exit_checkpoint: u64, + pub exit_sender: Option>, +} + +#[async_trait] +impl ProgressStore for PerTaskInMemProgressStore { + async fn load( + &mut self, + _task_name: String, + ) -> Result { + Ok(self.current_checkpoint) + } + + async fn save( + &mut self, + _task_name: String, + checkpoint_number: CheckpointSequenceNumber, + ) -> anyhow::Result<()> { + if checkpoint_number >= self.exit_checkpoint { + if let Some(sender) = self.exit_sender.take() { + let _ = sender.send(()); + } + } + self.current_checkpoint = checkpoint_number; + Ok(()) + } +} + +pub struct IndexerWorker { + data_sender: metered_channel::Sender<(u64, Vec)>, +} + +impl IndexerWorker { + pub fn new(data_sender: metered_channel::Sender<(u64, Vec)>) -> Self { + Self { data_sender } + } +} + +pub type CheckpointTxnData = (CheckpointTransaction, u64, u64); + +#[async_trait] +impl Worker for IndexerWorker { + async fn process_checkpoint(&self, checkpoint: SuiCheckpointData) -> anyhow::Result<()> { + tracing::trace!( + "Received checkpoint [{}] {}: {}", + checkpoint.checkpoint_summary.epoch, + checkpoint.checkpoint_summary.sequence_number, + checkpoint.transactions.len(), + ); + let checkpoint_num = checkpoint.checkpoint_summary.sequence_number; + let timestamp_ms = checkpoint.checkpoint_summary.timestamp_ms; + + let transactions = checkpoint + .transactions + .into_iter() + .map(|tx| (tx, checkpoint_num, timestamp_ms)) + .collect(); + Ok(self + .data_sender + .send((checkpoint_num, transactions)) + .await?) + } +} diff --git a/crates/sui-deepbook-indexer/src/sui_deepbook_indexer.rs b/crates/sui-deepbook-indexer/src/sui_deepbook_indexer.rs index 6dcfafe910417..cade883558b68 100644 --- a/crates/sui-deepbook-indexer/src/sui_deepbook_indexer.rs +++ b/crates/sui-deepbook-indexer/src/sui_deepbook_indexer.rs @@ -11,7 +11,6 @@ use sui_types::transaction::{Command, TransactionDataAPI}; use tracing::info; use sui_indexer_builder::indexer_builder::{DataMapper, IndexerProgressStore, Persistent}; -use sui_indexer_builder::sui_datasource::CheckpointTxnData; use sui_indexer_builder::Task; use sui_types::effects::TransactionEffectsAPI; use sui_types::event::Event; @@ -30,6 +29,7 @@ use crate::schema::{ balances, flashloans, order_fills, order_updates, pool_prices, proposals, rebates, stakes, sui_error_transactions, trade_params_update, votes, }; +use crate::sui_datasource::CheckpointTxnData; use crate::{ models, schema, Balances, Flashloan, OrderFill, OrderUpdate, OrderUpdateStatus, PoolPrice, ProcessedTxnData, Proposals, Rebates, Stakes, SuiTxnError, TradeParamsUpdate, Votes,