Skip to content

Commit

Permalink
latest indexer changes
Browse files Browse the repository at this point in the history
  • Loading branch information
0xaslan committed Sep 6, 2024
1 parent 108833e commit ce8f22c
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 3 deletions.
1 change: 1 addition & 0 deletions crates/sui-deepbook-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub mod postgres_manager;
pub mod schema;
pub mod types;

pub mod sui_datasource;
pub mod sui_deepbook_indexer;

#[derive(Clone)]
Expand Down
3 changes: 2 additions & 1 deletion crates/sui-deepbook-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use sui_data_ingestion_core::DataIngestionMetrics;
use sui_deepbook_indexer::config::IndexerConfig;
use sui_deepbook_indexer::metrics::DeepBookIndexerMetrics;
use sui_deepbook_indexer::postgres_manager::get_connection_pool;
use sui_deepbook_indexer::sui_datasource::SuiCheckpointDatasource;
use sui_deepbook_indexer::sui_deepbook_indexer::{PgDeepbookPersistent, SuiDeepBookDataMapper};
use sui_indexer_builder::indexer_builder::IndexerBuilder;
use sui_indexer_builder::sui_datasource::SuiCheckpointDatasource;
use sui_sdk::SuiClientBuilder;
use sui_types::base_types::ObjectID;
use tracing::info;
Expand Down Expand Up @@ -76,6 +76,7 @@ async fn main() -> Result<()> {
config.checkpoints_path.clone().into(),
config.deepbook_genesis_checkpoint,
ingestion_metrics.clone(),
indexer_meterics.clone(),
);
let indexer = IndexerBuilder::new(
"SuiDeepBookIndexer",
Expand Down
29 changes: 28 additions & 1 deletion crates/sui-deepbook-indexer/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use prometheus::{register_int_counter_with_registry, IntCounter, Registry};
use prometheus::{
register_int_counter_vec_with_registry, register_int_counter_with_registry,
register_int_gauge_vec_with_registry, IntCounter, IntCounterVec, IntGaugeVec, Registry,
};

#[derive(Clone, Debug)]
pub struct DeepBookIndexerMetrics {
pub(crate) total_deepbook_transactions: IntCounter,
pub(crate) tasks_remaining_checkpoints: IntGaugeVec,
pub(crate) tasks_processed_checkpoints: IntCounterVec,
pub(crate) live_task_current_checkpoint: IntGaugeVec,
}

impl DeepBookIndexerMetrics {
Expand All @@ -17,6 +23,27 @@ impl DeepBookIndexerMetrics {
registry,
)
.unwrap(),
tasks_remaining_checkpoints: register_int_gauge_vec_with_registry!(
"bridge_indexer_tasks_remaining_checkpoints",
"The remaining checkpoints for each task",
&["task_name"],
registry,
)
.unwrap(),
tasks_processed_checkpoints: register_int_counter_vec_with_registry!(
"bridge_indexer_tasks_processed_checkpoints",
"Total processed checkpoints for each task",
&["task_name"],
registry,
)
.unwrap(),
live_task_current_checkpoint: register_int_gauge_vec_with_registry!(
"bridge_indexer_live_task_current_checkpoint",
"Current checkpoint of live task",
&["task_name"],
registry,
)
.unwrap(),
}
}

Expand Down
184 changes: 184 additions & 0 deletions crates/sui-deepbook-indexer/src/sui_datasource.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::Error;
use async_trait::async_trait;
use mysten_metrics::{metered_channel, spawn_monitored_task};
use prometheus::IntCounterVec;
use prometheus::IntGaugeVec;
use std::path::PathBuf;
use std::sync::Arc;
use sui_data_ingestion_core::{
DataIngestionMetrics, IndexerExecutor, ProgressStore, ReaderOptions, Worker, WorkerPool,
};
use sui_indexer_builder::indexer_builder::{DataSender, Datasource};
use sui_sdk::SuiClient;
use sui_types::base_types::TransactionDigest;
use sui_types::full_checkpoint_content::CheckpointData as SuiCheckpointData;
use sui_types::full_checkpoint_content::CheckpointTransaction;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use tokio::sync::oneshot;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;

use crate::metrics::DeepBookIndexerMetrics;

pub struct SuiCheckpointDatasource {
remote_store_url: String,
sui_client: Arc<SuiClient>,
concurrency: usize,
checkpoint_path: PathBuf,
genesis_checkpoint: u64,
metrics: DataIngestionMetrics,
indexer_metrics: DeepBookIndexerMetrics,
}
impl SuiCheckpointDatasource {
pub fn new(
remote_store_url: String,
sui_client: Arc<SuiClient>,
concurrency: usize,
checkpoint_path: PathBuf,
genesis_checkpoint: u64,
metrics: DataIngestionMetrics,
indexer_metrics: DeepBookIndexerMetrics,
) -> Self {
SuiCheckpointDatasource {
remote_store_url,
sui_client,
concurrency,
checkpoint_path,
metrics,
indexer_metrics,
genesis_checkpoint,
}
}
}

