diff --git a/CHANGELOG.md b/CHANGELOG.md index 197f4381b7f..3d7afae994a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Changed - [2334](https://github.com/FuelLabs/fuel-core/pull/2334): Prepare the GraphQL service for the switching to `async` methods. - [2341](https://github.com/FuelLabs/fuel-core/pull/2341): Updated all pagination queries to work with the async stream instead of the sync iterator. +- [2340](https://github.com/FuelLabs/fuel-core/pull/2340): Avoid long heavy tasks in the GraphQL service by splitting work into batches. - [2350](https://github.com/FuelLabs/fuel-core/pull/2350): Limited the number of threads used by the GraphQL service. #### Breaking diff --git a/benches/benches/transaction_throughput.rs b/benches/benches/transaction_throughput.rs index 23454dda359..5d78818e8da 100644 --- a/benches/benches/transaction_throughput.rs +++ b/benches/benches/transaction_throughput.rs @@ -89,6 +89,7 @@ where test_builder.trigger = Trigger::Never; test_builder.utxo_validation = true; test_builder.gas_limit = Some(10_000_000_000); + test_builder.block_size_limit = Some(1_000_000_000_000); // spin up node let transactions: Vec = diff --git a/bin/fuel-core/src/cli/run.rs b/bin/fuel-core/src/cli/run.rs index 0b50c190bc2..7733e99b9a7 100644 --- a/bin/fuel-core/src/cli/run.rs +++ b/bin/fuel-core/src/cli/run.rs @@ -490,6 +490,7 @@ impl Command { graphql_config: GraphQLConfig { addr, number_of_threads: graphql.graphql_number_of_threads, + database_batch_size: graphql.database_batch_size, max_queries_depth: graphql.graphql_max_depth, max_queries_complexity: graphql.graphql_max_complexity, max_queries_recursive_depth: graphql.graphql_max_recursive_depth, diff --git a/bin/fuel-core/src/cli/run/graphql.rs b/bin/fuel-core/src/cli/run/graphql.rs index 5df36adce03..5816b2ecbc0 100644 --- a/bin/fuel-core/src/cli/run/graphql.rs +++ b/bin/fuel-core/src/cli/run/graphql.rs @@ -18,6 +18,10 @@ pub struct GraphQLArgs { #[clap(long = "graphql-number-of-threads", default_value = "2", env)] pub graphql_number_of_threads: usize, + /// The size of the batch fetched from the database by GraphQL service. + #[clap(long = "graphql-database-batch-size", default_value = "100", env)] + pub database_batch_size: usize, + /// The max depth of GraphQL queries. #[clap(long = "graphql-max-depth", default_value = "16", env)] pub graphql_max_depth: usize, diff --git a/crates/fuel-core/src/coins_query.rs b/crates/fuel-core/src/coins_query.rs index c878c071335..3d7abda01bc 100644 --- a/crates/fuel-core/src/coins_query.rs +++ b/crates/fuel-core/src/coins_query.rs @@ -280,10 +280,7 @@ mod tests { fuel_asm::Word, fuel_tx::*, }; - use futures::{ - StreamExt, - TryStreamExt, - }; + use futures::TryStreamExt; use itertools::Itertools; use rand::{ rngs::StdRng, @@ -988,7 +985,7 @@ mod tests { fn service_database(&self) -> ServiceDatabase { let on_chain = self.database.on_chain().clone(); let off_chain = self.database.off_chain().clone(); - ServiceDatabase::new(0u32.into(), on_chain, off_chain) + ServiceDatabase::new(100, 0u32.into(), on_chain, off_chain) } } @@ -1045,8 +1042,7 @@ mod tests { let query = self.service_database(); let query = query.test_view(); query - .owned_coins_ids(owner, None, IterDirection::Forward) - .map(|res| res.map(|id| query.coin(id).unwrap())) + .owned_coins(owner, None, IterDirection::Forward) .try_collect() .await .unwrap() @@ -1056,8 +1052,7 @@ mod tests { let query = self.service_database(); let query = query.test_view(); query - .owned_message_ids(owner, None, IterDirection::Forward) - .map(|res| res.map(|id| query.message(&id).unwrap())) + .owned_messages(owner, None, IterDirection::Forward) .try_collect() .await .unwrap() diff --git a/crates/fuel-core/src/database/block.rs b/crates/fuel-core/src/database/block.rs index c6295e62017..374a76a9663 100644 --- a/crates/fuel-core/src/database/block.rs +++ b/crates/fuel-core/src/database/block.rs @@ -67,7 +67,8 @@ impl OnChainIterableKeyValueView { let db_block = self.storage::().get(height)?; if let Some(block) = db_block { // fetch all the transactions - // TODO: optimize with multi-key get + // TODO: Use multiget when it's implemented. + // https://github.com/FuelLabs/fuel-core/issues/2344 let txs = block .transactions() .iter() diff --git a/crates/fuel-core/src/graphql_api.rs b/crates/fuel-core/src/graphql_api.rs index b7727a456b9..772bbc815ea 100644 --- a/crates/fuel-core/src/graphql_api.rs +++ b/crates/fuel-core/src/graphql_api.rs @@ -33,6 +33,7 @@ pub struct Config { pub struct ServiceConfig { pub addr: SocketAddr, pub number_of_threads: usize, + pub database_batch_size: usize, pub max_queries_depth: usize, pub max_queries_complexity: usize, pub max_queries_recursive_depth: usize, diff --git a/crates/fuel-core/src/graphql_api/api_service.rs b/crates/fuel-core/src/graphql_api/api_service.rs index 46395118be9..28a714f8b0e 100644 --- a/crates/fuel-core/src/graphql_api/api_service.rs +++ b/crates/fuel-core/src/graphql_api/api_service.rs @@ -236,8 +236,12 @@ where graphql_api::initialize_query_costs(config.config.costs.clone())?; let network_addr = config.config.addr; - let combined_read_database = - ReadDatabase::new(genesis_block_height, on_database, off_database); + let combined_read_database = ReadDatabase::new( + config.config.database_batch_size, + genesis_block_height, + on_database, + off_database, + ); let request_timeout = config.config.api_request_timeout; let concurrency_limit = config.config.max_concurrent_queries; let body_limit = config.config.request_body_bytes_limit; diff --git a/crates/fuel-core/src/graphql_api/database.rs b/crates/fuel-core/src/graphql_api/database.rs index 3f254032b0c..bf47c8d92a7 100644 --- a/crates/fuel-core/src/graphql_api/database.rs +++ b/crates/fuel-core/src/graphql_api/database.rs @@ -5,6 +5,7 @@ use crate::fuel_core_graphql_api::{ OnChainDatabase, }, }; +use fuel_core_services::yield_stream::StreamYieldExt; use fuel_core_storage::{ iter::{ BoxedIter, @@ -77,6 +78,8 @@ pub type OffChainView = Arc; /// The container of the on-chain and off-chain database view provides. /// It is used only by `ViewExtension` to create a [`ReadView`]. pub struct ReadDatabase { + /// The size of the batch during fetching from the database. + batch_size: usize, /// The height of the genesis block. genesis_height: BlockHeight, /// The on-chain database view provider. @@ -88,6 +91,7 @@ pub struct ReadDatabase { impl ReadDatabase { /// Creates a new [`ReadDatabase`] with the given on-chain and off-chain database view providers. pub fn new( + batch_size: usize, genesis_height: BlockHeight, on_chain: OnChain, off_chain: OffChain, @@ -99,6 +103,7 @@ impl ReadDatabase { OffChain::LatestView: OffChainDatabase, { Self { + batch_size, genesis_height, on_chain: Box::new(ArcWrapper::new(on_chain)), off_chain: Box::new(ArcWrapper::new(off_chain)), @@ -111,6 +116,7 @@ impl ReadDatabase { // It is not possible to implement until `view_at` is implemented for the `AtomicView`. // https://github.com/FuelLabs/fuel-core/issues/1582 Ok(ReadView { + batch_size: self.batch_size, genesis_height: self.genesis_height, on_chain: self.on_chain.latest_view()?, off_chain: self.off_chain.latest_view()?, @@ -125,6 +131,7 @@ impl ReadDatabase { #[derive(Clone)] pub struct ReadView { + pub(crate) batch_size: usize, pub(crate) genesis_height: BlockHeight, pub(crate) on_chain: OnChainView, pub(crate) off_chain: OffChainView, @@ -134,7 +141,7 @@ impl ReadView { pub fn transaction(&self, tx_id: &TxId) -> StorageResult { let result = self.on_chain.transaction(tx_id); if result.is_not_found() { - if let Some(tx) = self.old_transaction(tx_id)? { + if let Some(tx) = self.off_chain.old_transaction(tx_id)? { Ok(tx) } else { Err(not_found!(Transactions)) @@ -144,6 +151,21 @@ impl ReadView { } } + pub async fn transactions( + &self, + tx_ids: Vec, + ) -> Vec> { + // TODO: Use multiget when it's implemented. + // https://github.com/FuelLabs/fuel-core/issues/2344 + let result = tx_ids + .iter() + .map(|tx_id| self.transaction(tx_id)) + .collect::>(); + // Give a chance to other tasks to run. + tokio::task::yield_now().await; + result + } + pub fn block(&self, height: &BlockHeight) -> StorageResult { if *height >= self.genesis_height { self.on_chain.block(height) @@ -252,6 +274,7 @@ impl ReadView { direction: IterDirection, ) -> impl Stream> + '_ { futures::stream::iter(self.on_chain.all_messages(start_message_id, direction)) + .yield_each(self.batch_size) } pub fn message_exists(&self, nonce: &Nonce) -> StorageResult { @@ -276,6 +299,7 @@ impl ReadView { start_asset, direction, )) + .yield_each(self.batch_size) } pub fn da_height(&self) -> StorageResult { @@ -316,12 +340,12 @@ impl ReadView { futures::stream::iter(iter) } - pub fn owned_message_ids<'a>( - &'a self, - owner: &'a Address, + pub fn owned_message_ids( + &self, + owner: &Address, start_message_id: Option, direction: IterDirection, - ) -> impl Stream> + 'a { + ) -> impl Stream> + '_ { futures::stream::iter(self.off_chain.owned_message_ids( owner, start_message_id, @@ -345,29 +369,6 @@ impl ReadView { self.off_chain.contract_salt(contract_id) } - pub fn old_block(&self, height: &BlockHeight) -> StorageResult { - self.off_chain.old_block(height) - } - - pub fn old_blocks( - &self, - height: Option, - direction: IterDirection, - ) -> BoxedIter<'_, StorageResult> { - self.off_chain.old_blocks(height, direction) - } - - pub fn old_block_consensus(&self, height: &BlockHeight) -> StorageResult { - self.off_chain.old_block_consensus(height) - } - - pub fn old_transaction( - &self, - id: &TxId, - ) -> StorageResult> { - self.off_chain.old_transaction(id) - } - pub fn relayed_tx_status( &self, id: Bytes32, diff --git a/crates/fuel-core/src/query/balance.rs b/crates/fuel-core/src/query/balance.rs index 1a715b74522..161fd64b87e 100644 --- a/crates/fuel-core/src/query/balance.rs +++ b/crates/fuel-core/src/query/balance.rs @@ -4,6 +4,7 @@ use asset_query::{ AssetSpendTarget, AssetsQuery, }; +use fuel_core_services::yield_stream::StreamYieldExt; use fuel_core_storage::{ iter::IterDirection, Result as StorageResult, @@ -106,5 +107,6 @@ impl ReadView { }) .map_ok(|stream| stream.map(Ok)) .try_flatten() + .yield_each(self.batch_size) } } diff --git a/crates/fuel-core/src/query/balance/asset_query.rs b/crates/fuel-core/src/query/balance/asset_query.rs index 5482f6b5272..13a289ec1e4 100644 --- a/crates/fuel-core/src/query/balance/asset_query.rs +++ b/crates/fuel-core/src/query/balance/asset_query.rs @@ -15,7 +15,10 @@ use fuel_core_types::{ AssetId, }, }; -use futures::Stream; +use futures::{ + Stream, + TryStreamExt, +}; use std::collections::HashSet; use tokio_stream::StreamExt; @@ -78,7 +81,9 @@ impl<'a> AssetsQuery<'a> { fn coins_iter(mut self) -> impl Stream> + 'a { let allowed_assets = self.allowed_assets.take(); - self.database + let database = self.database; + let stream = self + .database .owned_coins_ids(self.owner, None, IterDirection::Forward) .map(|id| id.map(CoinId::from)) .filter(move |result| { @@ -99,12 +104,25 @@ impl<'a> AssetsQuery<'a> { } else { return Err(anyhow::anyhow!("The coin is not UTXO").into()); }; - // TODO: Fetch coin in a separate thread - let coin = self.database.coin(id)?; - - Ok(CoinType::Coin(coin)) + Ok(id) }) + }); + + futures::stream::StreamExt::chunks(stream, database.batch_size) + .map(|chunk| { + use itertools::Itertools; + + let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?; + Ok::<_, StorageError>(chunk) }) + .try_filter_map(move |chunk| async move { + let chunk = database + .coins(chunk) + .await + .map(|result| result.map(CoinType::Coin)); + Ok(Some(futures::stream::iter(chunk))) + }) + .try_flatten() .filter(move |result| { if let Ok(CoinType::Coin(coin)) = result { allowed_asset(&allowed_assets, &coin.asset_id) @@ -117,7 +135,8 @@ impl<'a> AssetsQuery<'a> { fn messages_iter(&self) -> impl Stream> + 'a { let exclude = self.exclude; let database = self.database; - self.database + let stream = self + .database .owned_message_ids(self.owner, None, IterDirection::Forward) .map(|id| id.map(CoinId::from)) .filter(move |result| { @@ -138,11 +157,22 @@ impl<'a> AssetsQuery<'a> { } else { return Err(anyhow::anyhow!("The coin is not a message").into()); }; - // TODO: Fetch message in a separate thread (https://github.com/FuelLabs/fuel-core/pull/2340) - let message = database.message(&id)?; - Ok(message) + Ok(id) }) + }); + + futures::stream::StreamExt::chunks(stream, database.batch_size) + .map(|chunk| { + use itertools::Itertools; + + let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?; + Ok(chunk) + }) + .try_filter_map(move |chunk| async move { + let chunk = database.messages(chunk).await; + Ok::<_, StorageError>(Some(futures::stream::iter(chunk))) }) + .try_flatten() .filter(|result| { if let Ok(message) = result { message.data().is_empty() diff --git a/crates/fuel-core/src/query/block.rs b/crates/fuel-core/src/query/block.rs index 3b725ab8b49..f4b461639f0 100644 --- a/crates/fuel-core/src/query/block.rs +++ b/crates/fuel-core/src/query/block.rs @@ -1,4 +1,5 @@ use crate::fuel_core_graphql_api::database::ReadView; +use fuel_core_services::yield_stream::StreamYieldExt; use fuel_core_storage::{ iter::IterDirection, Result as StorageResult, @@ -23,6 +24,6 @@ impl ReadView { height: Option, direction: IterDirection, ) -> impl Stream> + '_ { - futures::stream::iter(self.blocks(height, direction)) + futures::stream::iter(self.blocks(height, direction)).yield_each(self.batch_size) } } diff --git a/crates/fuel-core/src/query/coin.rs b/crates/fuel-core/src/query/coin.rs index 94a2fbafe54..c487bdba23c 100644 --- a/crates/fuel-core/src/query/coin.rs +++ b/crates/fuel-core/src/query/coin.rs @@ -3,6 +3,7 @@ use fuel_core_storage::{ iter::IterDirection, not_found, tables::Coins, + Error as StorageError, Result as StorageResult, StorageAsRef, }; @@ -14,6 +15,7 @@ use fuel_core_types::{ use futures::{ Stream, StreamExt, + TryStreamExt, }; impl ReadView { @@ -29,6 +31,18 @@ impl ReadView { Ok(coin.uncompress(utxo_id)) } + pub async fn coins( + &self, + utxo_ids: Vec, + ) -> impl Iterator> + '_ { + // TODO: Use multiget when it's implemented. + // https://github.com/FuelLabs/fuel-core/issues/2344 + let coins = utxo_ids.into_iter().map(|id| self.coin(id)); + // Give a chance to other tasks to run. + tokio::task::yield_now().await; + coins + } + pub fn owned_coins( &self, owner: &Address, @@ -36,11 +50,17 @@ impl ReadView { direction: IterDirection, ) -> impl Stream> + '_ { self.owned_coins_ids(owner, start_coin, direction) - .map(|res| { - res.and_then(|id| { - // TODO: Move fetching of the coin to a separate thread (https://github.com/FuelLabs/fuel-core/pull/2340) - self.coin(id) - }) + .chunks(self.batch_size) + .map(|chunk| { + use itertools::Itertools; + + let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?; + Ok::<_, StorageError>(chunk) + }) + .try_filter_map(move |chunk| async move { + let chunk = self.coins(chunk).await; + Ok(Some(futures::stream::iter(chunk))) }) + .try_flatten() } } diff --git a/crates/fuel-core/src/query/message.rs b/crates/fuel-core/src/query/message.rs index 56c83431ee5..c98b2358b2c 100644 --- a/crates/fuel-core/src/query/message.rs +++ b/crates/fuel-core/src/query/message.rs @@ -25,7 +25,6 @@ use fuel_core_types::{ fuel_tx::{ input::message::compute_message_id, Receipt, - Transaction, TxId, }, fuel_types::{ @@ -40,6 +39,7 @@ use fuel_core_types::{ use futures::{ Stream, StreamExt, + TryStreamExt, }; use itertools::Itertools; use std::borrow::Cow; @@ -81,6 +81,18 @@ impl ReadView { .map(Cow::into_owned) } + pub async fn messages( + &self, + ids: Vec, + ) -> impl Iterator> + '_ { + // TODO: Use multiget when it's implemented. + // https://github.com/FuelLabs/fuel-core/issues/2344 + let messages = ids.into_iter().map(|id| self.message(&id)); + // Give a chance to other tasks to run. + tokio::task::yield_now().await; + messages + } + pub fn owned_messages<'a>( &'a self, owner: &'a Address, @@ -88,12 +100,16 @@ impl ReadView { direction: IterDirection, ) -> impl Stream> + 'a { self.owned_message_ids(owner, start_message_id, direction) - .map(|result| { - result.and_then(|id| { - // TODO: Move `message` fetching to a separate thread (https://github.com/FuelLabs/fuel-core/pull/2340) - self.message(&id) - }) + .chunks(self.batch_size) + .map(|chunk| { + let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?; + Ok(chunk) + }) + .try_filter_map(move |chunk| async move { + let chunk = self.messages(chunk).await; + Ok::<_, StorageError>(Some(futures::stream::iter(chunk))) }) + .try_flatten() } } @@ -102,9 +118,6 @@ pub trait MessageProofData { /// Get the block. fn block(&self, id: &BlockHeight) -> StorageResult; - /// Get the transaction. - fn transaction(&self, transaction_id: &TxId) -> StorageResult; - /// Return all receipts in the given transaction. fn receipts(&self, transaction_id: &TxId) -> StorageResult>; @@ -128,10 +141,6 @@ impl MessageProofData for ReadView { self.block(id) } - fn transaction(&self, transaction_id: &TxId) -> StorageResult { - self.transaction(transaction_id) - } - fn receipts(&self, transaction_id: &TxId) -> StorageResult> { self.receipts(transaction_id) } @@ -140,7 +149,7 @@ impl MessageProofData for ReadView { &self, transaction_id: &TxId, ) -> StorageResult { - self.status(transaction_id) + self.tx_status(transaction_id) } fn block_history_proof( diff --git a/crates/fuel-core/src/query/message/test.rs b/crates/fuel-core/src/query/message/test.rs index d49f173370b..43d6ecbef1a 100644 --- a/crates/fuel-core/src/query/message/test.rs +++ b/crates/fuel-core/src/query/message/test.rs @@ -10,8 +10,6 @@ use fuel_core_types::{ fuel_tx::{ AssetId, ContractId, - Script, - Transaction, }, fuel_types::BlockHeight, tai64::Tai64, @@ -64,7 +62,6 @@ mockall::mock! { message_block_height: &BlockHeight, commit_block_height: &BlockHeight, ) -> StorageResult; - fn transaction(&self, transaction_id: &TxId) -> StorageResult; fn receipts(&self, transaction_id: &TxId) -> StorageResult>; fn transaction_status(&self, transaction_id: &TxId) -> StorageResult; } @@ -107,16 +104,6 @@ async fn can_build_message_proof() { } }); - data.expect_transaction().returning(move |txn_id| { - let tx = TXNS - .iter() - .find(|t| *t == txn_id) - .map(|_| Script::default().into()) - .ok_or(not_found!("Transaction in `TXNS`"))?; - - Ok(tx) - }); - let commit_block_header = PartialBlockHeader { application: ApplicationHeader { da_height: 0u64.into(), diff --git a/crates/fuel-core/src/query/tx.rs b/crates/fuel-core/src/query/tx.rs index 6898e9155a4..0bceeef8809 100644 --- a/crates/fuel-core/src/query/tx.rs +++ b/crates/fuel-core/src/query/tx.rs @@ -3,6 +3,7 @@ use fuel_core_storage::{ iter::IterDirection, not_found, tables::Transactions, + Error as StorageError, Result as StorageResult, }; use fuel_core_types::{ @@ -18,11 +19,12 @@ use fuel_core_types::{ use futures::{ Stream, StreamExt, + TryStreamExt, }; impl ReadView { pub fn receipts(&self, tx_id: &TxId) -> StorageResult> { - let status = self.status(tx_id)?; + let status = self.tx_status(tx_id)?; let receipts = match status { TransactionStatus::Success { receipts, .. } @@ -32,10 +34,6 @@ impl ReadView { receipts.ok_or(not_found!(Transactions)) } - pub fn status(&self, tx_id: &TxId) -> StorageResult { - self.tx_status(tx_id) - } - pub fn owned_transactions( &self, owner: Address, @@ -43,13 +41,22 @@ impl ReadView { direction: IterDirection, ) -> impl Stream> + '_ { self.owned_transactions_ids(owner, start, direction) - .map(|result| { - result.and_then(|(tx_pointer, tx_id)| { - // TODO: Fetch transactions in a separate thread (https://github.com/FuelLabs/fuel-core/pull/2340) - let tx = self.transaction(&tx_id)?; + .chunks(self.batch_size) + .map(|chunk| { + use itertools::Itertools; - Ok((tx_pointer, tx)) - }) + let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?; + Ok::<_, StorageError>(chunk) + }) + .try_filter_map(move |chunk| async move { + let tx_ids = chunk.iter().map(|(_, tx_id)| *tx_id).collect::>(); + let txs = self.transactions(tx_ids).await; + let txs = txs + .into_iter() + .zip(chunk) + .map(|(result, (tx_pointer, _))| result.map(|tx| (tx_pointer, tx))); + Ok(Some(futures::stream::iter(txs))) }) + .try_flatten() } } diff --git a/crates/fuel-core/src/schema/block.rs b/crates/fuel-core/src/schema/block.rs index 335eb66ed92..7cdfeff2d5b 100644 --- a/crates/fuel-core/src/schema/block.rs +++ b/crates/fuel-core/src/schema/block.rs @@ -44,12 +44,14 @@ use fuel_core_types::{ block::CompressedBlock, header::BlockHeader, }, + fuel_tx::TxId, fuel_types, fuel_types::BlockHeight, }; use futures::{ Stream, StreamExt, + TryStreamExt, }; pub struct Block(pub(crate) CompressedBlock); @@ -135,14 +137,27 @@ impl Block { ctx: &Context<'_>, ) -> async_graphql::Result> { let query = ctx.read_view()?; - self.0 - .transactions() - .iter() - .map(|tx_id| { - let tx = query.transaction(tx_id)?; - Ok(Transaction::from_tx(*tx_id, tx)) + let tx_ids = futures::stream::iter(self.0.transactions().iter().copied()); + + let result = tx_ids + .chunks(query.batch_size) + .filter_map(move |tx_ids: Vec| { + let async_query = query.as_ref().clone(); + async move { + let txs = async_query.transactions(tx_ids.clone()).await; + let txs = txs + .into_iter() + .zip(tx_ids.into_iter()) + .map(|(r, tx_id)| r.map(|tx| Transaction::from_tx(tx_id, tx))); + + Some(futures::stream::iter(txs)) + } }) - .collect() + .flatten() + .try_collect() + .await?; + + Ok(result) } } diff --git a/crates/fuel-core/src/schema/tx.rs b/crates/fuel-core/src/schema/tx.rs index 9ea589771d7..8ee21edf972 100644 --- a/crates/fuel-core/src/schema/tx.rs +++ b/crates/fuel-core/src/schema/tx.rs @@ -71,7 +71,6 @@ use std::{ borrow::Cow, iter, }; -use tokio_stream::StreamExt; use types::{ DryRunTransactionExecutionStatus, Transaction, @@ -123,7 +122,9 @@ impl TxQuery { ) -> async_graphql::Result< Connection, > { + use futures::stream::StreamExt; let query = ctx.read_view()?; + let query_ref = query.as_ref(); crate::schema::query_pagination( after, before, @@ -156,16 +157,34 @@ impl TxQuery { false }; - async move { Ok(skip) } + async move { Ok::<_, StorageError>(skip) } }) - .map(|result: StorageResult| { - result.and_then(|sorted| { - // TODO: Request transactions in a separate thread - let tx = query.transaction(&sorted.tx_id.0)?; + .chunks(query_ref.batch_size) + .map(|chunk| { + use itertools::Itertools; - Ok((sorted, Transaction::from_tx(sorted.tx_id.0, tx))) - }) - }); + let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?; + Ok::<_, StorageError>(chunk) + }) + .try_filter_map(move |chunk| { + let async_query = query_ref.clone(); + async move { + let tx_ids = chunk + .iter() + .map(|sorted| sorted.tx_id.0) + .collect::>(); + let txs = async_query.transactions(tx_ids).await; + let txs = txs.into_iter().zip(chunk.into_iter()).map( + |(result, sorted)| { + result.map(|tx| { + (sorted, Transaction::from_tx(sorted.tx_id.0, tx)) + }) + }, + ); + Ok(Some(futures::stream::iter(txs))) + } + }) + .try_flatten(); Ok(all_txs) }, @@ -188,6 +207,7 @@ impl TxQuery { before: Option, ) -> async_graphql::Result> { + use futures::stream::StreamExt; let query = ctx.read_view()?; let params = ctx .data_unchecked::() @@ -382,6 +402,7 @@ impl TxStatusSubscription { ) -> async_graphql::Result< impl Stream> + 'a, > { + use tokio_stream::StreamExt; let subscription = submit_and_await_status(ctx, tx).await?; Ok(subscription @@ -410,6 +431,7 @@ async fn submit_and_await_status<'a>( ) -> async_graphql::Result< impl Stream> + 'a, > { + use tokio_stream::StreamExt; let txpool = ctx.data_unchecked::(); let params = ctx .data_unchecked::() diff --git a/crates/fuel-core/src/schema/tx/types.rs b/crates/fuel-core/src/schema/tx/types.rs index d237bb817ae..effbc463d0c 100644 --- a/crates/fuel-core/src/schema/tx/types.rs +++ b/crates/fuel-core/src/schema/tx/types.rs @@ -987,7 +987,7 @@ pub(crate) async fn get_tx_status( txpool: &TxPool, ) -> Result, StorageError> { match query - .status(&id) + .tx_status(&id) .into_api_result::()? { Some(status) => { diff --git a/crates/fuel-core/src/service/config.rs b/crates/fuel-core/src/service/config.rs index f3f3d1ca614..1e242c75e03 100644 --- a/crates/fuel-core/src/service/config.rs +++ b/crates/fuel-core/src/service/config.rs @@ -139,6 +139,7 @@ impl Config { 0, ), number_of_threads: 0, + database_batch_size: 100, max_queries_depth: 16, max_queries_complexity: 80000, max_queries_recursive_depth: 16, diff --git a/crates/services/Cargo.toml b/crates/services/Cargo.toml index 345bcd64287..34d0726cdca 100644 --- a/crates/services/Cargo.toml +++ b/crates/services/Cargo.toml @@ -15,6 +15,7 @@ async-trait = { workspace = true } fuel-core-metrics = { workspace = true } futures = { workspace = true } parking_lot = { workspace = true } +pin-project-lite = { workspace = true } rayon = { workspace = true, optional = true } tokio = { workspace = true, features = ["full"] } tracing = { workspace = true } diff --git a/crates/services/src/lib.rs b/crates/services/src/lib.rs index 10da0bc75f7..e7fa438631f 100644 --- a/crates/services/src/lib.rs +++ b/crates/services/src/lib.rs @@ -11,6 +11,7 @@ mod state; mod sync; #[cfg(feature = "sync-processor")] mod sync_processor; +pub mod yield_stream; /// Re-exports for streaming utilities pub mod stream { diff --git a/crates/services/src/yield_stream.rs b/crates/services/src/yield_stream.rs new file mode 100644 index 00000000000..60331dbf119 --- /dev/null +++ b/crates/services/src/yield_stream.rs @@ -0,0 +1,164 @@ +//! Stream that yields each `batch_size` items allowing other tasks to work. + +use futures::{ + ready, + stream::Fuse, + Stream, + StreamExt, +}; +use std::{ + pin::Pin, + task::{ + Context, + Poll, + }, +}; + +pin_project_lite::pin_project! { + /// Stream that yields each `batch_size` items. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct YieldStream { + #[pin] + stream: Fuse, + item: Option, + counter: usize, + batch_size: usize, + } +} + +impl YieldStream { + /// Create a new `YieldStream` with the given `batch_size`. + pub fn new(stream: St, batch_size: usize) -> Self { + assert!(batch_size > 0); + + Self { + stream: stream.fuse(), + item: None, + counter: 0, + batch_size, + } + } +} + +impl Stream for YieldStream { + type Item = St::Item; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let mut this = self.as_mut().project(); + + // If we have a cached item, return it because that means we were woken up. + if let Some(item) = this.item.take() { + *this.counter = 1; + return Poll::Ready(Some(item)); + } + + match ready!(this.stream.as_mut().poll_next(cx)) { + // Return items, unless we reached the batch size. + // after that, we want to yield before returning the next item. + Some(item) => { + if this.counter < this.batch_size { + *this.counter = this.counter.saturating_add(1); + + Poll::Ready(Some(item)) + } else { + *this.item = Some(item); + + cx.waker().wake_by_ref(); + + Poll::Pending + } + } + + // Underlying stream ran out of values, so finish this stream as well. + None => Poll::Ready(None), + } + } + + fn size_hint(&self) -> (usize, Option) { + let cached_len = usize::from(self.item.is_some()); + let (lower, upper) = self.stream.size_hint(); + let lower = lower.saturating_add(cached_len); + let upper = match upper { + Some(x) => x.checked_add(cached_len), + None => None, + }; + (lower, upper) + } +} + +/// Extension trait for `Stream`. +pub trait StreamYieldExt: Stream { + /// Yields each `batch_size` items allowing other tasks to work. + fn yield_each(self, batch_size: usize) -> YieldStream + where + Self: Sized, + { + YieldStream::new(self, batch_size) + } +} + +impl StreamYieldExt for St where St: Stream {} + +#[cfg(test)] +#[allow(non_snake_case)] +mod tests { + use super::*; + + #[tokio::test] + async fn yield_stream__works_with_10_elements_loop() { + let stream = futures::stream::iter(0..10); + let mut yield_stream = YieldStream::new(stream, 3); + + let mut items = Vec::new(); + while let Some(item) = yield_stream.next().await { + items.push(item); + } + + assert_eq!(items, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + } + + #[tokio::test] + async fn yield_stream__works_with_10_elements__collect() { + let stream = futures::stream::iter(0..10); + let yield_stream = stream.yield_each(3); + + let items = yield_stream.collect::>().await; + + assert_eq!(items, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + } + + #[tokio::test] + async fn yield_stream__passed_control_to_another_future() { + let stream = futures::stream::iter(0..10); + let mut yield_stream = YieldStream::new(stream, 3); + + async fn second_future() -> i32 { + -1 + } + + let mut items = Vec::new(); + loop { + tokio::select! { + biased; + + item = yield_stream.next() => { + if let Some(item) = item { + items.push(item); + } else { + break; + } + } + + item = second_future() => { + items.push(item); + } + } + } + + assert_eq!(items, vec![0, 1, 2, -1, 3, 4, 5, -1, 6, 7, 8, -1, 9]); + } +} diff --git a/tests/test-helpers/src/builder.rs b/tests/test-helpers/src/builder.rs index 91134ceb7c3..0d16bc48dc6 100644 --- a/tests/test-helpers/src/builder.rs +++ b/tests/test-helpers/src/builder.rs @@ -93,6 +93,7 @@ pub struct TestSetupBuilder { pub initial_coins: Vec, pub starting_gas_price: u64, pub gas_limit: Option, + pub block_size_limit: Option, pub starting_block: Option, pub utxo_validation: bool, pub privileged_address: Address, @@ -201,6 +202,13 @@ impl TestSetupBuilder { .set_block_gas_limit(gas_limit); } + if let Some(block_size_limit) = self.block_size_limit { + chain_conf + .consensus_parameters + .set_block_transaction_size_limit(block_size_limit) + .expect("Should set new block size limit"); + } + chain_conf .consensus_parameters .set_privileged_address(self.privileged_address); @@ -251,6 +259,7 @@ impl Default for TestSetupBuilder { initial_coins: vec![], starting_gas_price: 0, gas_limit: None, + block_size_limit: None, starting_block: None, utxo_validation: true, privileged_address: Default::default(), diff --git a/tests/tests/dos.rs b/tests/tests/dos.rs index ca4295024fc..05c0e0fc030 100644 --- a/tests/tests/dos.rs +++ b/tests/tests/dos.rs @@ -1,6 +1,9 @@ #![allow(non_snake_case)] -use std::time::Instant; +use std::time::{ + Duration, + Instant, +}; use fuel_core::service::{ Config, @@ -681,3 +684,40 @@ async fn schema_is_retrievable() { let result = send_graph_ql_query(&url, query).await; assert!(result.contains("__schema"), "{:?}", result); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 8)] +async fn heavy_tasks_doesnt_block_graphql() { + let mut config = Config::local_node(); + + const NUM_OF_BLOCKS: u32 = 4000; + config.graphql_config.max_queries_complexity = 10_000_000; + + let query = FULL_BLOCK_QUERY.to_string(); + let query = query.replace("$NUMBER_OF_BLOCKS", NUM_OF_BLOCKS.to_string().as_str()); + + let node = FuelService::new_node(config).await.unwrap(); + let url = format!("http://{}/v1/graphql", node.bound_address); + let client = FuelClient::new(url.clone()).unwrap(); + client.produce_blocks(NUM_OF_BLOCKS, None).await.unwrap(); + + // Given + for _ in 0..50 { + let url = url.clone(); + let query = query.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(20)).await; + let result = send_graph_ql_query(&url, &query).await; + assert!(result.contains("transactions")); + }); + } + // Wait for all queries to start be processed on the node. + tokio::time::sleep(Duration::from_secs(1)).await; + + // When + let result = tokio::time::timeout(Duration::from_secs(5), client.health()).await; + + // Then + let result = result.expect("Health check timed out"); + let health = result.expect("Health check failed"); + assert!(health); +}