From d9e81dca1316427726e56f567c248bd2d3dd93fb Mon Sep 17 00:00:00 2001
From: baichuan3
Date: Tue, 6 Aug 2024 17:56:37 +0800
Subject: [PATCH 1/9] support run query with timeout

---
 Cargo.lock                                  |  1 +
 crates/rooch-indexer/Cargo.toml             |  1 +
 crates/rooch-indexer/src/errors.rs          |  3 +
 crates/rooch-indexer/src/indexer_reader.rs  | 64 +++++++++++++++++--
 .../src/tests/test_concurrence.rs           |  4 +-
 .../rooch-indexer/src/tests/test_indexer.rs | 20 +++---
 6 files changed, 74 insertions(+), 19 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 4f9aada6ea..de0c81bfad 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -9344,6 +9344,7 @@ dependencies = [
  "diesel",
  "diesel_migrations",
  "function_name",
+ "futures",
 "log",
 "metrics",
 "move-core-types",

diff --git a/crates/rooch-indexer/Cargo.toml b/crates/rooch-indexer/Cargo.toml
index 5c5680c010..d5de107a67 100644
--- a/crates/rooch-indexer/Cargo.toml
+++ b/crates/rooch-indexer/Cargo.toml
@@ -30,6 +30,7 @@ log = { workspace = true }
 tap = { workspace = true }
 prometheus = { workspace = true }
 function_name = { workspace = true }
+futures = { workspace = true }

 move-core-types = { workspace = true }

diff --git a/crates/rooch-indexer/src/errors.rs b/crates/rooch-indexer/src/errors.rs
index 53368c3700..15d5ddda17 100644
--- a/crates/rooch-indexer/src/errors.rs
+++ b/crates/rooch-indexer/src/errors.rs
@@ -28,6 +28,9 @@ pub enum IndexerError {
     #[error("Indexer failed to read SQLiteDB with error: `{0}`")]
     SQLiteReadError(String),

+    #[error("Indexer async read SQLiteDB error: `{0}`")]
+    SQLiteAsyncReadError(String),
+
     #[error("Indexer failed to reset SQLiteDB with error: `{0}`")]
     SQLiteResetError(String),

diff --git a/crates/rooch-indexer/src/indexer_reader.rs b/crates/rooch-indexer/src/indexer_reader.rs
index 46ee8309af..50f9bf605d 100644
--- a/crates/rooch-indexer/src/indexer_reader.rs
+++ b/crates/rooch-indexer/src/indexer_reader.rs
@@ -29,6 +29,10 @@ use std::collections::HashMap;
 use std::ops::DerefMut;
 use std::path::PathBuf;
 use std::sync::{Arc, RwLock};
+use std::time::Duration;
+use tokio::time::timeout;
+
+pub const DEFAULT_QUERY_TIMEOUT: u64 = 32; // seconds

 pub const TX_ORDER_STR: &str = "tx_order";
 pub const TX_HASH_STR: &str = "tx_hash";
@@ -98,6 +102,46 @@ impl InnerIndexerReader {
             .transaction(query)
             .map_err(|e| IndexerError::SQLiteReadError(e.to_string()))
     }
+
+    pub fn run_query_with_timeout<T, E, F>(&self, query: F) -> Result<T, IndexerError>
+    where
+        F: FnOnce(&mut SqliteConnection) -> Result<T, E> + Send + 'static,
+        E: From<diesel::result::Error> + std::error::Error + Send,
+        T: Send + 'static,
+    {
+        let timeout_duration = Duration::from_secs(DEFAULT_QUERY_TIMEOUT); // default query timeout in seconds
+        let mut connection = self.get_connection()?;
+
+        futures::executor::block_on(async move {
+            timeout(
+                timeout_duration,
+                tokio::task::spawn_blocking(move || {
+                    connection
+                        .deref_mut()
+                        .transaction(query)
+                        .map_err(|e| IndexerError::SQLiteReadError(e.to_string()))
+                }),
+            )
+            .await
+            .map_err(|e| IndexerError::SQLiteAsyncReadError(e.to_string()))??
+        })
+
+        // tokio::task::block_in_place(|| {
+        //     Handle::current().block_on(async move {
+        //         timeout(
+        //             timeout_duration,
+        //             tokio::task::spawn_blocking(move || {
+        //                 connection
+        //                     .deref_mut()
+        //                     .transaction(query)
+        //                     .map_err(|e| IndexerError::SQLiteReadError(e.to_string()))
+        //             }),
+        //         )
+        //         .await
+        //         .map_err(|e| IndexerError::SQLiteAsyncReadError(e.to_string()))??
+ // }) + // }) + } } #[derive(Clone)] @@ -165,7 +209,7 @@ impl IndexerReader { } else if descending_order { let max_tx_order: i64 = self .get_inner_indexer_reader(INDEXER_TRANSACTIONS_TABLE_NAME)? - .run_query(|conn| { + .run_query_with_timeout(|conn| { transactions::dsl::transactions .select(transactions::tx_order) .order_by(transactions::tx_order.desc()) @@ -235,7 +279,9 @@ impl IndexerReader { tracing::debug!("query transactions: {}", query); let stored_transactions = self .get_inner_indexer_reader(INDEXER_TRANSACTIONS_TABLE_NAME)? - .run_query(|conn| diesel::sql_query(query).load::(conn))?; + .run_query_with_timeout(|conn| { + diesel::sql_query(query).load::(conn) + })?; let result = stored_transactions .into_iter() @@ -271,7 +317,7 @@ impl IndexerReader { } else if descending_order { let (max_tx_order, event_index): (i64, i64) = self .get_inner_indexer_reader(INDEXER_EVENTS_TABLE_NAME)? - .run_query(|conn| { + .run_query_with_timeout(|conn| { events::dsl::events .select((events::tx_order, events::event_index)) .order_by((events::tx_order.desc(), events::event_index.desc())) @@ -344,7 +390,7 @@ impl IndexerReader { tracing::debug!("query events: {}", query); let stored_events = self .get_inner_indexer_reader(INDEXER_EVENTS_TABLE_NAME)? - .run_query(|conn| diesel::sql_query(query).load::(conn))?; + .run_query_with_timeout(|conn| diesel::sql_query(query).load::(conn))?; let result = stored_events .into_iter() @@ -373,7 +419,7 @@ impl IndexerReader { } else if descending_order { let (max_tx_order, state_index): (i64, i64) = self .get_inner_indexer_reader(INDEXER_OBJECT_STATES_TABLE_NAME)? - .run_query(|conn| { + .run_query_with_timeout(|conn| { object_states::dsl::object_states .select((object_states::tx_order, object_states::state_index)) .order_by(( @@ -449,7 +495,9 @@ impl IndexerReader { tracing::debug!("query object states: {}", query); let stored_object_states = self .get_inner_indexer_reader(INDEXER_OBJECT_STATES_TABLE_NAME)? - .run_query(|conn| diesel::sql_query(query).load::(conn))?; + .run_query_with_timeout(|conn| { + diesel::sql_query(query).load::(conn) + })?; Ok(stored_object_states) } @@ -516,7 +564,9 @@ impl IndexerReader { tracing::debug!("query last state index by tx order: {}", query); let stored_object_states = self .get_inner_indexer_reader(INDEXER_OBJECT_STATES_TABLE_NAME)? 
- .run_query(|conn| diesel::sql_query(query).load::(conn))?; + .run_query_with_timeout(|conn| { + diesel::sql_query(query).load::(conn) + })?; let last_state_index = if stored_object_states.is_empty() { 0 } else { diff --git a/crates/rooch-indexer/src/tests/test_concurrence.rs b/crates/rooch-indexer/src/tests/test_concurrence.rs index 8648f91220..42c8d33f3a 100644 --- a/crates/rooch-indexer/src/tests/test_concurrence.rs +++ b/crates/rooch-indexer/src/tests/test_concurrence.rs @@ -20,8 +20,8 @@ mod tests { use std::thread; use std::time::Duration; - #[test] - fn test_sqlite_writer_sqlite_reader_concurrence() { + #[tokio::test] + async fn test_sqlite_writer_sqlite_reader_concurrence() { let count = sqlite_writer_sqlite_reader_concurrence().unwrap(); assert_eq!(count, get_row_count()); } diff --git a/crates/rooch-indexer/src/tests/test_indexer.rs b/crates/rooch-indexer/src/tests/test_indexer.rs index 4ba0c6a977..800961e5f9 100644 --- a/crates/rooch-indexer/src/tests/test_indexer.rs +++ b/crates/rooch-indexer/src/tests/test_indexer.rs @@ -70,8 +70,8 @@ fn random_remove_object_states() -> Vec { remove_object_states } -#[test] -fn test_transaction_store() -> Result<()> { +#[tokio::test] +async fn test_transaction_store() -> Result<()> { let registry_service = RegistryService::default(); let tmpdir = moveos_config::temp_dir(); let indexer_db = tmpdir.path().join(DEFAULT_DB_INDEXER_SUBDIR); @@ -114,8 +114,8 @@ fn test_transaction_store() -> Result<()> { Ok(()) } -#[test] -fn test_event_store() -> Result<()> { +#[tokio::test] +async fn test_event_store() -> Result<()> { let registry_service = RegistryService::default(); let tmpdir = moveos_config::temp_dir(); let indexer_db = tmpdir.path().join(DEFAULT_DB_INDEXER_SUBDIR); @@ -148,8 +148,8 @@ fn test_event_store() -> Result<()> { Ok(()) } -#[test] -fn test_state_store() -> Result<()> { +#[tokio::test] +async fn test_state_store() -> Result<()> { let registry_service = RegistryService::default(); let tmpdir = moveos_config::temp_dir(); let indexer_db = tmpdir.path().join(DEFAULT_DB_INDEXER_SUBDIR); @@ -180,8 +180,8 @@ fn test_state_store() -> Result<()> { Ok(()) } -#[test] -fn test_object_type_query() -> Result<()> { +#[tokio::test] +async fn test_object_type_query() -> Result<()> { let registry_service = RegistryService::default(); let tmpdir = moveos_config::temp_dir(); let indexer_db = tmpdir.path().join(DEFAULT_DB_INDEXER_SUBDIR); @@ -231,8 +231,8 @@ fn test_object_type_query() -> Result<()> { Ok(()) } -#[test] -fn test_escape_transaction() -> Result<()> { +#[tokio::test] +async fn test_escape_transaction() -> Result<()> { let registry_service = RegistryService::default(); let tmpdir = moveos_config::temp_dir(); let indexer_db = tmpdir.path().join(DEFAULT_DB_INDEXER_SUBDIR); From f8e08528b2845ce00fbcd72468d0e4b09d217e29 Mon Sep 17 00:00:00 2001 From: baichuan3 Date: Sun, 11 Aug 2024 21:06:47 +0800 Subject: [PATCH 2/9] resolve indexer test case by specify tokio test as multi_thread configuration --- crates/rooch-indexer/src/indexer_reader.rs | 48 +++++++------------ .../rooch-indexer/src/tests/test_indexer.rs | 10 ++-- 2 files changed, 23 insertions(+), 35 deletions(-) diff --git a/crates/rooch-indexer/src/indexer_reader.rs b/crates/rooch-indexer/src/indexer_reader.rs index 8d3c1f369a..477b41b97b 100644 --- a/crates/rooch-indexer/src/indexer_reader.rs +++ b/crates/rooch-indexer/src/indexer_reader.rs @@ -31,9 +31,10 @@ use std::ops::DerefMut; use std::path::PathBuf; use std::sync::{Arc, RwLock}; use std::time::Duration; +use tokio::runtime::Handle; 
use tokio::time::timeout; -pub const DEFAULT_QUERY_TIMEOUT: u64 = 32; // second +pub const DEFAULT_QUERY_TIMEOUT: u64 = 60; // second pub const TX_ORDER_STR: &str = "tx_order"; pub const TX_HASH_STR: &str = "tx_hash"; @@ -110,38 +111,25 @@ impl InnerIndexerReader { E: From + std::error::Error + Send, T: Send + 'static, { - let timeout_duration = Duration::from_secs(DEFAULT_QUERY_TIMEOUT); // default 30 second timeout + // default query time out in second + let timeout_duration = Duration::from_secs(DEFAULT_QUERY_TIMEOUT); let mut connection = self.get_connection()?; - futures::executor::block_on(async move { - timeout( - timeout_duration, - tokio::task::spawn_blocking(move || { - connection - .deref_mut() - .transaction(query) - .map_err(|e| IndexerError::SQLiteReadError(e.to_string())) - }), - ) - .await - .map_err(|e| IndexerError::SQLiteAsyncReadError(e.to_string()))?? + tokio::task::block_in_place(|| { + Handle::current().block_on(async move { + timeout( + timeout_duration, + tokio::task::spawn_blocking(move || { + connection + .deref_mut() + .transaction(query) + .map_err(|e| IndexerError::SQLiteReadError(e.to_string())) + }), + ) + .await + .map_err(|e| IndexerError::SQLiteAsyncReadError(e.to_string()))?? + }) }) - - // tokio::task::block_in_place(|| { - // Handle::current().block_on(async move { - // timeout( - // timeout_duration, - // tokio::task::spawn_blocking(move || { - // connection - // .deref_mut() - // .transaction(query) - // .map_err(|e| IndexerError::SQLiteReadError(e.to_string())) - // }), - // ) - // .await - // .map_err(|e| IndexerError::SQLiteAsyncReadError(e.to_string()))?? - // }) - // }) } } diff --git a/crates/rooch-indexer/src/tests/test_indexer.rs b/crates/rooch-indexer/src/tests/test_indexer.rs index 800961e5f9..aad82b0a38 100644 --- a/crates/rooch-indexer/src/tests/test_indexer.rs +++ b/crates/rooch-indexer/src/tests/test_indexer.rs @@ -70,7 +70,7 @@ fn random_remove_object_states() -> Vec { remove_object_states } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_transaction_store() -> Result<()> { let registry_service = RegistryService::default(); let tmpdir = moveos_config::temp_dir(); @@ -114,7 +114,7 @@ async fn test_transaction_store() -> Result<()> { Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_event_store() -> Result<()> { let registry_service = RegistryService::default(); let tmpdir = moveos_config::temp_dir(); @@ -148,7 +148,7 @@ async fn test_event_store() -> Result<()> { Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_state_store() -> Result<()> { let registry_service = RegistryService::default(); let tmpdir = moveos_config::temp_dir(); @@ -180,7 +180,7 @@ async fn test_state_store() -> Result<()> { Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_object_type_query() -> Result<()> { let registry_service = RegistryService::default(); let tmpdir = moveos_config::temp_dir(); @@ -231,7 +231,7 @@ async fn test_object_type_query() -> Result<()> { Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_escape_transaction() -> Result<()> { let registry_service = RegistryService::default(); let tmpdir = moveos_config::temp_dir(); From 182fb87ccbf802577aadc890c7ef06bd9b57f62c Mon Sep 17 00:00:00 2001 From: baichuan3 Date: Mon, 12 Aug 2024 17:41:36 +0800 Subject: [PATCH 3/9] resolve test cases for rocksdb metrics 
because of the tokio dependency

---
 Cargo.lock                                  |   2 +
 crates/rooch-db/src/lib.rs                  |  42 ++++--
 .../rooch-sequencer/tests/test_sequencer.rs |  30 ++++-
 .../src/tests/test_state_store.rs           |  28 +++-
 moveos/raw-store/Cargo.toml                 |   2 +
 moveos/raw-store/src/lib.rs                 | 127 +++++++++---------
 moveos/raw-store/src/metrics.rs             |  19 +--
 7 files changed, 154 insertions(+), 96 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c244944497..bfdf22c4f0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8626,11 +8626,13 @@ dependencies = [
  "moveos-common",
  "moveos-config",
  "once_cell",
+ "parking_lot 0.12.3",
 "prometheus",
 "rocksdb",
 "serde 1.0.205",
 "tap",
 "thiserror",
+ "tokio",
 "tracing",
 ]

diff --git a/crates/rooch-db/src/lib.rs b/crates/rooch-db/src/lib.rs
index 3988563b96..f9f2971157 100644
--- a/crates/rooch-db/src/lib.rs
+++ b/crates/rooch-db/src/lib.rs
@@ -23,8 +23,34 @@ pub struct RoochDB {

 impl RoochDB {
     pub fn init(config: &StoreConfig, registry: &Registry) -> Result<Self> {
-        let (store_dir, indexer_dir) = (config.get_store_dir(), config.get_indexer_dir());
+        let instance = Self::generate_store_instance(config, registry)?;
+        Self::init_with_instance(config, instance, registry)
+    }
+
+    pub fn init_with_instance(
+        config: &StoreConfig,
+        instance: StoreInstance,
+        registry: &Registry,
+    ) -> Result<Self> {
+        let indexer_dir = config.get_indexer_dir();
+        let moveos_store = MoveOSStore::new_with_instance(instance.clone(), registry)?;
+        let rooch_store = RoochStore::new_with_instance(instance, registry)?;
+        let indexer_store = IndexerStore::new(indexer_dir.clone(), registry)?;
+        let indexer_reader = IndexerReader::new(indexer_dir, registry)?;
+
+        Ok(Self {
+            moveos_store,
+            rooch_store,
+            indexer_store,
+            indexer_reader,
+        })
+    }
+
+    pub fn generate_store_instance(
+        config: &StoreConfig,
+        registry: &Registry,
+    ) -> Result<StoreInstance> {
+        let store_dir = config.get_store_dir();
         let mut column_families = moveos_store::StoreMeta::get_column_family_names().to_vec();
         column_families.append(&mut rooch_store::StoreMeta::get_column_family_names().to_vec());
         //ensure no duplicate column families
@@ -43,19 +69,7 @@ impl RoochDB {
             db_metrics,
         );

-        let moveos_store = MoveOSStore::new_with_instance(instance.clone(), registry)?;
-
-        let rooch_store = RoochStore::new_with_instance(instance, registry)?;
-
-        let indexer_store = IndexerStore::new(indexer_dir.clone(), registry)?;
-        let indexer_reader = IndexerReader::new(indexer_dir, registry)?;
-
-        Ok(Self {
-            moveos_store,
-            rooch_store,
-            indexer_store,
-            indexer_reader,
-        })
+        Ok(instance)
     }

     pub fn init_with_mock_metrics_for_test(config: &StoreConfig) -> Result<Self> {

diff --git a/crates/rooch-sequencer/tests/test_sequencer.rs b/crates/rooch-sequencer/tests/test_sequencer.rs
index e8ed76e90a..c4caabd53a 100644
--- a/crates/rooch-sequencer/tests/test_sequencer.rs
+++ b/crates/rooch-sequencer/tests/test_sequencer.rs
@@ -6,6 +6,7 @@ use coerce::actor::{system::ActorSystem, IntoActor};
 use metrics::RegistryService;
 use prometheus::Registry;
 use raw_store::metrics::DBMetrics;
+use raw_store::{StoreInstance, CF_METRICS_REPORT_PERIOD_MILLIS};
 use rooch_config::RoochOpt;
 use rooch_db::RoochDB;
 use rooch_genesis::RoochGenesis;
@@ -15,10 +16,21 @@ use rooch_types::{
     service_status::ServiceStatus,
     transaction::{LedgerTxData, RoochTransaction},
 };
+use std::time::Duration;

 fn init_rooch_db(opt: &RoochOpt, registry: &Registry) -> Result<RoochDB> {
     DBMetrics::init(registry);
-    let rooch_db = RoochDB::init(opt.store_config(), registry)?;
+    let store_instance = RoochDB::generate_store_instance(opt.store_config(),
registry)?; + init_rooch_db_with_instance(opt, store_instance, registry) +} + +fn init_rooch_db_with_instance( + opt: &RoochOpt, + instance: StoreInstance, + registry: &Registry, +) -> Result { + // DBMetrics::init(registry); + let rooch_db = RoochDB::init_with_instance(opt.store_config(), instance, registry)?; let network = opt.network(); let _genesis = RoochGenesis::load_or_init(network, &rooch_db)?; Ok(rooch_db) @@ -30,7 +42,15 @@ async fn test_sequencer() -> Result<()> { let mut last_tx_order = 0; let registry_service = RegistryService::default(); { - let rooch_db = init_rooch_db(&opt, ®istry_service.default_registry())?; + let mut store_instance = RoochDB::generate_store_instance( + opt.store_config(), + ®istry_service.default_registry(), + )?; + let rooch_db = init_rooch_db_with_instance( + &opt, + store_instance.clone(), + ®istry_service.default_registry(), + )?; let sequencer_key = RoochKeyPair::generate_secp256k1(); let mut sequencer = SequencerActor::new( sequencer_key, @@ -46,6 +66,10 @@ async fn test_sequencer() -> Result<()> { last_tx_order = ledger_tx.sequence_info.tx_order; } assert_eq!(sequencer.last_order(), last_tx_order); + + let _ = store_instance.cancel_metrics_task(); + // Wait for rocksdb cancel metrics task to avoid db lock + tokio::time::sleep(Duration::from_millis(CF_METRICS_REPORT_PERIOD_MILLIS)).await; } // load from db again { @@ -69,7 +93,7 @@ async fn test_sequencer() -> Result<()> { // test concurrent // Build a sequencer actor and sequence transactions concurrently -#[tokio::test] +#[tokio::test(flavor = "multi_thread", worker_threads = 5)] async fn test_sequencer_concurrent() -> Result<()> { let opt = RoochOpt::new_with_temp_store()?; let registry_service = RegistryService::default(); diff --git a/moveos/moveos-store/src/tests/test_state_store.rs b/moveos/moveos-store/src/tests/test_state_store.rs index 708e0d08a5..821b2e417f 100644 --- a/moveos/moveos-store/src/tests/test_state_store.rs +++ b/moveos/moveos-store/src/tests/test_state_store.rs @@ -1,15 +1,19 @@ // Copyright (c) RoochNetwork // SPDX-License-Identifier: Apache-2.0 -use crate::MoveOSStore; +use crate::{MoveOSStore, StoreMeta}; use anyhow::Result; +use moveos_config::store_config::RocksdbConfig; use moveos_types::h256::H256; use moveos_types::test_utils::random_state_change_set; use raw_store::metrics::DBMetrics; +use raw_store::rocks::RocksDB; +use raw_store::{StoreInstance, CF_METRICS_REPORT_PERIOD_MILLIS}; use smt::NodeReader; +use std::time::Duration; #[tokio::test] -async fn test_reopen() { +async fn test_reopen() -> Result<()> { let temp_dir = moveos_config::temp_dir(); let registry_service = metrics::RegistryService::default(); DBMetrics::init(®istry_service.default_registry()); @@ -17,14 +21,29 @@ async fn test_reopen() { let key = H256::random(); let node = b"testnode".to_vec(); { - let moveos_store = - MoveOSStore::new(temp_dir.path(), ®istry_service.default_registry()).unwrap(); + let db_metrics = DBMetrics::get_or_init(®istry_service.default_registry()).clone(); + let mut store_instance = StoreInstance::new_db_instance( + RocksDB::new( + temp_dir.path(), + StoreMeta::get_column_family_names().to_vec(), + RocksdbConfig::default(), + )?, + db_metrics, + ); + let moveos_store = MoveOSStore::new_with_instance( + store_instance.clone(), + ®istry_service.default_registry(), + ) + .unwrap(); let node_store = moveos_store.get_state_node_store(); node_store .put(key, node.clone()) .map_err(|e| anyhow::anyhow!("test_state_store test_reopen error: {:?}", e)) .ok(); 
        assert_eq!(node_store.get(&key).unwrap(), Some(node.clone()));
+        let _ = store_instance.cancel_metrics_task();
+        // Wait for rocksdb cancel metrics task to avoid db lock
+        tokio::time::sleep(Duration::from_millis(CF_METRICS_REPORT_PERIOD_MILLIS)).await;
     }
     {
         // To avoid AlreadyReg when re-opening the same db
@@ -33,6 +52,7 @@
         let moveos_store =
             MoveOSStore::new(temp_dir.path(), &registry_service.default_registry()).unwrap();
         let node_store = moveos_store.get_state_node_store();
         assert_eq!(node_store.get(&key).unwrap(), Some(node));
     }
+    Ok(())
 }

 #[tokio::test]

diff --git a/moveos/raw-store/Cargo.toml b/moveos/raw-store/Cargo.toml
index 23f55e747a..e2ddc58c56 100644
--- a/moveos/raw-store/Cargo.toml
+++ b/moveos/raw-store/Cargo.toml
@@ -17,7 +17,9 @@ rust-version = { workspace = true }
 anyhow = { workspace = true }
 serde = { workspace = true }
 tap = { workspace = true }
+tokio = { workspace = true }
 rocksdb = { workspace = true, features = ["lz4"] }
+parking_lot = { workspace = true }
 prometheus = { workspace = true }
 once_cell = { workspace = true }

diff --git a/moveos/raw-store/src/lib.rs b/moveos/raw-store/src/lib.rs
index c8e13b791f..d47f319b0e 100644
--- a/moveos/raw-store/src/lib.rs
+++ b/moveos/raw-store/src/lib.rs
@@ -16,6 +16,7 @@ use crate::rocks::{RocksDB, SchemaIterator};
 use crate::traits::{DBStore, KVStore};
 use anyhow::{bail, format_err, Result};
 use moveos_common::utils::{from_bytes, to_bytes};
+use parking_lot::Mutex;
 use rocksdb::{properties, AsColumnFamilyRef};
 use serde::de::DeserializeOwned;
 use serde::Serialize;
@@ -24,11 +25,13 @@ use std::ffi::CStr;
 use std::fmt::Debug;
 use std::marker::PhantomData;
 use std::sync::Arc;
+use std::time::Duration;
+use tokio::sync::oneshot;

 /// Type alias to improve readability.
 pub type ColumnFamilyName = &'static str;

-pub const CF_METRICS_REPORT_PERIOD_MILLIS: u64 = 1000;
+pub const CF_METRICS_REPORT_PERIOD_MILLIS: u64 = 5000;
 pub const METRICS_ERROR: i64 = -1;

 // TODO: remove this after Rust rocksdb has the TOTAL_BLOB_FILES_SIZE property built-in.
@@ -43,7 +46,9 @@ pub enum StoreInstance {
     DB {
         db: Arc<RocksDB>,
         db_metrics: Arc<DBMetrics>,
-        // metrics_task_cancel_handle: Arc<oneshot::Sender<()>>,
+        // Send consumes self, but we only have a &self of the containing struct,
+        // so we put the sender in an Option inside a Mutex and take it when cancelling
+        metrics_task_cancel_handle: Arc<Mutex<Option<oneshot::Sender<()>>>>,
     },
 }

 unsafe impl Send for StoreInstance {}

 impl StoreInstance {
     pub fn new_db_instance(db: RocksDB, db_metrics: Arc<DBMetrics>) -> Self {
-        // let db_metrics = DBMetrics::get().clone();
         let db_arc = Arc::new(db);
-        // let db_metrics = Arc::new(DBMetrics::new(registry));
-        // let db_clone = db_arc.clone();
-        // let db_metrics_clone = db_metrics.clone();
-        // let (sender, mut recv) = tokio::sync::oneshot::channel();
+        let db_clone = db_arc.clone();
+        let db_metrics_clone = db_metrics.clone();
+        let (sender, mut cancel_receiver) = tokio::sync::oneshot::channel();

-        // TODO We need to find a more elegant implementation to avoid
-        // introducing tokio 1.x runtime dependency in the raw store layer,
+        // Introducing tokio 1.x runtime dependency in the raw store layer,
         // which would cause upper-level unit test cases and framework tests to depend on tokio.
-
-        // tokio::spawn(async move {
-        //     let mut interval =
-        //         tokio::time::interval(Duration::from_millis(CF_METRICS_REPORT_PERIOD_MILLIS));
-        //     loop {
-        //         tokio::select!
{ - // _ = interval.tick() => { - // let cfs = db_clone.cfs.clone(); - // for cf_name in cfs { - // let db_clone_clone = db_clone.clone(); - // let db_metrics_clone_clone = db_metrics_clone.clone(); - // if let Err(e) = tokio::task::spawn_blocking(move || { - // Self::report_rocksdb_metrics(&db_clone_clone, cf_name, &db_metrics_clone_clone); - // }).await { - // error!("Failed to report cf metrics with error: {}", e); - // } - // // Self::report_rocksdb_metrics(&db_clone_clone, cf_name, &db_metrics_clone); - // } - // } - // _ = &mut recv => break, - // } - // } - // debug!("Returning to report cf metrics task for StoreInstance"); - // }); + tokio::spawn(async move { + let mut interval = + tokio::time::interval(Duration::from_millis(CF_METRICS_REPORT_PERIOD_MILLIS)); + loop { + tokio::select! { + _ = interval.tick() => { + let cfs = db_clone.cfs.clone(); + for cf_name in cfs { + let db_clone_clone = db_clone.clone(); + let db_metrics_clone_clone = db_metrics_clone.clone(); + if let Err(e) = tokio::task::spawn_blocking(move || { + Self::report_rocksdb_metrics(&db_clone_clone, cf_name, &db_metrics_clone_clone); + }).await { + tracing::error!("Failed to report cf metrics with error: {}", e); + } + } + } + _ = &mut cancel_receiver => { + tracing::info!("Metrics task cancelled for store instance"); + break; + } + } + } + tracing::debug!("Returning to report cf metrics task for store instance"); + }); Self::DB { db: db_arc, db_metrics, - // metrics_task_cancel_handle: Arc::new(sender), + metrics_task_cancel_handle: Arc::new(Mutex::new(Some(sender))), } } - // pub fn cancel_metrics_task(&mut self) -> Result<()> { - // match self { - // StoreInstance::DB { - // db: _, - // db_metrics: _, - // metrics_task_cancel_handle, - // } => { - // // metrics_task_cancel_handle.send() - // // Send a cancellation signal - // // metrics_task_cancel_handle - // let handle = Arc::get_mut(metrics_task_cancel_handle).unwrap(); - // handle.send()?; - // } - // }; - // Ok(()) - // } + pub fn cancel_metrics_task(&mut self) -> Result<()> { + match self { + StoreInstance::DB { + db: _, + db_metrics: _, + metrics_task_cancel_handle, + } => { + // Send a cancellation signal + let mut handle = metrics_task_cancel_handle.lock(); + if let Some(sender) = handle.take() { + let _r = sender.send(()); + } + } + }; + Ok(()) + } pub fn db(&self) -> Option<&RocksDB> { match self { StoreInstance::DB { db, db_metrics: _, - // metrics_task_cancel_handle: _, + metrics_task_cancel_handle: _, } => Some(db.as_ref()), } } @@ -125,7 +128,7 @@ impl StoreInstance { StoreInstance::DB { db: _, db_metrics, - // metrics_task_cancel_handle: _, + metrics_task_cancel_handle: _, } => Some(db_metrics.as_ref()), } } @@ -135,7 +138,7 @@ impl StoreInstance { StoreInstance::DB { db, db_metrics: _, - // metrics_task_cancel_handle: _, + metrics_task_cancel_handle: _, } => Arc::get_mut(db), } } @@ -145,7 +148,7 @@ impl StoreInstance { StoreInstance::DB { db: _, db_metrics, - // metrics_task_cancel_handle: _, + metrics_task_cancel_handle: _, } => Arc::get_mut(db_metrics), } } @@ -320,7 +323,7 @@ impl DBStore for StoreInstance { StoreInstance::DB { db, db_metrics, - // metrics_task_cancel_handle: _, + metrics_task_cancel_handle: _, } => { let _timer = db_metrics .raw_store_metrics @@ -343,7 +346,7 @@ impl DBStore for StoreInstance { StoreInstance::DB { db, db_metrics, - // metrics_task_cancel_handle: _, + metrics_task_cancel_handle: _, } => { let _timer = db_metrics .raw_store_metrics @@ -367,7 +370,7 @@ impl DBStore for StoreInstance { StoreInstance::DB { db, 
db_metrics, - // metrics_task_cancel_handle: _, + metrics_task_cancel_handle: _, } => { let _timer = db_metrics .raw_store_metrics @@ -385,7 +388,7 @@ impl DBStore for StoreInstance { StoreInstance::DB { db, db_metrics, - // metrics_task_cancel_handle: _, + metrics_task_cancel_handle: _, } => { let _timer = db_metrics .raw_store_metrics @@ -408,7 +411,7 @@ impl DBStore for StoreInstance { StoreInstance::DB { db, db_metrics, - // metrics_task_cancel_handle: _, + metrics_task_cancel_handle: _, } => { let _timer = db_metrics .raw_store_metrics @@ -440,7 +443,7 @@ impl DBStore for StoreInstance { StoreInstance::DB { db, db_metrics, - // metrics_task_cancel_handle: _, + metrics_task_cancel_handle: _, } => { let _timer = db_metrics .raw_store_metrics @@ -464,7 +467,7 @@ impl DBStore for StoreInstance { StoreInstance::DB { db, db_metrics, - // metrics_task_cancel_handle: _, + metrics_task_cancel_handle: _, } => { let _timer = db_metrics .raw_store_metrics @@ -488,7 +491,7 @@ impl DBStore for StoreInstance { StoreInstance::DB { db, db_metrics, - // metrics_task_cancel_handle: _, + metrics_task_cancel_handle: _, } => { let _timer = db_metrics .raw_store_metrics diff --git a/moveos/raw-store/src/metrics.rs b/moveos/raw-store/src/metrics.rs index 9ead52829e..0dcc6d6e70 100644 --- a/moveos/raw-store/src/metrics.rs +++ b/moveos/raw-store/src/metrics.rs @@ -10,15 +10,8 @@ use prometheus::{ register_histogram_vec_with_registry, register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, HistogramVec, IntCounterVec, IntGaugeVec, Registry, }; -use rocksdb::PerfContext; -use std::cell::RefCell; use std::sync::Arc; use tap::TapFallible; -use tracing::warn; - -thread_local! { - static PER_THREAD_ROCKS_PERF_CONTEXT: std::cell::RefCell = RefCell::new(PerfContext::default()); -} #[derive(Debug)] pub struct RocksDBMetrics { @@ -376,7 +369,7 @@ pub struct DBMetrics { pub rocksdb_metrics: RocksDBMetrics, } -static ONCE: OnceCell> = OnceCell::new(); +static DB_METRICS_ONCE: OnceCell> = OnceCell::new(); impl DBMetrics { pub fn new(registry: &Registry) -> Self { @@ -391,10 +384,10 @@ impl DBMetrics { // only ever initialize db metrics once with a registry whereas // in the code we might want to initialize it with different // registries. 
- let _ = ONCE + let _ = DB_METRICS_ONCE .set(Self::inner_init(registry)) - .tap_err(|_| warn!("DBMetrics registry overwritten")); - ONCE.get().unwrap() + .tap_err(|_| tracing::warn!("DBMetrics registry overwritten")); + DB_METRICS_ONCE.get().unwrap() } fn inner_init(registry: &Registry) -> Arc { @@ -416,10 +409,10 @@ impl DBMetrics { } pub fn get() -> Option<&'static Arc> { - ONCE.get() + DB_METRICS_ONCE.get() } pub fn get_or_init(registry: &Registry) -> &'static Arc { - ONCE.get_or_init(|| Self::inner_init(registry).clone()) + DB_METRICS_ONCE.get_or_init(|| Self::inner_init(registry).clone()) } } From 16493974a352221bedec0f83bf503e26285d10b6 Mon Sep 17 00:00:00 2001 From: baichuan3 Date: Tue, 13 Aug 2024 15:58:46 +0800 Subject: [PATCH 4/9] Run the rooch framework tests in the tokio runtime --- Cargo.lock | 1 + crates/rooch-framework-tests/Cargo.toml | 1 + .../rooch-framework-tests/src/tests/bitcoin_test.rs | 4 ++-- .../src/tests/bitcoin_validator_tests.rs | 4 ++-- .../rooch-framework-tests/src/tests/chain_id_test.rs | 4 ++-- crates/rooch-framework-tests/src/tests/empty_tests.rs | 4 ++-- .../rooch-framework-tests/src/tests/ethereum_test.rs | 4 ++-- .../src/tests/multisign_account_tests.rs | 4 ++-- .../src/tests/session_validator_tests.rs | 4 ++-- .../src/tests/view_function_gas.rs | 4 ++-- crates/rooch-framework-tests/tests/tests.rs | 11 ++++++++++- 11 files changed, 28 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bfdf22c4f0..2a36a93e45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9380,6 +9380,7 @@ dependencies = [ "rooch-types", "serde_json", "tempfile", + "tokio", "tracing", "tracing-subscriber", ] diff --git a/crates/rooch-framework-tests/Cargo.toml b/crates/rooch-framework-tests/Cargo.toml index fa78ba0570..e4f6ef4cfd 100644 --- a/crates/rooch-framework-tests/Cargo.toml +++ b/crates/rooch-framework-tests/Cargo.toml @@ -26,6 +26,7 @@ tempfile = { workspace = true } include_dir = { workspace = true } musig2 = { workspace = true } miniscript = { workspace = true } +tokio = { workspace = true } move-core-types = { workspace = true } diff --git a/crates/rooch-framework-tests/src/tests/bitcoin_test.rs b/crates/rooch-framework-tests/src/tests/bitcoin_test.rs index 3951f13b93..c00bdaaa5a 100644 --- a/crates/rooch-framework-tests/src/tests/bitcoin_test.rs +++ b/crates/rooch-framework-tests/src/tests/bitcoin_test.rs @@ -22,8 +22,8 @@ use std::collections::HashMap; use std::path::PathBuf; use tracing::{debug, info}; -#[test] -fn test_submit_block() { +#[tokio::test] +async fn test_submit_block() { let _ = tracing_subscriber::fmt::try_init(); let mut binding_test = binding_test::RustBindingTest::new().unwrap(); diff --git a/crates/rooch-framework-tests/src/tests/bitcoin_validator_tests.rs b/crates/rooch-framework-tests/src/tests/bitcoin_validator_tests.rs index 3edeef8cbe..83d37d20c3 100644 --- a/crates/rooch-framework-tests/src/tests/bitcoin_validator_tests.rs +++ b/crates/rooch-framework-tests/src/tests/bitcoin_validator_tests.rs @@ -11,8 +11,8 @@ use rooch_types::transaction::rooch::RoochTransactionData; use crate::binding_test; -#[test] -fn test_validate() { +#[tokio::test] +async fn test_validate() { let binding_test = binding_test::RustBindingTest::new().unwrap(); let root = binding_test.root().clone(); diff --git a/crates/rooch-framework-tests/src/tests/chain_id_test.rs b/crates/rooch-framework-tests/src/tests/chain_id_test.rs index d1a883c1c7..9759560b78 100644 --- a/crates/rooch-framework-tests/src/tests/chain_id_test.rs +++ 
b/crates/rooch-framework-tests/src/tests/chain_id_test.rs @@ -5,8 +5,8 @@ use crate::binding_test; use moveos_types::{module_binding::MoveFunctionCaller, state_resolver::StateResolver}; use rooch_types::{framework::chain_id::ChainID, rooch_network::BuiltinChainID}; -#[test] -fn test_chain_id() { +#[tokio::test] +async fn test_chain_id() { let _ = tracing_subscriber::fmt::try_init(); let binding_test = binding_test::RustBindingTest::new().unwrap(); let resolver = binding_test.resolver(); diff --git a/crates/rooch-framework-tests/src/tests/empty_tests.rs b/crates/rooch-framework-tests/src/tests/empty_tests.rs index bdb1350462..f7c2cf20da 100644 --- a/crates/rooch-framework-tests/src/tests/empty_tests.rs +++ b/crates/rooch-framework-tests/src/tests/empty_tests.rs @@ -5,8 +5,8 @@ use crate::binding_test; use moveos_types::module_binding::MoveFunctionCaller; use moveos_types::moveos_std::tx_context::TxContext; -#[test] -fn test_empty() { +#[tokio::test] +async fn test_empty() { let binding_test = binding_test::RustBindingTest::new().unwrap(); let empty = binding_test.as_module_binding::(); let ctx = TxContext::random_for_testing_only(); diff --git a/crates/rooch-framework-tests/src/tests/ethereum_test.rs b/crates/rooch-framework-tests/src/tests/ethereum_test.rs index 77f397c6e1..64c455c700 100644 --- a/crates/rooch-framework-tests/src/tests/ethereum_test.rs +++ b/crates/rooch-framework-tests/src/tests/ethereum_test.rs @@ -10,8 +10,8 @@ use rooch_key::keystore::memory_keystore::InMemKeystore; use rooch_types::framework::ethereum::BlockHeader; use rooch_types::transaction::rooch::RoochTransactionData; -#[test] -fn test_submit_block() { +#[tokio::test] +async fn test_submit_block() { let _ = tracing_subscriber::fmt::try_init(); let mut binding_test = binding_test::RustBindingTest::new().unwrap(); diff --git a/crates/rooch-framework-tests/src/tests/multisign_account_tests.rs b/crates/rooch-framework-tests/src/tests/multisign_account_tests.rs index 9bc1108219..ea975d4ce0 100644 --- a/crates/rooch-framework-tests/src/tests/multisign_account_tests.rs +++ b/crates/rooch-framework-tests/src/tests/multisign_account_tests.rs @@ -7,8 +7,8 @@ use rooch_key::keystore::account_keystore::AccountKeystore; use rooch_key::keystore::memory_keystore::InMemKeystore; use rooch_types::nursery::multisign_account::{self, MultisignAccountModule}; -#[test] -fn test_multisign_account() { +#[tokio::test] +async fn test_multisign_account() { let _ = tracing_subscriber::fmt::try_init(); let binding_test = binding_test::RustBindingTest::new().unwrap(); diff --git a/crates/rooch-framework-tests/src/tests/session_validator_tests.rs b/crates/rooch-framework-tests/src/tests/session_validator_tests.rs index b09363c71b..c255ffad18 100644 --- a/crates/rooch-framework-tests/src/tests/session_validator_tests.rs +++ b/crates/rooch-framework-tests/src/tests/session_validator_tests.rs @@ -20,8 +20,8 @@ use rooch_types::{addresses::ROOCH_FRAMEWORK_ADDRESS, framework::empty::Empty}; use rooch_types::{framework::session_key::SessionScope, transaction::rooch::RoochTransactionData}; use std::str::FromStr; -#[test] -fn test_session_key_rooch() { +#[tokio::test] +async fn test_session_key_rooch() { let _ = tracing_subscriber::fmt::try_init(); let mut binding_test = binding_test::RustBindingTest::new().unwrap(); diff --git a/crates/rooch-framework-tests/src/tests/view_function_gas.rs b/crates/rooch-framework-tests/src/tests/view_function_gas.rs index 357cb8d1e3..9efddbcd42 100644 --- a/crates/rooch-framework-tests/src/tests/view_function_gas.rs 
+++ b/crates/rooch-framework-tests/src/tests/view_function_gas.rs @@ -12,8 +12,8 @@ use rooch_types::framework::empty::Empty; use crate::binding_test; -#[test] -fn view_function_gas() { +#[tokio::test] +async fn view_function_gas() { let empty_call = FunctionCall::new( Empty::function_id(Empty::EMPTY_FUNCTION_NAME), vec![], diff --git a/crates/rooch-framework-tests/tests/tests.rs b/crates/rooch-framework-tests/tests/tests.rs index 82497e554f..befeb107ed 100644 --- a/crates/rooch-framework-tests/tests/tests.rs +++ b/crates/rooch-framework-tests/tests/tests.rs @@ -2,5 +2,14 @@ // SPDX-License-Identifier: Apache-2.0 use rooch_integration_test_runner::run_test; +use std::path::Path; +use tokio::runtime::Runtime; -datatest_stable::harness!(run_test, "tests", r".*\.(mvir|move)$"); +// Create a wrapper function that sets up the Tokio runtime +fn async_run_test(path: &Path) -> Result<(), Box> { + let runtime = + Runtime::new().expect("Failed to create Tokio runtime when execute async run test "); + runtime.block_on(async { run_test(path) }) +} + +datatest_stable::harness!(async_run_test, "tests", r".*\.(mvir|move)$"); From f0e2bebc6d263ce3da9a8df1b35498afd204b507 Mon Sep 17 00:00:00 2001 From: baichuan3 Date: Tue, 13 Aug 2024 16:11:41 +0800 Subject: [PATCH 5/9] Run the rooch framework tests in the tokio runtime --- Cargo.lock | 1 + crates/rooch-integration-test-runner/Cargo.toml | 1 + crates/rooch-integration-test-runner/tests/tests.rs | 8 ++++++++ 3 files changed, 10 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 2a36a93e45..8118d20f5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9472,6 +9472,7 @@ dependencies = [ "rooch-framework", "rooch-genesis", "rooch-types", + "tokio", "tracing", "tracing-subscriber", "walkdir", diff --git a/crates/rooch-integration-test-runner/Cargo.toml b/crates/rooch-integration-test-runner/Cargo.toml index 7f732d38db..05a40fd275 100644 --- a/crates/rooch-integration-test-runner/Cargo.toml +++ b/crates/rooch-integration-test-runner/Cargo.toml @@ -26,6 +26,7 @@ once_cell = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } prometheus = { workspace = true } +tokio = { workspace = true } move-binary-format = { workspace = true } move-core-types = { workspace = true } diff --git a/crates/rooch-integration-test-runner/tests/tests.rs b/crates/rooch-integration-test-runner/tests/tests.rs index 82497e554f..dbbd76b5ff 100644 --- a/crates/rooch-integration-test-runner/tests/tests.rs +++ b/crates/rooch-integration-test-runner/tests/tests.rs @@ -2,5 +2,13 @@ // SPDX-License-Identifier: Apache-2.0 use rooch_integration_test_runner::run_test; +use std::path::Path; +use tokio::runtime::Runtime; + +pub fn async_run_test(path: &Path) -> Result<(), Box> { + let runtime = + Runtime::new().expect("Failed to create Tokio runtime when execute async run test "); + runtime.block_on(async { run_test(path) }) +} datatest_stable::harness!(run_test, "tests", r".*\.(mvir|move)$"); From 906579f88d93cd4a16cb54eec94d83ee8b7c09b1 Mon Sep 17 00:00:00 2001 From: baichuan3 Date: Tue, 13 Aug 2024 16:31:53 +0800 Subject: [PATCH 6/9] Run the rooch integration tests in the tokio runtime --- crates/rooch-framework-tests/src/tests/bitcoin_test.rs | 4 ++-- crates/rooch-integration-test-runner/tests/tests.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/rooch-framework-tests/src/tests/bitcoin_test.rs b/crates/rooch-framework-tests/src/tests/bitcoin_test.rs index c00bdaaa5a..d71ce12cf5 100644 --- 
a/crates/rooch-framework-tests/src/tests/bitcoin_test.rs +++ b/crates/rooch-framework-tests/src/tests/bitcoin_test.rs @@ -183,8 +183,8 @@ fn check_utxo(txs: Vec, binding_test: &binding_test::RustBindingTes //this test takes too long time in debug mod run it in release mod, use command: //RUST_LOG=debug cargo test --release --package rooch-framework-tests --lib -- --include-ignored tests::bitcoin_test::test_real_bocks -#[test] -fn test_real_bocks() { +#[tokio::test] +async fn test_real_bocks() { let _ = tracing_subscriber::fmt::try_init(); if cfg!(debug_assertions) { info!("test_real_bocks is ignored in debug mode, please run it in release mode"); diff --git a/crates/rooch-integration-test-runner/tests/tests.rs b/crates/rooch-integration-test-runner/tests/tests.rs index dbbd76b5ff..2b646763d5 100644 --- a/crates/rooch-integration-test-runner/tests/tests.rs +++ b/crates/rooch-integration-test-runner/tests/tests.rs @@ -11,4 +11,4 @@ pub fn async_run_test(path: &Path) -> Result<(), Box Date: Tue, 13 Aug 2024 17:02:42 +0800 Subject: [PATCH 7/9] enable rocksdb metrics --- moveos/raw-store/src/lib.rs | 90 +++++++++------------------------ moveos/raw-store/src/metrics.rs | 69 ++++--------------------- 2 files changed, 32 insertions(+), 127 deletions(-) diff --git a/moveos/raw-store/src/lib.rs b/moveos/raw-store/src/lib.rs index d47f319b0e..51e17f0afe 100644 --- a/moveos/raw-store/src/lib.rs +++ b/moveos/raw-store/src/lib.rs @@ -74,7 +74,7 @@ impl StoreInstance { let db_clone_clone = db_clone.clone(); let db_metrics_clone_clone = db_metrics_clone.clone(); if let Err(e) = tokio::task::spawn_blocking(move || { - Self::report_rocksdb_metrics(&db_clone_clone, cf_name, &db_metrics_clone_clone); + let _ = Self::report_rocksdb_metrics(&db_clone_clone, cf_name, &db_metrics_clone_clone); }).await { tracing::error!("Failed to report cf metrics with error: {}", e); } @@ -153,8 +153,11 @@ impl StoreInstance { } } - #[allow(dead_code)] - fn report_rocksdb_metrics(rocksdb: &Arc, cf_name: &str, db_metrics: &Arc) { + fn report_rocksdb_metrics( + rocksdb: &Arc, + cf_name: &str, + db_metrics: &Arc, + ) -> Result<()> { let cf = rocksdb.get_cf_handle(cf_name); db_metrics .rocksdb_metrics @@ -180,38 +183,6 @@ impl StoreInstance { Self::get_int_property(rocksdb, &cf, properties::SIZE_ALL_MEM_TABLES) .unwrap_or(METRICS_ERROR), ); - db_metrics - .rocksdb_metrics - .rocksdb_num_snapshots - .with_label_values(&[cf_name]) - .set( - Self::get_int_property(rocksdb, &cf, properties::NUM_SNAPSHOTS) - .unwrap_or(METRICS_ERROR), - ); - db_metrics - .rocksdb_metrics - .rocksdb_oldest_snapshot_time - .with_label_values(&[cf_name]) - .set( - Self::get_int_property(rocksdb, &cf, properties::OLDEST_SNAPSHOT_TIME) - .unwrap_or(METRICS_ERROR), - ); - db_metrics - .rocksdb_metrics - .rocksdb_actual_delayed_write_rate - .with_label_values(&[cf_name]) - .set( - Self::get_int_property(rocksdb, &cf, properties::ACTUAL_DELAYED_WRITE_RATE) - .unwrap_or(METRICS_ERROR), - ); - db_metrics - .rocksdb_metrics - .rocksdb_is_write_stopped - .with_label_values(&[cf_name]) - .set( - Self::get_int_property(rocksdb, &cf, properties::IS_WRITE_STOPPED) - .unwrap_or(METRICS_ERROR), - ); db_metrics .rocksdb_metrics .rocksdb_block_cache_capacity @@ -228,30 +199,22 @@ impl StoreInstance { Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_USAGE) .unwrap_or(METRICS_ERROR), ); - db_metrics - .rocksdb_metrics - .rocksdb_block_cache_pinned_usage - .with_label_values(&[cf_name]) - .set( - Self::get_int_property(rocksdb, &cf, 
properties::BLOCK_CACHE_PINNED_USAGE) - .unwrap_or(METRICS_ERROR), - ); - db_metrics - .rocksdb_metrics - .rocskdb_estimate_table_readers_mem - .with_label_values(&[cf_name]) - .set( - Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_TABLE_READERS_MEM) - .unwrap_or(METRICS_ERROR), - ); - db_metrics - .rocksdb_metrics - .rocksdb_estimated_num_keys - .with_label_values(&[cf_name]) - .set( - Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_NUM_KEYS) - .unwrap_or(METRICS_ERROR), - ); + // db_metrics + // .rocksdb_metrics + // .rocksdb_block_cache_hit + // .with_label_values(&[cf_name]) + // .set( + // Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_HIT_COUNT) + // .unwrap_or(METRICS_ERROR), + // ); + // db_metrics + // .rocksdb_metrics + // .rocksdb_block_cache_miss + // .with_label_values(&[cf_name]) + // .set( + // Self::get_int_property(rocksdb, &cf, properties::BLOCK_CACHE_MISS_COUNT) + // .unwrap_or(METRICS_ERROR), + // ); db_metrics .rocksdb_metrics .rocksdb_mem_table_flush_pending @@ -284,14 +247,6 @@ impl StoreInstance { Self::get_int_property(rocksdb, &cf, properties::NUM_RUNNING_FLUSHES) .unwrap_or(METRICS_ERROR), ); - db_metrics - .rocksdb_metrics - .rocksdb_estimate_oldest_key_time - .with_label_values(&[cf_name]) - .set( - Self::get_int_property(rocksdb, &cf, properties::ESTIMATE_OLDEST_KEY_TIME) - .unwrap_or(METRICS_ERROR), - ); db_metrics .rocksdb_metrics .rocskdb_background_errors @@ -300,6 +255,7 @@ impl StoreInstance { Self::get_int_property(rocksdb, &cf, properties::BACKGROUND_ERRORS) .unwrap_or(METRICS_ERROR), ); + Ok(()) } #[allow(dead_code)] diff --git a/moveos/raw-store/src/metrics.rs b/moveos/raw-store/src/metrics.rs index 0dcc6d6e70..2b3d5d376c 100644 --- a/moveos/raw-store/src/metrics.rs +++ b/moveos/raw-store/src/metrics.rs @@ -18,21 +18,15 @@ pub struct RocksDBMetrics { pub rocksdb_total_sst_files_size: IntGaugeVec, pub rocksdb_total_blob_files_size: IntGaugeVec, pub rocksdb_size_all_mem_tables: IntGaugeVec, - pub rocksdb_num_snapshots: IntGaugeVec, - pub rocksdb_oldest_snapshot_time: IntGaugeVec, - pub rocksdb_actual_delayed_write_rate: IntGaugeVec, - pub rocksdb_is_write_stopped: IntGaugeVec, pub rocksdb_block_cache_capacity: IntGaugeVec, pub rocksdb_block_cache_usage: IntGaugeVec, - pub rocksdb_block_cache_pinned_usage: IntGaugeVec, - pub rocskdb_estimate_table_readers_mem: IntGaugeVec, + pub rocksdb_block_cache_hit: IntGaugeVec, + pub rocksdb_block_cache_miss: IntGaugeVec, pub rocksdb_mem_table_flush_pending: IntGaugeVec, pub rocskdb_compaction_pending: IntGaugeVec, pub rocskdb_num_running_compactions: IntGaugeVec, pub rocksdb_num_running_flushes: IntGaugeVec, - pub rocksdb_estimate_oldest_key_time: IntGaugeVec, pub rocskdb_background_errors: IntGaugeVec, - pub rocksdb_estimated_num_keys: IntGaugeVec, } impl RocksDBMetrics { @@ -59,34 +53,6 @@ impl RocksDBMetrics { registry, ) .unwrap(), - rocksdb_num_snapshots: register_int_gauge_vec_with_registry!( - "rocksdb_num_snapshots", - "Number of snapshots held for the column family", - &["cf_name"], - registry, - ) - .unwrap(), - rocksdb_oldest_snapshot_time: register_int_gauge_vec_with_registry!( - "rocksdb_oldest_snapshot_time", - "Unit timestamp of the oldest unreleased snapshot", - &["cf_name"], - registry, - ) - .unwrap(), - rocksdb_actual_delayed_write_rate: register_int_gauge_vec_with_registry!( - "rocksdb_actual_delayed_write_rate", - "The current actual delayed write rate. 
0 means no delay", - &["cf_name"], - registry, - ) - .unwrap(), - rocksdb_is_write_stopped: register_int_gauge_vec_with_registry!( - "rocksdb_is_write_stopped", - "A flag indicating whether writes are stopped on this column family. 1 indicates writes have been stopped.", - &["cf_name"], - registry, - ) - .unwrap(), rocksdb_block_cache_capacity: register_int_gauge_vec_with_registry!( "rocksdb_block_cache_capacity", "The block cache capacity of the column family.", @@ -101,18 +67,17 @@ impl RocksDBMetrics { registry, ) .unwrap(), - rocksdb_block_cache_pinned_usage: register_int_gauge_vec_with_registry!( - "rocksdb_block_cache_pinned_usage", - "The memory size used by the column family in the block cache where entries are pinned", + + rocksdb_block_cache_hit: register_int_gauge_vec_with_registry!( + "rocksdb_block_cache_hit", + "The cache hit counts by the column family in the block cache.", &["cf_name"], registry, ) .unwrap(), - rocskdb_estimate_table_readers_mem: register_int_gauge_vec_with_registry!( - "rocskdb_estimate_table_readers_mem", - "The estimated memory size used for reading SST tables in this column - family such as filters and index blocks. Note that this number does not - include the memory used in block cache.", + rocksdb_block_cache_miss: register_int_gauge_vec_with_registry!( + "rocksdb_block_cache_miss", + "The cache miss counts by the column family in the block cache.", &["cf_name"], registry, ) @@ -152,21 +117,6 @@ impl RocksDBMetrics { registry, ) .unwrap(), - rocksdb_estimate_oldest_key_time: register_int_gauge_vec_with_registry!( - "rocksdb_estimate_oldest_key_time", - "Estimation of the oldest key timestamp in the DB. Only available - for FIFO compaction with compaction_options_fifo.allow_compaction = false.", - &["cf_name"], - registry, - ) - .unwrap(), - rocksdb_estimated_num_keys: register_int_gauge_vec_with_registry!( - "rocksdb_estimated_num_keys", - "The estimated number of keys in the table", - &["cf_name"], - registry, - ) - .unwrap(), rocskdb_background_errors: register_int_gauge_vec_with_registry!( "rocskdb_background_errors", "The accumulated number of RocksDB background errors.", @@ -174,7 +124,6 @@ impl RocksDBMetrics { registry, ) .unwrap(), - } } } From 5cf4569e3fa47f3f5009f5cd0c9514af5df8ccf0 Mon Sep 17 00:00:00 2001 From: baichuan3 Date: Tue, 13 Aug 2024 17:13:55 +0800 Subject: [PATCH 8/9] cleanup code --- crates/rooch-sequencer/tests/test_sequencer.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/rooch-sequencer/tests/test_sequencer.rs b/crates/rooch-sequencer/tests/test_sequencer.rs index c4caabd53a..1be2936c2f 100644 --- a/crates/rooch-sequencer/tests/test_sequencer.rs +++ b/crates/rooch-sequencer/tests/test_sequencer.rs @@ -29,7 +29,6 @@ fn init_rooch_db_with_instance( instance: StoreInstance, registry: &Registry, ) -> Result { - // DBMetrics::init(registry); let rooch_db = RoochDB::init_with_instance(opt.store_config(), instance, registry)?; let network = opt.network(); let _genesis = RoochGenesis::load_or_init(network, &rooch_db)?; From e9a2b6145eeb3b014197d0e61f7ca3646e08724e Mon Sep 17 00:00:00 2001 From: baichuan3 Date: Tue, 13 Aug 2024 21:44:44 +0800 Subject: [PATCH 9/9] run move unit tests in tokio runtime --- crates/rooch/src/commands/move_cli/commands/unit_test.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/crates/rooch/src/commands/move_cli/commands/unit_test.rs b/crates/rooch/src/commands/move_cli/commands/unit_test.rs index cac3348e95..b4109e9779 100644 --- 
a/crates/rooch/src/commands/move_cli/commands/unit_test.rs
+++ b/crates/rooch/src/commands/move_cli/commands/unit_test.rs
@@ -30,6 +30,7 @@ use serde_json::Value;
 use std::rc::Rc;
 use std::{collections::BTreeMap, path::PathBuf};
 use termcolor::Buffer;
+use tokio::runtime::Runtime;

 #[derive(Parser)]
 #[group(skip)]
@@ -122,8 +123,11 @@ impl CommandAction<Option<Value>> for TestCommand {
     }
 }

-static MOVEOSSTORE: Lazy<(MoveOSStore, DataDirPath)> =
-    Lazy::new(|| MoveOSStore::mock_moveos_store().unwrap());
+static MOVEOSSTORE: Lazy<(MoveOSStore, DataDirPath)> = Lazy::new(|| {
+    let runtime = Runtime::new()
+        .expect("Failed to create Tokio runtime when mock moveos store in move unit test");
+    runtime.block_on(async { MoveOSStore::mock_moveos_store().unwrap() })
+});

 static RESOLVER: Lazy<Box<RootObjectResolver<MoveOSStore>>> = Lazy::new(|| {
     Box::new(RootObjectResolver::new(