#[async_trait]
impl Datasource<CheckpointTxnData> for SuiCheckpointDatasource {
async fn start_data_retrieval(
&self,
starting_checkpoint: u64,
target_checkpoint: u64,
data_sender: DataSender<CheckpointTxnData>,
) -> Result<JoinHandle<Result<(), Error>>, Error> {
let (exit_sender, exit_receiver) = oneshot::channel();
let progress_store = PerTaskInMemProgressStore {
current_checkpoint: starting_checkpoint,
exit_checkpoint: target_checkpoint,
exit_sender: Some(exit_sender),
};
let mut executor = IndexerExecutor::new(progress_store, 1, self.metrics.clone());
let worker = IndexerWorker::new(data_sender);
let worker_pool = WorkerPool::new(
worker,
TransactionDigest::random().to_string(),
self.concurrency,
);
executor.register(worker_pool).await?;
let checkpoint_path = self.checkpoint_path.clone();
let remote_store_url = self.remote_store_url.clone();
Ok(spawn_monitored_task!(async {
executor
.run(
checkpoint_path,
Some(remote_store_url),
vec![], // optional remote store access options
ReaderOptions::default(),
exit_receiver,
)
.await?;
Ok(())
}))
}

async fn get_live_task_starting_checkpoint(&self) -> Result<u64, Error> {
self.sui_client
.read_api()
.get_latest_checkpoint_sequence_number()
.await
.map_err(|e| anyhow::anyhow!("Failed to get last finalized block id: {:?}", e))
}

fn get_genesis_height(&self) -> u64 {
self.genesis_checkpoint
}

fn get_tasks_remaining_checkpoints_metric(&self) -> &IntGaugeVec {
&self.indexer_metrics.tasks_remaining_checkpoints
}

fn get_tasks_processed_checkpoints_metric(&self) -> &IntCounterVec {
&self.indexer_metrics.tasks_processed_checkpoints
}

fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec {
&self.indexer_metrics.live_task_current_checkpoint
}
}

struct PerTaskInMemProgressStore {
pub current_checkpoint: u64,
pub exit_checkpoint: u64,
pub exit_sender: Option<Sender<()>>,
}

#[async_trait]
impl ProgressStore for PerTaskInMemProgressStore {
async fn load(
&mut self,
_task_name: String,
) -> Result<CheckpointSequenceNumber, anyhow::Error> {
Ok(self.current_checkpoint)
}

async fn save(
&mut self,
_task_name: String,
checkpoint_number: CheckpointSequenceNumber,
) -> anyhow::Result<()> {
if checkpoint_number >= self.exit_checkpoint {
if let Some(sender) = self.exit_sender.take() {
let _ = sender.send(());
}
}
self.current_checkpoint = checkpoint_number;
Ok(())
}
}

pub struct IndexerWorker<T> {
data_sender: metered_channel::Sender<(u64, Vec<T>)>,
}

impl<T> IndexerWorker<T> {
pub fn new(data_sender: metered_channel::Sender<(u64, Vec<T>)>) -> Self {
Self { data_sender }
}
}

pub type CheckpointTxnData = (CheckpointTransaction, u64, u64);

#[async_trait]
impl Worker for IndexerWorker<CheckpointTxnData> {
async fn process_checkpoint(&self, checkpoint: SuiCheckpointData) -> anyhow::Result<()> {
tracing::trace!(
"Received checkpoint [{}] {}: {}",
checkpoint.checkpoint_summary.epoch,
checkpoint.checkpoint_summary.sequence_number,
checkpoint.transactions.len(),
);
let checkpoint_num = checkpoint.checkpoint_summary.sequence_number;
let timestamp_ms = checkpoint.checkpoint_summary.timestamp_ms;

let transactions = checkpoint
.transactions
.into_iter()
.map(|tx| (tx, checkpoint_num, timestamp_ms))
.collect();
Ok(self
.data_sender
.send((checkpoint_num, transactions))
.await?)
}
}
2 changes: 1 addition & 1 deletion crates/sui-deepbook-indexer/src/sui_deepbook_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use sui_types::transaction::{Command, TransactionDataAPI};
use tracing::info;

use sui_indexer_builder::indexer_builder::{DataMapper, IndexerProgressStore, Persistent};
use sui_indexer_builder::sui_datasource::CheckpointTxnData;
use sui_indexer_builder::Task;
use sui_types::effects::TransactionEffectsAPI;
use sui_types::event::Event;
Expand All @@ -30,6 +29,7 @@ use crate::schema::{
balances, flashloans, order_fills, order_updates, pool_prices, proposals, rebates, stakes,
sui_error_transactions, trade_params_update, votes,
};
use crate::sui_datasource::CheckpointTxnData;
use crate::{
models, schema, Balances, Flashloan, OrderFill, OrderUpdate, OrderUpdateStatus, PoolPrice,
ProcessedTxnData, Proposals, Rebates, Stakes, SuiTxnError, TradeParamsUpdate, Votes,
Expand Down

0 comments on commit ce8f22c

Please sign in to comment.