Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Need Migration] Improve query performance of get_cells in rich-indexer #4509

Merged
merged 18 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions util/rich-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ num-bigint = "0.4"
once_cell = "1.8.0"
sql-builder = "3.1"
sqlx = { version = "0.6", features = ["runtime-tokio-rustls", "any", "sqlite", "postgres"] }
include_dir = "0.7"
tempfile = "3"

[dev-dependencies]
hex = "0.4"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- 20240630_add_is_spent_to_output.sql

ALTER TABLE output
ADD COLUMN is_spent INTEGER DEFAULT 0;

UPDATE output
SET is_spent = 1
FROM input
WHERE input.output_id = output.id;
26 changes: 26 additions & 0 deletions util/rich-indexer/src/indexer/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,32 @@ pub(crate) async fn bulk_insert_tx_association_cell_dep_table(
.await
}

pub(crate) async fn spend_cell(
out_point: &OutPoint,
tx: &mut Transaction<'_, Any>,
) -> Result<bool, Error> {
let output_tx_hash = out_point.tx_hash().raw_data().to_vec();
let output_index: u32 = out_point.index().unpack();

let updated_rows = sqlx::query(
r#"
UPDATE output
SET is_spent = 1
WHERE
tx_id = (SELECT ckb_transaction.id FROM ckb_transaction WHERE tx_hash = $1)
AND output_index = $2
"#,
)
.bind(output_tx_hash)
.bind(output_index as i32)
.execute(&mut *tx)
.await
.map_err(|err| Error::DB(err.to_string()))?
.rows_affected();

Ok(updated_rows > 0)
}

