Skip to content

Commit

Permalink
Avoid long heavy tasks in the GraphQL service (#2340)
Browse files Browse the repository at this point in the history
This PR adds chunking to the stream in GraphQL and requests data in
batches.

The current implementation is simple and executes batches in the same
runtime, but at the end of each batch fetch it yields, allowing other
tasks to be processed.

## Checklist
- [x] New behavior is reflected in tests

### Before requesting review
- [x] I have reviewed the code myself

---------

Co-authored-by: Mårten Blankfors <marten@blankfors.se>
  • Loading branch information
xgreenx and netrome authored Oct 14, 2024
1 parent 0a378bf commit 87c9579
Show file tree
Hide file tree
Showing 25 changed files with 430 additions and 112 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
### Changed
- [2334](https://github.com/FuelLabs/fuel-core/pull/2334): Prepare the GraphQL service for the switching to `async` methods.
- [2341](https://github.com/FuelLabs/fuel-core/pull/2341): Updated all pagination queries to work with the async stream instead of the sync iterator.
- [2340](https://github.com/FuelLabs/fuel-core/pull/2340): Avoid long heavy tasks in the GraphQL service by splitting work into batches.
- [2350](https://github.com/FuelLabs/fuel-core/pull/2350): Limited the number of threads used by the GraphQL service.

#### Breaking
Expand Down
1 change: 1 addition & 0 deletions benches/benches/transaction_throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ where
test_builder.trigger = Trigger::Never;
test_builder.utxo_validation = true;
test_builder.gas_limit = Some(10_000_000_000);
test_builder.block_size_limit = Some(1_000_000_000_000);

// spin up node
let transactions: Vec<Transaction> =
Expand Down
1 change: 1 addition & 0 deletions bin/fuel-core/src/cli/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ impl Command {
graphql_config: GraphQLConfig {
addr,
number_of_threads: graphql.graphql_number_of_threads,
database_batch_size: graphql.database_batch_size,
max_queries_depth: graphql.graphql_max_depth,
max_queries_complexity: graphql.graphql_max_complexity,
max_queries_recursive_depth: graphql.graphql_max_recursive_depth,
Expand Down
4 changes: 4 additions & 0 deletions bin/fuel-core/src/cli/run/graphql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ pub struct GraphQLArgs {
#[clap(long = "graphql-number-of-threads", default_value = "2", env)]
pub graphql_number_of_threads: usize,

/// The size of the batch fetched from the database by GraphQL service.
#[clap(long = "graphql-database-batch-size", default_value = "100", env)]
pub database_batch_size: usize,

/// The max depth of GraphQL queries.
#[clap(long = "graphql-max-depth", default_value = "16", env)]
pub graphql_max_depth: usize,
Expand Down
13 changes: 4 additions & 9 deletions crates/fuel-core/src/coins_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,7 @@ mod tests {
fuel_asm::Word,
fuel_tx::*,
};
use futures::{
StreamExt,
TryStreamExt,
};
use futures::TryStreamExt;
use itertools::Itertools;
use rand::{
rngs::StdRng,
Expand Down Expand Up @@ -988,7 +985,7 @@ mod tests {
fn service_database(&self) -> ServiceDatabase {
let on_chain = self.database.on_chain().clone();
let off_chain = self.database.off_chain().clone();
ServiceDatabase::new(0u32.into(), on_chain, off_chain)
ServiceDatabase::new(100, 0u32.into(), on_chain, off_chain)
}
}

Expand Down Expand Up @@ -1045,8 +1042,7 @@ mod tests {
let query = self.service_database();
let query = query.test_view();
query
.owned_coins_ids(owner, None, IterDirection::Forward)
.map(|res| res.map(|id| query.coin(id).unwrap()))
.owned_coins(owner, None, IterDirection::Forward)
.try_collect()
.await
.unwrap()
Expand All @@ -1056,8 +1052,7 @@ mod tests {
let query = self.service_database();
let query = query.test_view();
query
.owned_message_ids(owner, None, IterDirection::Forward)
.map(|res| res.map(|id| query.message(&id).unwrap()))
.owned_messages(owner, None, IterDirection::Forward)
.try_collect()
.await
.unwrap()
Expand Down
3 changes: 2 additions & 1 deletion crates/fuel-core/src/database/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ impl OnChainIterableKeyValueView {
let db_block = self.storage::<FuelBlocks>().get(height)?;
if let Some(block) = db_block {
// fetch all the transactions
// TODO: optimize with multi-key get
// TODO: Use multiget when it's implemented.
// https://github.com/FuelLabs/fuel-core/issues/2344
let txs = block
.transactions()
.iter()
Expand Down
1 change: 1 addition & 0 deletions crates/fuel-core/src/graphql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub struct Config {
pub struct ServiceConfig {
pub addr: SocketAddr,
pub number_of_threads: usize,
pub database_batch_size: usize,
pub max_queries_depth: usize,
pub max_queries_complexity: usize,
pub max_queries_recursive_depth: usize,
Expand Down
8 changes: 6 additions & 2 deletions crates/fuel-core/src/graphql_api/api_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,12 @@ where
graphql_api::initialize_query_costs(config.config.costs.clone())?;

let network_addr = config.config.addr;
let combined_read_database =
ReadDatabase::new(genesis_block_height, on_database, off_database);
let combined_read_database = ReadDatabase::new(
config.config.database_batch_size,
genesis_block_height,
on_database,
off_database,
);
let request_timeout = config.config.api_request_timeout;
let concurrency_limit = config.config.max_concurrent_queries;
let body_limit = config.config.request_body_bytes_limit;
Expand Down
57 changes: 29 additions & 28 deletions crates/fuel-core/src/graphql_api/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::fuel_core_graphql_api::{
OnChainDatabase,
},
};
use fuel_core_services::yield_stream::StreamYieldExt;
use fuel_core_storage::{
iter::{
BoxedIter,
Expand Down Expand Up @@ -77,6 +78,8 @@ pub type OffChainView = Arc<dyn OffChainDatabase>;
/// The container of the on-chain and off-chain database view provides.
/// It is used only by `ViewExtension` to create a [`ReadView`].
pub struct ReadDatabase {
/// The size of the batch during fetching from the database.
batch_size: usize,
/// The height of the genesis block.
genesis_height: BlockHeight,
/// The on-chain database view provider.
Expand All @@ -88,6 +91,7 @@ pub struct ReadDatabase {
impl ReadDatabase {
/// Creates a new [`ReadDatabase`] with the given on-chain and off-chain database view providers.
pub fn new<OnChain, OffChain>(
batch_size: usize,
genesis_height: BlockHeight,
on_chain: OnChain,
off_chain: OffChain,
Expand All @@ -99,6 +103,7 @@ impl ReadDatabase {
OffChain::LatestView: OffChainDatabase,
{
Self {
batch_size,
genesis_height,
on_chain: Box::new(ArcWrapper::new(on_chain)),
off_chain: Box::new(ArcWrapper::new(off_chain)),
Expand All @@ -111,6 +116,7 @@ impl ReadDatabase {
// It is not possible to implement until `view_at` is implemented for the `AtomicView`.
// https://github.com/FuelLabs/fuel-core/issues/1582
Ok(ReadView {
batch_size: self.batch_size,
genesis_height: self.genesis_height,
on_chain: self.on_chain.latest_view()?,
off_chain: self.off_chain.latest_view()?,
Expand All @@ -125,6 +131,7 @@ impl ReadDatabase {

#[derive(Clone)]
pub struct ReadView {
pub(crate) batch_size: usize,
pub(crate) genesis_height: BlockHeight,
pub(crate) on_chain: OnChainView,
pub(crate) off_chain: OffChainView,
Expand All @@ -134,7 +141,7 @@ impl ReadView {
pub fn transaction(&self, tx_id: &TxId) -> StorageResult<Transaction> {
let result = self.on_chain.transaction(tx_id);
if result.is_not_found() {
if let Some(tx) = self.old_transaction(tx_id)? {
if let Some(tx) = self.off_chain.old_transaction(tx_id)? {
Ok(tx)
} else {
Err(not_found!(Transactions))
Expand All @@ -144,6 +151,21 @@ impl ReadView {
}
}

/// Fetches the transactions for all of the given ids, then yields control
/// back to the runtime so other tasks get a chance to run.
///
/// Each id is resolved individually via `Self::transaction`; a lookup
/// failure for one id becomes an `Err` entry in the returned vector
/// instead of aborting the whole batch.
pub async fn transactions(
&self,
tx_ids: Vec<TxId>,
) -> Vec<StorageResult<Transaction>> {
// TODO: Use multiget when it's implemented.
// https://github.com/FuelLabs/fuel-core/issues/2344
let result = tx_ids
.iter()
.map(|tx_id| self.transaction(tx_id))
.collect::<Vec<_>>();
// Give a chance to other tasks to run.
tokio::task::yield_now().await;
result
}

pub fn block(&self, height: &BlockHeight) -> StorageResult<CompressedBlock> {
if *height >= self.genesis_height {
self.on_chain.block(height)
Expand Down Expand Up @@ -252,6 +274,7 @@ impl ReadView {
direction: IterDirection,
) -> impl Stream<Item = StorageResult<Message>> + '_ {
futures::stream::iter(self.on_chain.all_messages(start_message_id, direction))
.yield_each(self.batch_size)
}

pub fn message_exists(&self, nonce: &Nonce) -> StorageResult<bool> {
Expand All @@ -276,6 +299,7 @@ impl ReadView {
start_asset,
direction,
))
.yield_each(self.batch_size)
}

pub fn da_height(&self) -> StorageResult<DaBlockHeight> {
Expand Down Expand Up @@ -316,12 +340,12 @@ impl ReadView {
futures::stream::iter(iter)
}

pub fn owned_message_ids<'a>(
&'a self,
owner: &'a Address,
pub fn owned_message_ids(
&self,
owner: &Address,
start_message_id: Option<Nonce>,
direction: IterDirection,
) -> impl Stream<Item = StorageResult<Nonce>> + 'a {
) -> impl Stream<Item = StorageResult<Nonce>> + '_ {
futures::stream::iter(self.off_chain.owned_message_ids(
owner,
start_message_id,
Expand All @@ -345,29 +369,6 @@ impl ReadView {
self.off_chain.contract_salt(contract_id)
}

pub fn old_block(&self, height: &BlockHeight) -> StorageResult<CompressedBlock> {
self.off_chain.old_block(height)
}

pub fn old_blocks(
&self,
height: Option<BlockHeight>,
direction: IterDirection,
) -> BoxedIter<'_, StorageResult<CompressedBlock>> {
self.off_chain.old_blocks(height, direction)
}

pub fn old_block_consensus(&self, height: &BlockHeight) -> StorageResult<Consensus> {
self.off_chain.old_block_consensus(height)
}

pub fn old_transaction(
&self,
id: &TxId,
) -> StorageResult<Option<fuel_core_types::fuel_tx::Transaction>> {
self.off_chain.old_transaction(id)
}

pub fn relayed_tx_status(
&self,
id: Bytes32,
Expand Down
2 changes: 2 additions & 0 deletions crates/fuel-core/src/query/balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use asset_query::{
AssetSpendTarget,
AssetsQuery,
};
use fuel_core_services::yield_stream::StreamYieldExt;
use fuel_core_storage::{
iter::IterDirection,
Result as StorageResult,
Expand Down Expand Up @@ -106,5 +107,6 @@ impl ReadView {
})
.map_ok(|stream| stream.map(Ok))
.try_flatten()
.yield_each(self.batch_size)
}
}
50 changes: 40 additions & 10 deletions crates/fuel-core/src/query/balance/asset_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ use fuel_core_types::{
AssetId,
},
};
use futures::Stream;
use futures::{
Stream,
TryStreamExt,
};
use std::collections::HashSet;
use tokio_stream::StreamExt;

Expand Down Expand Up @@ -78,7 +81,9 @@ impl<'a> AssetsQuery<'a> {

fn coins_iter(mut self) -> impl Stream<Item = StorageResult<CoinType>> + 'a {
let allowed_assets = self.allowed_assets.take();
self.database
let database = self.database;
let stream = self
.database
.owned_coins_ids(self.owner, None, IterDirection::Forward)
.map(|id| id.map(CoinId::from))
.filter(move |result| {
Expand All @@ -99,12 +104,25 @@ impl<'a> AssetsQuery<'a> {
} else {
return Err(anyhow::anyhow!("The coin is not UTXO").into());
};
// TODO: Fetch coin in a separate thread
let coin = self.database.coin(id)?;

Ok(CoinType::Coin(coin))
Ok(id)
})
});

futures::stream::StreamExt::chunks(stream, database.batch_size)
.map(|chunk| {
use itertools::Itertools;

let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?;
Ok::<_, StorageError>(chunk)
})
.try_filter_map(move |chunk| async move {
let chunk = database
.coins(chunk)
.await
.map(|result| result.map(CoinType::Coin));
Ok(Some(futures::stream::iter(chunk)))
})
.try_flatten()
.filter(move |result| {
if let Ok(CoinType::Coin(coin)) = result {
allowed_asset(&allowed_assets, &coin.asset_id)
Expand All @@ -117,7 +135,8 @@ impl<'a> AssetsQuery<'a> {
fn messages_iter(&self) -> impl Stream<Item = StorageResult<CoinType>> + 'a {
let exclude = self.exclude;
let database = self.database;
self.database
let stream = self
.database
.owned_message_ids(self.owner, None, IterDirection::Forward)
.map(|id| id.map(CoinId::from))
.filter(move |result| {
Expand All @@ -138,11 +157,22 @@ impl<'a> AssetsQuery<'a> {
} else {
return Err(anyhow::anyhow!("The coin is not a message").into());
};
// TODO: Fetch message in a separate thread (https://github.com/FuelLabs/fuel-core/pull/2340)
let message = database.message(&id)?;
Ok(message)
Ok(id)
})
});

futures::stream::StreamExt::chunks(stream, database.batch_size)
.map(|chunk| {
use itertools::Itertools;

let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?;
Ok(chunk)
})
.try_filter_map(move |chunk| async move {
let chunk = database.messages(chunk).await;
Ok::<_, StorageError>(Some(futures::stream::iter(chunk)))
})
.try_flatten()
.filter(|result| {
if let Ok(message) = result {
message.data().is_empty()
Expand Down
3 changes: 2 additions & 1 deletion crates/fuel-core/src/query/block.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::fuel_core_graphql_api::database::ReadView;
use fuel_core_services::yield_stream::StreamYieldExt;
use fuel_core_storage::{
iter::IterDirection,
Result as StorageResult,
Expand All @@ -23,6 +24,6 @@ impl ReadView {
height: Option<BlockHeight>,
direction: IterDirection,
) -> impl Stream<Item = StorageResult<CompressedBlock>> + '_ {
futures::stream::iter(self.blocks(height, direction))
futures::stream::iter(self.blocks(height, direction)).yield_each(self.batch_size)
}
}
Loading

0 comments on commit 87c9579

Please sign in to comment.