Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Indexer] Optmize indexer query with timeout and Support Rocksdb metrics #2425

Merged
merged 10 commits into from
Aug 13, 2024
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 28 additions & 14 deletions crates/rooch-db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,34 @@ pub struct RoochDB {

impl RoochDB {
pub fn init(config: &StoreConfig, registry: &Registry) -> Result<Self> {
let (store_dir, indexer_dir) = (config.get_store_dir(), config.get_indexer_dir());
let instance = Self::generate_store_instance(config, registry)?;
Self::init_with_instance(config, instance, registry)
}

pub fn init_with_instance(
config: &StoreConfig,
instance: StoreInstance,
registry: &Registry,
) -> Result<Self> {
let indexer_dir = config.get_indexer_dir();
let moveos_store = MoveOSStore::new_with_instance(instance.clone(), registry)?;
let rooch_store = RoochStore::new_with_instance(instance, registry)?;
let indexer_store = IndexerStore::new(indexer_dir.clone(), registry)?;
let indexer_reader = IndexerReader::new(indexer_dir, registry)?;

Ok(Self {
moveos_store,
rooch_store,
indexer_store,
indexer_reader,
})
}

pub fn generate_store_instance(
config: &StoreConfig,
registry: &Registry,
) -> Result<StoreInstance> {
let store_dir = config.get_store_dir();
let mut column_families = moveos_store::StoreMeta::get_column_family_names().to_vec();
column_families.append(&mut rooch_store::StoreMeta::get_column_family_names().to_vec());
//ensure no duplicate column families
Expand All @@ -43,19 +69,7 @@ impl RoochDB {
db_metrics,
);

let moveos_store = MoveOSStore::new_with_instance(instance.clone(), registry)?;

let rooch_store = RoochStore::new_with_instance(instance, registry)?;

let indexer_store = IndexerStore::new(indexer_dir.clone(), registry)?;
let indexer_reader = IndexerReader::new(indexer_dir, registry)?;

Ok(Self {
moveos_store,
rooch_store,
indexer_store,
indexer_reader,
})
Ok(instance)
}

pub fn init_with_mock_metrics_for_test(config: &StoreConfig) -> Result<Self> {
Expand Down
1 change: 1 addition & 0 deletions crates/rooch-framework-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ tempfile = { workspace = true }
include_dir = { workspace = true }
musig2 = { workspace = true }
miniscript = { workspace = true }
tokio = { workspace = true }

move-core-types = { workspace = true }

Expand Down
8 changes: 4 additions & 4 deletions crates/rooch-framework-tests/src/tests/bitcoin_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use std::collections::HashMap;
use std::path::PathBuf;
use tracing::{debug, info};

#[test]
fn test_submit_block() {
#[tokio::test]
async fn test_submit_block() {
let _ = tracing_subscriber::fmt::try_init();
let mut binding_test = binding_test::RustBindingTest::new().unwrap();

Expand Down Expand Up @@ -183,8 +183,8 @@ fn check_utxo(txs: Vec<Transaction>, binding_test: &binding_test::RustBindingTes

//this test takes too long time in debug mod run it in release mod, use command:
//RUST_LOG=debug cargo test --release --package rooch-framework-tests --lib -- --include-ignored tests::bitcoin_test::test_real_bocks
#[test]
fn test_real_bocks() {
#[tokio::test]
async fn test_real_bocks() {
let _ = tracing_subscriber::fmt::try_init();
if cfg!(debug_assertions) {
info!("test_real_bocks is ignored in debug mode, please run it in release mode");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use rooch_types::transaction::rooch::RoochTransactionData;

use crate::binding_test;

#[test]
fn test_validate() {
#[tokio::test]
async fn test_validate() {
let binding_test = binding_test::RustBindingTest::new().unwrap();
let root = binding_test.root().clone();

Expand Down
4 changes: 2 additions & 2 deletions crates/rooch-framework-tests/src/tests/chain_id_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use crate::binding_test;
use moveos_types::{module_binding::MoveFunctionCaller, state_resolver::StateResolver};
use rooch_types::{framework::chain_id::ChainID, rooch_network::BuiltinChainID};

#[test]
fn test_chain_id() {
#[tokio::test]
async fn test_chain_id() {
let _ = tracing_subscriber::fmt::try_init();
let binding_test = binding_test::RustBindingTest::new().unwrap();
let resolver = binding_test.resolver();
Expand Down
4 changes: 2 additions & 2 deletions crates/rooch-framework-tests/src/tests/empty_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use crate::binding_test;
use moveos_types::module_binding::MoveFunctionCaller;
use moveos_types::moveos_std::tx_context::TxContext;

#[test]
fn test_empty() {
#[tokio::test]
async fn test_empty() {
let binding_test = binding_test::RustBindingTest::new().unwrap();
let empty = binding_test.as_module_binding::<rooch_types::framework::empty::Empty>();
let ctx = TxContext::random_for_testing_only();
Expand Down
4 changes: 2 additions & 2 deletions crates/rooch-framework-tests/src/tests/ethereum_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use rooch_key::keystore::memory_keystore::InMemKeystore;
use rooch_types::framework::ethereum::BlockHeader;
use rooch_types::transaction::rooch::RoochTransactionData;

#[test]
fn test_submit_block() {
#[tokio::test]
async fn test_submit_block() {
let _ = tracing_subscriber::fmt::try_init();
let mut binding_test = binding_test::RustBindingTest::new().unwrap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use rooch_key::keystore::account_keystore::AccountKeystore;
use rooch_key::keystore::memory_keystore::InMemKeystore;
use rooch_types::nursery::multisign_account::{self, MultisignAccountModule};

#[test]
fn test_multisign_account() {
#[tokio::test]
async fn test_multisign_account() {
let _ = tracing_subscriber::fmt::try_init();
let binding_test = binding_test::RustBindingTest::new().unwrap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use rooch_types::{addresses::ROOCH_FRAMEWORK_ADDRESS, framework::empty::Empty};
use rooch_types::{framework::session_key::SessionScope, transaction::rooch::RoochTransactionData};
use std::str::FromStr;

#[test]
fn test_session_key_rooch() {
#[tokio::test]
async fn test_session_key_rooch() {
let _ = tracing_subscriber::fmt::try_init();
let mut binding_test = binding_test::RustBindingTest::new().unwrap();

Expand Down
4 changes: 2 additions & 2 deletions crates/rooch-framework-tests/src/tests/view_function_gas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use rooch_types::framework::empty::Empty;

use crate::binding_test;

#[test]
fn view_function_gas() {
#[tokio::test]
async fn view_function_gas() {
let empty_call = FunctionCall::new(
Empty::function_id(Empty::EMPTY_FUNCTION_NAME),
vec![],
Expand Down
11 changes: 10 additions & 1 deletion crates/rooch-framework-tests/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,14 @@
// SPDX-License-Identifier: Apache-2.0

use rooch_integration_test_runner::run_test;
use std::path::Path;
use tokio::runtime::Runtime;

datatest_stable::harness!(run_test, "tests", r".*\.(mvir|move)$");
// Create a wrapper function that sets up the Tokio runtime
fn async_run_test(path: &Path) -> Result<(), Box<dyn std::error::Error + 'static>> {
let runtime =
Runtime::new().expect("Failed to create Tokio runtime when execute async run test ");
runtime.block_on(async { run_test(path) })
}

datatest_stable::harness!(async_run_test, "tests", r".*\.(mvir|move)$");
1 change: 1 addition & 0 deletions crates/rooch-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ log = { workspace = true }
tap = { workspace = true }
prometheus = { workspace = true }
function_name = { workspace = true }
futures = { workspace = true }

move-core-types = { workspace = true }

Expand Down
3 changes: 3 additions & 0 deletions crates/rooch-indexer/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ pub enum IndexerError {
#[error("Indexer failed to read SQLiteDB with error: `{0}`")]
SQLiteReadError(String),

#[error("Indexer async read SQLiteDB error: `{0}`")]
SQLiteAsyncReadError(String),

#[error("Indexer failed to reset SQLiteDB with error: `{0}`")]
SQLiteResetError(String),

Expand Down
52 changes: 45 additions & 7 deletions crates/rooch-indexer/src/indexer_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ use std::collections::HashMap;
use std::ops::DerefMut;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::runtime::Handle;
use tokio::time::timeout;

pub const DEFAULT_QUERY_TIMEOUT: u64 = 60; // second

pub const TX_ORDER_STR: &str = "tx_order";
pub const TX_HASH_STR: &str = "tx_hash";
Expand Down Expand Up @@ -99,6 +104,33 @@ impl InnerIndexerReader {
.transaction(query)
.map_err(|e| IndexerError::SQLiteReadError(e.to_string()))
}

pub fn run_query_with_timeout<T, E, F>(&self, query: F) -> Result<T, IndexerError>
where
F: FnOnce(&mut SqliteConnection) -> Result<T, E> + Send + 'static,
E: From<diesel::result::Error> + std::error::Error + Send,
T: Send + 'static,
{
// default query time out in second
let timeout_duration = Duration::from_secs(DEFAULT_QUERY_TIMEOUT);
let mut connection = self.get_connection()?;

tokio::task::block_in_place(|| {
Handle::current().block_on(async move {
timeout(
timeout_duration,
tokio::task::spawn_blocking(move || {
connection
.deref_mut()
.transaction(query)
.map_err(|e| IndexerError::SQLiteReadError(e.to_string()))
}),
)
.await
.map_err(|e| IndexerError::SQLiteAsyncReadError(e.to_string()))??
})
})
Comment on lines +118 to +132
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这一部分代码后面可以通过 ActorContext::spawn 放到一个单独的 actor 去执行,这样就不会 block indexer_reader actor 的线程。不过返回值如何获取需要设计一下。

}
}

#[derive(Clone)]
Expand Down Expand Up @@ -166,7 +198,7 @@ impl IndexerReader {
} else if descending_order {
let max_tx_order: i64 = self
.get_inner_indexer_reader(INDEXER_TRANSACTIONS_TABLE_NAME)?
.run_query(|conn| {
.run_query_with_timeout(|conn| {
transactions::dsl::transactions
.select(transactions::tx_order)
.order_by(transactions::tx_order.desc())
Expand Down Expand Up @@ -236,7 +268,9 @@ impl IndexerReader {
tracing::debug!("query transactions: {}", query);
let stored_transactions = self
.get_inner_indexer_reader(INDEXER_TRANSACTIONS_TABLE_NAME)?
.run_query(|conn| diesel::sql_query(query).load::<StoredTransaction>(conn))?;
.run_query_with_timeout(|conn| {
diesel::sql_query(query).load::<StoredTransaction>(conn)
})?;

let result = stored_transactions
.into_iter()
Expand Down Expand Up @@ -272,7 +306,7 @@ impl IndexerReader {
} else if descending_order {
let (max_tx_order, event_index): (i64, i64) = self
.get_inner_indexer_reader(INDEXER_EVENTS_TABLE_NAME)?
.run_query(|conn| {
.run_query_with_timeout(|conn| {
events::dsl::events
.select((events::tx_order, events::event_index))
.order_by((events::tx_order.desc(), events::event_index.desc()))
Expand Down Expand Up @@ -353,7 +387,7 @@ impl IndexerReader {
tracing::debug!("query events: {}", query);
let stored_events = self
.get_inner_indexer_reader(INDEXER_EVENTS_TABLE_NAME)?
.run_query(|conn| diesel::sql_query(query).load::<StoredEvent>(conn))?;
.run_query_with_timeout(|conn| diesel::sql_query(query).load::<StoredEvent>(conn))?;

let result = stored_events
.into_iter()
Expand Down Expand Up @@ -382,7 +416,7 @@ impl IndexerReader {
} else if descending_order {
let (max_tx_order, state_index): (i64, i64) = self
.get_inner_indexer_reader(INDEXER_OBJECT_STATES_TABLE_NAME)?
.run_query(|conn| {
.run_query_with_timeout(|conn| {
object_states::dsl::object_states
.select((object_states::tx_order, object_states::state_index))
.order_by((
Expand Down Expand Up @@ -458,7 +492,9 @@ impl IndexerReader {
tracing::debug!("query object states: {}", query);
let stored_object_states = self
.get_inner_indexer_reader(INDEXER_OBJECT_STATES_TABLE_NAME)?
.run_query(|conn| diesel::sql_query(query).load::<StoredObjectState>(conn))?;
.run_query_with_timeout(|conn| {
diesel::sql_query(query).load::<StoredObjectState>(conn)
})?;
Ok(stored_object_states)
}

Expand Down Expand Up @@ -525,7 +561,9 @@ impl IndexerReader {
tracing::debug!("query last state index by tx order: {}", query);
let stored_object_states = self
.get_inner_indexer_reader(INDEXER_OBJECT_STATES_TABLE_NAME)?
.run_query(|conn| diesel::sql_query(query).load::<StoredObjectState>(conn))?;
.run_query_with_timeout(|conn| {
diesel::sql_query(query).load::<StoredObjectState>(conn)
})?;
let last_state_index = if stored_object_states.is_empty() {
0
} else {
Expand Down
4 changes: 2 additions & 2 deletions crates/rooch-indexer/src/tests/test_concurrence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ mod tests {
use std::thread;
use std::time::Duration;

#[test]
fn test_sqlite_writer_sqlite_reader_concurrence() {
#[tokio::test]
async fn test_sqlite_writer_sqlite_reader_concurrence() {
let count = sqlite_writer_sqlite_reader_concurrence().unwrap();
assert_eq!(count, get_row_count());
}
Expand Down
Loading
Loading