pub(crate) async fn query_output_cell(
out_point: &OutPoint,
tx: &mut Transaction<'_, Any>,
Expand Down
3 changes: 3 additions & 0 deletions util/rich-indexer/src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ impl AsyncRichIndexer {
if tx_index != 0 {
for (input_index, input) in tx_view.inputs().into_iter().enumerate() {
let out_point = input.previous_output();
if !spend_cell(&out_point, tx).await? {
break;
}
if self.custom_filters.is_cell_filter_enabled() {
if let Some((output_id, output, output_data)) =
query_output_cell(&out_point, tx).await?
Expand Down
25 changes: 25 additions & 0 deletions util/rich-indexer/src/indexer/remove.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ pub(crate) async fn rollback_block(tx: &mut Transaction<'_, Any>) -> Result<(),
let tx_id_list = query_tx_id_list_by_block_id(block_id, tx).await?;
let output_lock_type_list = query_outputs_by_tx_id_list(&tx_id_list, tx).await?;

// update spent cells
reset_spent_cells(&tx_id_list, tx).await?;

// remove transactions, associations, inputs, output
remove_batch_by_blobs("ckb_transaction", "id", &tx_id_list, tx).await?;
remove_batch_by_blobs("tx_association_cell_dep", "tx_id", &tx_id_list, tx).await?;
Expand Down Expand Up @@ -80,6 +83,28 @@ async fn remove_batch_by_blobs(
Ok(())
}

async fn reset_spent_cells(tx_id_list: &[i64], tx: &mut Transaction<'_, Any>) -> Result<(), Error> {
let query = SqlBuilder::update_table("output")
.set("is_spent", 0)
.and_where_in_query(
"id",
SqlBuilder::select_from("input")
.field("output_id")
.and_where_in("consumed_tx_id", tx_id_list)
.query()
.map_err(|err| Error::DB(err.to_string()))?,
)
.sql()
.map_err(|err| Error::DB(err.to_string()))?;

sqlx::query(&query)
.execute(&mut *tx)
.await
.map_err(|err| Error::DB(err.to_string()))?;

Ok(())
}

async fn query_uncle_id_list_by_block_id(
block_id: i64,
tx: &mut Transaction<'_, Any>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,7 @@ impl AsyncRichIndexerHandle {
.join(name!("script";"lock_script"))
.on("output.lock_script_id = lock_script.id"),
}
.join("input")
.on("output.id = input.output_id")
.and_where("input.output_id IS NULL"); // live cells
.and_where("output.is_spent = 0"); // live cells

// filter cells in pool
let mut dead_cells = Vec::new();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,27 +51,25 @@ impl AsyncRichIndexerHandle {
.join("ckb_transaction")
.on("output.tx_id = ckb_transaction.id");
}
query_builder
.left()
.join("input")
.on("output.id = input.output_id");
if let Some(ref filter) = search_key.filter {
if filter.script.is_some() || filter.script_len_range.is_some() {
match search_key.script_type {
IndexerScriptType::Lock => {
query_builder
.left()
.join(name!("script";"type_script"))
.on("output.type_script_id = type_script.id");
}
IndexerScriptType::Type => {
query_builder
.left()
.join(name!("script";"lock_script"))
.on("output.lock_script_id = lock_script.id");
}
}
}
}
query_builder.and_where("input.output_id IS NULL"); // live cells
query_builder.and_where("output.is_spent = 0"); // live cells

// filter cells in pool
let mut dead_cells = Vec::new();
Expand Down
40 changes: 27 additions & 13 deletions util/rich-indexer/src/store.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use anyhow::{anyhow, Result};
use ckb_app_config::{DBDriver, RichIndexerConfig};
use futures::TryStreamExt;
use include_dir::{include_dir, Dir};
use log::LevelFilter;
use once_cell::sync::OnceCell;
use sqlx::{
any::{Any, AnyArguments, AnyConnectOptions, AnyPool, AnyPoolOptions, AnyRow},
migrate::Migrator,
query::{Query, QueryAs},
ConnectOptions, IntoArguments, Row, Transaction,
};
use tempfile::tempdir;

use std::fs::OpenOptions;
use std::fs::{self, OpenOptions};
use std::marker::{Send, Unpin};
use std::path::PathBuf;
use std::str::FromStr;
Expand All @@ -20,6 +23,7 @@ const SQL_SQLITE_CREATE_TABLE: &str = include_str!("../resources/create_sqlite_t
const SQL_SQLITE_CREATE_INDEX: &str = include_str!("../resources/create_sqlite_index.sql");
const SQL_POSTGRES_CREATE_TABLE: &str = include_str!("../resources/create_postgres_table.sql");
const SQL_POSTGRES_CREATE_INDEX: &str = include_str!("../resources/create_postgres_index.sql");
static MIGRATIONS_DIR: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/resources/migrations");

#[derive(Clone, Default)]
pub struct SQLXPool {
Expand All @@ -36,21 +40,14 @@ impl Debug for SQLXPool {
}

impl SQLXPool {
pub fn new() -> Self {
SQLXPool {
pool: Arc::new(OnceCell::new()),
db_driver: DBDriver::default(),
}
}

pub async fn connect(&mut self, db_config: &RichIndexerConfig) -> Result<()> {
let pool_options = AnyPoolOptions::new()
.max_connections(10)
.min_connections(0)
.acquire_timeout(Duration::from_secs(60))
.max_lifetime(Duration::from_secs(1800))
.idle_timeout(Duration::from_secs(30));
match db_config.db_type {
let pool = match db_config.db_type {
DBDriver::Sqlite => {
let require_init = is_sqlite_require_init(db_config);
let uri = build_url_for_sqlite(db_config);
Expand All @@ -59,13 +56,13 @@ impl SQLXPool {
let pool = pool_options.connect_with(connection_options).await?;
log::info!("SQLite is connected.");
self.pool
.set(pool)
.set(pool.clone())
.map_err(|_| anyhow!("set pool failed!"))?;
if require_init {
self.create_tables_for_sqlite().await?;
}
self.db_driver = DBDriver::Sqlite;
Ok(())
pool
}
DBDriver::Postgres => {
let require_init = self.is_postgres_require_init(db_config).await?;
Expand All @@ -75,15 +72,32 @@ impl SQLXPool {
let pool = pool_options.connect_with(connection_options).await?;
log::info!("PostgreSQL is connected.");
self.pool
.set(pool)
.set(pool.clone())
.map_err(|_| anyhow!("set pool failed"))?;
if require_init {
self.create_tables_for_postgres().await?;
}
self.db_driver = DBDriver::Postgres;
Ok(())
pool
}
};

// Run migrations
log::info!("Running migrations...");
let temp_dir = tempdir()?;
for file in MIGRATIONS_DIR.files() {
log::info!("Found migration file: {:?}", file.path());
let file_path = temp_dir.path().join(file.path());
if let Some(parent) = file_path.parent() {
fs::create_dir_all(parent)?;
}
fs::write(&file_path, file.contents())?;
}
let migrator = Migrator::new(temp_dir.path()).await?;
migrator.run(&pool).await?;
eval-exec marked this conversation as resolved.
Show resolved Hide resolved
log::info!("Migrations are done.");

Ok(())
}

pub async fn fetch_count(&self, table_name: &str) -> Result<u64> {
Expand Down
Loading