diff --git a/Cargo.lock b/Cargo.lock
index c3ac071e90..f95bde24b9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1396,6 +1396,7 @@ dependencies = [
  "ckb-types",
  "futures",
  "hex",
+ "include_dir",
  "log",
  "num-bigint",
  "once_cell",
@@ -1403,6 +1404,7 @@ dependencies = [
  "serde_json",
  "sql-builder",
  "sqlx",
+ "tempfile",
  "tokio",
 ]
 
@@ -3162,6 +3164,25 @@ dependencies = [
  "winapi-util",
 ]
 
+[[package]]
+name = "include_dir"
+version = "0.7.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "923d117408f1e49d914f1a379a309cffe4f18c05cf4e3d12e613a15fc81bd0dd"
+dependencies = [
+ "include_dir_macros",
+]
+
+[[package]]
+name = "include_dir_macros"
+version = "0.7.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7cab85a7ed0bd5f0e76d93846e0147172bed2e2d3f859bcc33a8d9699cad1a75"
+dependencies = [
+ "proc-macro2",
+ "quote",
+]
+
 [[package]]
 name = "includedir"
 version = "0.6.0"
diff --git a/util/rich-indexer/Cargo.toml b/util/rich-indexer/Cargo.toml
index ec943e1f68..5892475154 100644
--- a/util/rich-indexer/Cargo.toml
+++ b/util/rich-indexer/Cargo.toml
@@ -24,6 +24,8 @@ num-bigint = "0.4"
 once_cell = "1.8.0"
 sql-builder = "3.1"
 sqlx = { version = "0.6", features = ["runtime-tokio-rustls", "any", "sqlite", "postgres"] }
+include_dir = "0.7"
+tempfile = "3"
 
 [dev-dependencies]
 hex = "0.4"
diff --git a/util/rich-indexer/resources/migrations/20240603_add_is_spent_to_output.sql b/util/rich-indexer/resources/migrations/20240603_add_is_spent_to_output.sql
new file mode 100644
index 0000000000..a7cd72d02c
--- /dev/null
+++ b/util/rich-indexer/resources/migrations/20240603_add_is_spent_to_output.sql
@@ -0,0 +1,9 @@
+-- 20240603_add_is_spent_to_output.sql
+
+ALTER TABLE output
+ADD COLUMN is_spent INTEGER DEFAULT 0;
+
+UPDATE output
+SET is_spent = 1
+FROM input
+WHERE input.output_id = output.id;
diff --git a/util/rich-indexer/src/indexer/insert.rs b/util/rich-indexer/src/indexer/insert.rs
index 9c8831b347..765732100e 100644
--- a/util/rich-indexer/src/indexer/insert.rs
+++ b/util/rich-indexer/src/indexer/insert.rs
@@ -400,6 +400,32 @@ pub(crate) async fn bulk_insert_tx_association_cell_dep_table(
     .await
 }
 
+pub(crate) async fn spend_cell(
+    out_point: &OutPoint,
+    tx: &mut Transaction<'_, Any>,
+) -> Result<bool, Error> {
+    let output_tx_hash = out_point.tx_hash().raw_data().to_vec();
+    let output_index: u32 = out_point.index().unpack();
+
+    let updated_rows = sqlx::query(
+        r#"
+        UPDATE output
+        SET is_spent = 1
+        WHERE
+            tx_id = (SELECT ckb_transaction.id FROM ckb_transaction WHERE tx_hash = $1)
+            AND output_index = $2
+        "#,
+    )
+    .bind(output_tx_hash)
+    .bind(output_index as i32)
+    .execute(&mut *tx)
+    .await
+    .map_err(|err| Error::DB(err.to_string()))?
+    .rows_affected();
+
+    Ok(updated_rows > 0)
+}
+
 pub(crate) async fn query_output_cell(
     out_point: &OutPoint,
     tx: &mut Transaction<'_, Any>,
diff --git a/util/rich-indexer/src/indexer/mod.rs b/util/rich-indexer/src/indexer/mod.rs
index 32c395f775..a92d2bfb74 100644
--- a/util/rich-indexer/src/indexer/mod.rs
+++ b/util/rich-indexer/src/indexer/mod.rs
@@ -201,6 +201,9 @@ impl AsyncRichIndexer {
         if tx_index != 0 {
             for (input_index, input) in tx_view.inputs().into_iter().enumerate() {
                 let out_point = input.previous_output();
+                if !spend_cell(&out_point, tx).await? {
+                    break;
+                }
                 if self.custom_filters.is_cell_filter_enabled() {
                     if let Some((output_id, output, output_data)) =
                         query_output_cell(&out_point, tx).await?
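The hunks above carry the write path of this change: `spend_cell` flags the referenced row with `is_spent = 1` whenever a block consumes an out-point, and the indexing loop in `indexer/mod.rs` stops walking a transaction's inputs as soon as the referenced output row is absent (which can happen when custom filters excluded it). Below is a self-contained sketch, not part of the patch, that exercises the same UPDATE against minimal stand-in tables on in-memory SQLite; only the table and column names mirror the patch, the rest is hypothetical test scaffolding (assumes sqlx 0.6 with the `sqlite` feature and tokio with `macros`/`rt`):

```rust
use sqlx::sqlite::SqlitePoolOptions;

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = SqlitePoolOptions::new().connect("sqlite::memory:").await?;

    // Minimal stand-ins for the rich-indexer tables touched by spend_cell.
    sqlx::query("CREATE TABLE ckb_transaction (id INTEGER PRIMARY KEY, tx_hash BLOB)")
        .execute(&pool)
        .await?;
    sqlx::query(
        "CREATE TABLE output (id INTEGER PRIMARY KEY, tx_id INTEGER, \
         output_index INTEGER, is_spent INTEGER DEFAULT 0)",
    )
    .execute(&pool)
    .await?;
    sqlx::query("INSERT INTO ckb_transaction (id, tx_hash) VALUES (1, x'01')")
        .execute(&pool)
        .await?;
    sqlx::query("INSERT INTO output (tx_id, output_index) VALUES (1, 0)")
        .execute(&pool)
        .await?;

    // The same UPDATE that spend_cell issues, with concrete bindings.
    let rows = sqlx::query(
        "UPDATE output SET is_spent = 1 \
         WHERE tx_id = (SELECT id FROM ckb_transaction WHERE tx_hash = $1) \
         AND output_index = $2",
    )
    .bind(vec![0x01u8])
    .bind(0i32)
    .execute(&pool)
    .await?
    .rows_affected();

    // One row affected: the referenced cell is now marked spent.
    assert_eq!(rows, 1);
    Ok(())
}
```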
diff --git a/util/rich-indexer/src/indexer/remove.rs b/util/rich-indexer/src/indexer/remove.rs
index 56114f4c32..7f4bf24f4c 100644
--- a/util/rich-indexer/src/indexer/remove.rs
+++ b/util/rich-indexer/src/indexer/remove.rs
@@ -14,6 +14,9 @@ pub(crate) async fn rollback_block(tx: &mut Transaction<'_, Any>) -> Result<(),
     let tx_id_list = query_tx_id_list_by_block_id(block_id, tx).await?;
     let output_lock_type_list = query_outputs_by_tx_id_list(&tx_id_list, tx).await?;
 
+    // update spent cells
+    reset_spent_cells(&tx_id_list, tx).await?;
+
     // remove transactions, associations, inputs, output
     remove_batch_by_blobs("ckb_transaction", "id", &tx_id_list, tx).await?;
     remove_batch_by_blobs("tx_association_cell_dep", "tx_id", &tx_id_list, tx).await?;
@@ -80,6 +83,28 @@ async fn remove_batch_by_blobs(
     Ok(())
 }
 
+async fn reset_spent_cells(tx_id_list: &[i64], tx: &mut Transaction<'_, Any>) -> Result<(), Error> {
+    let query = SqlBuilder::update_table("output")
+        .set("is_spent", 0)
+        .and_where_in_query(
+            "id",
+            SqlBuilder::select_from("input")
+                .field("output_id")
+                .and_where_in("consumed_tx_id", tx_id_list)
+                .query()
+                .map_err(|err| Error::DB(err.to_string()))?,
+        )
+        .sql()
+        .map_err(|err| Error::DB(err.to_string()))?;
+
+    sqlx::query(&query)
+        .execute(&mut *tx)
+        .await
+        .map_err(|err| Error::DB(err.to_string()))?;
+
+    Ok(())
+}
+
 async fn query_uncle_id_list_by_block_id(
     block_id: i64,
     tx: &mut Transaction<'_, Any>,
diff --git a/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_cells.rs b/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_cells.rs
index 100a5eaca0..fdd501ecc2 100644
--- a/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_cells.rs
+++ b/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_cells.rs
@@ -104,9 +104,7 @@ impl AsyncRichIndexerHandle {
                 .join(name!("script";"lock_script"))
                 .on("output.lock_script_id = lock_script.id"),
         }
-        .join("input")
-        .on("output.id = input.output_id")
-        .and_where("input.output_id IS NULL"); // live cells
+        .and_where("output.is_spent = 0"); // live cells
 
         // filter cells in pool
         let mut dead_cells = Vec::new();
diff --git a/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_cells_capacity.rs b/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_cells_capacity.rs
index 5a14d90592..5c5aee71fe 100644
--- a/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_cells_capacity.rs
+++ b/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_cells_capacity.rs
@@ -51,27 +51,25 @@ impl AsyncRichIndexerHandle {
                 .join("ckb_transaction")
                 .on("output.tx_id = ckb_transaction.id");
         }
-        query_builder
-            .left()
-            .join("input")
-            .on("output.id = input.output_id");
         if let Some(ref filter) = search_key.filter {
             if filter.script.is_some() || filter.script_len_range.is_some() {
                 match search_key.script_type {
                     IndexerScriptType::Lock => {
                         query_builder
+                            .left()
                             .join(name!("script";"type_script"))
                             .on("output.type_script_id = type_script.id");
                     }
                     IndexerScriptType::Type => {
                         query_builder
+                            .left()
                             .join(name!("script";"lock_script"))
                             .on("output.lock_script_id = lock_script.id");
                     }
                 }
             }
         }
-        query_builder.and_where("input.output_id IS NULL"); // live cells
+        query_builder.and_where("output.is_spent = 0"); // live cells
 
         // filter cells in pool
         let mut dead_cells = Vec::new();
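Rollback is the inverse write: `reset_spent_cells` clears the flag on every output consumed by the rolled-back transactions before the corresponding `input` rows are deleted, and the two read paths (`get_cells`, `get_cells_capacity`) replace the former `LEFT JOIN input ... IS NULL` anti-join with a plain `output.is_spent = 0` predicate. As a rough illustration of the statement the sql-builder chain above produces (a sketch with hypothetical ids, assuming sql-builder 3.x as pinned in Cargo.toml):

```rust
use sql_builder::SqlBuilder;

fn main() -> anyhow::Result<()> {
    let tx_id_list: Vec<i64> = vec![42, 43];

    // Same chain as reset_spent_cells, minus the sqlx execution step.
    let sql = SqlBuilder::update_table("output")
        .set("is_spent", 0)
        .and_where_in_query(
            "id",
            SqlBuilder::select_from("input")
                .field("output_id")
                .and_where_in("consumed_tx_id", &tx_id_list)
                .query()?,
        )
        .sql()?;

    // Prints roughly:
    // UPDATE output SET is_spent = 0
    //   WHERE id IN (SELECT output_id FROM input WHERE consumed_tx_id IN (42, 43));
    println!("{sql}");
    Ok(())
}
```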
diff --git a/util/rich-indexer/src/store.rs b/util/rich-indexer/src/store.rs
index d9caa7f6fe..22c910ff2a 100644
--- a/util/rich-indexer/src/store.rs
+++ b/util/rich-indexer/src/store.rs
@@ -1,15 +1,18 @@
 use anyhow::{anyhow, Result};
 use ckb_app_config::{DBDriver, RichIndexerConfig};
 use futures::TryStreamExt;
+use include_dir::{include_dir, Dir};
 use log::LevelFilter;
 use once_cell::sync::OnceCell;
 use sqlx::{
     any::{Any, AnyArguments, AnyConnectOptions, AnyPool, AnyPoolOptions, AnyRow},
+    migrate::Migrator,
     query::{Query, QueryAs},
     ConnectOptions, IntoArguments, Row, Transaction,
 };
+use tempfile::tempdir;
 
-use std::fs::OpenOptions;
+use std::fs::{self, OpenOptions};
 use std::marker::{Send, Unpin};
 use std::path::PathBuf;
 use std::str::FromStr;
@@ -20,6 +23,7 @@ const SQL_SQLITE_CREATE_TABLE: &str = include_str!("../resources/create_sqlite_t
 const SQL_SQLITE_CREATE_INDEX: &str = include_str!("../resources/create_sqlite_index.sql");
 const SQL_POSTGRES_CREATE_TABLE: &str = include_str!("../resources/create_postgres_table.sql");
 const SQL_POSTGRES_CREATE_INDEX: &str = include_str!("../resources/create_postgres_index.sql");
+static MIGRATIONS_DIR: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/resources/migrations");
 
 #[derive(Clone, Default)]
 pub struct SQLXPool {
@@ -36,13 +40,6 @@ impl Debug for SQLXPool {
     }
 }
 
 impl SQLXPool {
-    pub fn new() -> Self {
-        SQLXPool {
-            pool: Arc::new(OnceCell::new()),
-            db_driver: DBDriver::default(),
-        }
-    }
-
     pub async fn connect(&mut self, db_config: &RichIndexerConfig) -> Result<()> {
         let pool_options = AnyPoolOptions::new()
             .max_connections(10)
@@ -50,7 +47,7 @@ impl SQLXPool {
             .acquire_timeout(Duration::from_secs(60))
             .max_lifetime(Duration::from_secs(1800))
             .idle_timeout(Duration::from_secs(30));
-        match db_config.db_type {
+        let pool = match db_config.db_type {
             DBDriver::Sqlite => {
                 let require_init = is_sqlite_require_init(db_config);
                 let uri = build_url_for_sqlite(db_config);
@@ -59,13 +56,13 @@ impl SQLXPool {
                 let pool = pool_options.connect_with(connection_options).await?;
                 log::info!("SQLite is connected.");
                 self.pool
-                    .set(pool)
+                    .set(pool.clone())
                     .map_err(|_| anyhow!("set pool failed!"))?;
                 if require_init {
                     self.create_tables_for_sqlite().await?;
                 }
                 self.db_driver = DBDriver::Sqlite;
-                Ok(())
+                pool
             }
             DBDriver::Postgres => {
                 let require_init = self.is_postgres_require_init(db_config).await?;
@@ -75,15 +72,32 @@ impl SQLXPool {
                 let pool = pool_options.connect_with(connection_options).await?;
                 log::info!("PostgreSQL is connected.");
                 self.pool
-                    .set(pool)
+                    .set(pool.clone())
                     .map_err(|_| anyhow!("set pool failed"))?;
                 if require_init {
                     self.create_tables_for_postgres().await?;
                 }
                 self.db_driver = DBDriver::Postgres;
-                Ok(())
+                pool
             }
-        }
+        };
+
+        // Run migrations
+        log::info!("Running migrations...");
+        let temp_dir = tempdir()?;
+        for file in MIGRATIONS_DIR.files() {
+            log::info!("Found migration file: {:?}", file.path());
+            let file_path = temp_dir.path().join(file.path());
+            if let Some(parent) = file_path.parent() {
+                fs::create_dir_all(parent)?;
+            }
+            fs::write(&file_path, file.contents())?;
+        }
+        let migrator = Migrator::new(temp_dir.path()).await?;
+        migrator.run(&pool).await?;
+        log::info!("Migrations are done.");
+
+        Ok(())
     }
 
     pub async fn fetch_count(&self, table_name: &str) -> Result<u64> {
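The store.rs change embeds `resources/migrations` into the binary at compile time via `include_dir`, then replays it through sqlx's `Migrator` right after the pool is connected. Because `Migrator::new` reads migrations from a filesystem path, the embedded files are first materialized into a `tempfile` directory. A standalone sketch of the same pattern (the helper name is hypothetical; assumes include_dir 0.7, tempfile 3, anyhow, and sqlx 0.6 with its default `migrate` feature):

```rust
use include_dir::{include_dir, Dir};
use sqlx::{migrate::Migrator, AnyPool};

static MIGRATIONS: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/resources/migrations");

async fn run_embedded_migrations(pool: &AnyPool) -> anyhow::Result<()> {
    // Materialize the compile-time embedded files so Migrator can read them.
    let tmp = tempfile::tempdir()?;
    for file in MIGRATIONS.files() {
        let path = tmp.path().join(file.path());
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        std::fs::write(&path, file.contents())?;
    }
    // sqlx records applied versions in the _sqlx_migrations table, so
    // running this on every startup only applies still-pending migrations.
    Migrator::new(tmp.path().to_path_buf()).await?.run(pool).await?;
    Ok(())
}
```

This keeps release binaries self-contained: the node does not need a `resources/migrations` directory shipped next to it, and the `20240603_add_is_spent_to_output.sql` backfill is applied exactly once per database.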