-
Notifications
You must be signed in to change notification settings - Fork 2.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Avoid long heavy tasks in the GraphQL service #2340
Changes from all commits
2fe7bb6
ab5e940
1782684
5192c95
feeb816
4237feb
142cd1d
671e80e
2354d10
d982fbc
c355cd1
7846db4
5a752bc
a8c1942
df80801
e414046
3d1e2d8
8cb24b0
74bf964
3868d45
678e0be
314cd37
727b2da
d5d7934
73adec0
5e0ec92
981b690
3521a72
946f46c
c4ad916
2866da1
d8e159c
c61436e
71d3692
35c7184
d8c8c5e
866f538
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ use crate::fuel_core_graphql_api::{ | |
OnChainDatabase, | ||
}, | ||
}; | ||
use fuel_core_services::yield_stream::StreamYieldExt; | ||
use fuel_core_storage::{ | ||
iter::{ | ||
BoxedIter, | ||
|
@@ -77,6 +78,8 @@ pub type OffChainView = Arc<dyn OffChainDatabase>; | |
/// The container of the on-chain and off-chain database view provides. | ||
/// It is used only by `ViewExtension` to create a [`ReadView`]. | ||
pub struct ReadDatabase { | ||
/// The size of the batch during fetching from the database. | ||
batch_size: usize, | ||
/// The height of the genesis block. | ||
genesis_height: BlockHeight, | ||
/// The on-chain database view provider. | ||
|
@@ -88,6 +91,7 @@ pub struct ReadDatabase { | |
impl ReadDatabase { | ||
/// Creates a new [`ReadDatabase`] with the given on-chain and off-chain database view providers. | ||
pub fn new<OnChain, OffChain>( | ||
batch_size: usize, | ||
genesis_height: BlockHeight, | ||
on_chain: OnChain, | ||
off_chain: OffChain, | ||
|
@@ -99,6 +103,7 @@ impl ReadDatabase { | |
OffChain::LatestView: OffChainDatabase, | ||
{ | ||
Self { | ||
batch_size, | ||
genesis_height, | ||
on_chain: Box::new(ArcWrapper::new(on_chain)), | ||
off_chain: Box::new(ArcWrapper::new(off_chain)), | ||
|
@@ -111,6 +116,7 @@ impl ReadDatabase { | |
// It is not possible to implement until `view_at` is implemented for the `AtomicView`. | ||
// https://github.com/FuelLabs/fuel-core/issues/1582 | ||
Ok(ReadView { | ||
batch_size: self.batch_size, | ||
genesis_height: self.genesis_height, | ||
on_chain: self.on_chain.latest_view()?, | ||
off_chain: self.off_chain.latest_view()?, | ||
|
@@ -125,6 +131,7 @@ impl ReadDatabase { | |
|
||
#[derive(Clone)] | ||
pub struct ReadView { | ||
pub(crate) batch_size: usize, | ||
pub(crate) genesis_height: BlockHeight, | ||
pub(crate) on_chain: OnChainView, | ||
pub(crate) off_chain: OffChainView, | ||
|
@@ -134,7 +141,7 @@ impl ReadView { | |
pub fn transaction(&self, tx_id: &TxId) -> StorageResult<Transaction> { | ||
let result = self.on_chain.transaction(tx_id); | ||
if result.is_not_found() { | ||
if let Some(tx) = self.old_transaction(tx_id)? { | ||
if let Some(tx) = self.off_chain.old_transaction(tx_id)? { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I prefer the old helper... although the meaning of "old" here isn't clear to me. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
Ok(tx) | ||
} else { | ||
Err(not_found!(Transactions)) | ||
|
@@ -144,6 +151,21 @@ impl ReadView { | |
} | ||
} | ||
|
||
pub async fn transactions( | ||
&self, | ||
tx_ids: Vec<TxId>, | ||
) -> Vec<StorageResult<Transaction>> { | ||
// TODO: Use multiget when it's implemented. | ||
xgreenx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// https://github.com/FuelLabs/fuel-core/issues/2344 | ||
let result = tx_ids | ||
.iter() | ||
.map(|tx_id| self.transaction(tx_id)) | ||
.collect::<Vec<_>>(); | ||
// Give a chance to other tasks to run. | ||
tokio::task::yield_now().await; | ||
netrome marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what is the point of this yield? This task has already finished using the database by now. Should we be applying the same chunk batching to the iterator above? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The idea is that in the future, we will have an async caching mechanism where we will wait for notification when the cache fetches a new desired value(which potentially can be used by several queries in the last block with transactions). This |
||
result | ||
} | ||
|
||
pub fn block(&self, height: &BlockHeight) -> StorageResult<CompressedBlock> { | ||
if *height >= self.genesis_height { | ||
self.on_chain.block(height) | ||
|
@@ -252,6 +274,7 @@ impl ReadView { | |
direction: IterDirection, | ||
) -> impl Stream<Item = StorageResult<Message>> + '_ { | ||
futures::stream::iter(self.on_chain.all_messages(start_message_id, direction)) | ||
.yield_each(self.batch_size) | ||
} | ||
|
||
pub fn message_exists(&self, nonce: &Nonce) -> StorageResult<bool> { | ||
|
@@ -276,6 +299,7 @@ impl ReadView { | |
start_asset, | ||
direction, | ||
)) | ||
.yield_each(self.batch_size) | ||
} | ||
|
||
pub fn da_height(&self) -> StorageResult<DaBlockHeight> { | ||
|
@@ -316,12 +340,12 @@ impl ReadView { | |
futures::stream::iter(iter) | ||
} | ||
|
||
pub fn owned_message_ids<'a>( | ||
&'a self, | ||
owner: &'a Address, | ||
pub fn owned_message_ids( | ||
&self, | ||
owner: &Address, | ||
start_message_id: Option<Nonce>, | ||
direction: IterDirection, | ||
) -> impl Stream<Item = StorageResult<Nonce>> + 'a { | ||
) -> impl Stream<Item = StorageResult<Nonce>> + '_ { | ||
futures::stream::iter(self.off_chain.owned_message_ids( | ||
owner, | ||
start_message_id, | ||
|
@@ -345,29 +369,6 @@ impl ReadView { | |
self.off_chain.contract_salt(contract_id) | ||
} | ||
|
||
pub fn old_block(&self, height: &BlockHeight) -> StorageResult<CompressedBlock> { | ||
self.off_chain.old_block(height) | ||
} | ||
|
||
pub fn old_blocks( | ||
&self, | ||
height: Option<BlockHeight>, | ||
direction: IterDirection, | ||
) -> BoxedIter<'_, StorageResult<CompressedBlock>> { | ||
self.off_chain.old_blocks(height, direction) | ||
} | ||
|
||
pub fn old_block_consensus(&self, height: &BlockHeight) -> StorageResult<Consensus> { | ||
self.off_chain.old_block_consensus(height) | ||
} | ||
|
||
pub fn old_transaction( | ||
&self, | ||
id: &TxId, | ||
) -> StorageResult<Option<fuel_core_types::fuel_tx::Transaction>> { | ||
self.off_chain.old_transaction(id) | ||
} | ||
|
||
pub fn relayed_tx_status( | ||
&self, | ||
id: Bytes32, | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -15,7 +15,10 @@ use fuel_core_types::{ | |||||
AssetId, | ||||||
}, | ||||||
}; | ||||||
use futures::Stream; | ||||||
use futures::{ | ||||||
Stream, | ||||||
TryStreamExt, | ||||||
}; | ||||||
use std::collections::HashSet; | ||||||
use tokio_stream::StreamExt; | ||||||
|
||||||
|
@@ -78,7 +81,9 @@ impl<'a> AssetsQuery<'a> { | |||||
|
||||||
fn coins_iter(mut self) -> impl Stream<Item = StorageResult<CoinType>> + 'a { | ||||||
let allowed_assets = self.allowed_assets.take(); | ||||||
self.database | ||||||
let database = self.database; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: I don't see the need for this assignment
Suggested change
|
||||||
let stream = self | ||||||
.database | ||||||
.owned_coins_ids(self.owner, None, IterDirection::Forward) | ||||||
.map(|id| id.map(CoinId::from)) | ||||||
.filter(move |result| { | ||||||
|
@@ -99,12 +104,25 @@ impl<'a> AssetsQuery<'a> { | |||||
} else { | ||||||
return Err(anyhow::anyhow!("The coin is not UTXO").into()); | ||||||
}; | ||||||
// TODO: Fetch coin in a separate thread | ||||||
let coin = self.database.coin(id)?; | ||||||
|
||||||
Ok(CoinType::Coin(coin)) | ||||||
Ok(id) | ||||||
}) | ||||||
}); | ||||||
|
||||||
futures::stream::StreamExt::chunks(stream, database.batch_size) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could just refer to
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Or add a getter for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could this all be cleaned up with the new There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, here we actually send a batch request to the database like: "Please fetch the all coins for these list of UtxoIds". Later we can optimize it with multi get + caching(next follow up PR will add caching) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oic, since it might be multiget later? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep |
||||||
.map(|chunk| { | ||||||
use itertools::Itertools; | ||||||
|
||||||
let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?; | ||||||
Ok::<_, StorageError>(chunk) | ||||||
}) | ||||||
.try_filter_map(move |chunk| async move { | ||||||
let chunk = database | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And here
Suggested change
|
||||||
.coins(chunk) | ||||||
.await | ||||||
.map(|result| result.map(CoinType::Coin)); | ||||||
Ok(Some(futures::stream::iter(chunk))) | ||||||
}) | ||||||
.try_flatten() | ||||||
.filter(move |result| { | ||||||
if let Ok(CoinType::Coin(coin)) = result { | ||||||
allowed_asset(&allowed_assets, &coin.asset_id) | ||||||
|
@@ -117,7 +135,8 @@ impl<'a> AssetsQuery<'a> { | |||||
fn messages_iter(&self) -> impl Stream<Item = StorageResult<CoinType>> + 'a { | ||||||
let exclude = self.exclude; | ||||||
let database = self.database; | ||||||
self.database | ||||||
let stream = self | ||||||
.database | ||||||
.owned_message_ids(self.owner, None, IterDirection::Forward) | ||||||
.map(|id| id.map(CoinId::from)) | ||||||
.filter(move |result| { | ||||||
|
@@ -138,11 +157,22 @@ impl<'a> AssetsQuery<'a> { | |||||
} else { | ||||||
return Err(anyhow::anyhow!("The coin is not a message").into()); | ||||||
}; | ||||||
// TODO: Fetch message in a separate thread (https://github.com/FuelLabs/fuel-core/pull/2340) | ||||||
let message = database.message(&id)?; | ||||||
Ok(message) | ||||||
Ok(id) | ||||||
}) | ||||||
}); | ||||||
|
||||||
futures::stream::StreamExt::chunks(stream, database.batch_size) | ||||||
.map(|chunk| { | ||||||
use itertools::Itertools; | ||||||
|
||||||
let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?; | ||||||
Ok(chunk) | ||||||
}) | ||||||
.try_filter_map(move |chunk| async move { | ||||||
let chunk = database.messages(chunk).await; | ||||||
Ok::<_, StorageError>(Some(futures::stream::iter(chunk))) | ||||||
}) | ||||||
.try_flatten() | ||||||
.filter(|result| { | ||||||
if let Ok(message) = result { | ||||||
message.data().is_empty() | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this have to be configurable? can we just set it to a global now to reduce the config surface? we have too many options rn :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I decided to make it configurable for now to be able to adjust our performance on the fly. Later we can remove it when we have better ideas about the best value