From 94020681c8710e2c34540fe00c7e6ec5ba8495cc Mon Sep 17 00:00:00 2001 From: Green Baneling Date: Fri, 20 Oct 2023 01:29:01 +0200 Subject: [PATCH] Tx subscription cleanup (#1422) (#1432) Applying hotfix https://github.com/FuelLabs/fuel-core/pull/1422 to `master` branch. Closes https://github.com/FuelLabs/fuel-core/issues/1421 --------- Co-authored-by: Brandon Kite --- CHANGELOG.md | 2 + Cargo.lock | 1 + bin/fuel-core/src/cli/run.rs | 6 ++ crates/fuel-core/Cargo.toml | 2 +- crates/fuel-core/src/graphql_api/ports.rs | 7 +-- crates/fuel-core/src/graphql_api/service.rs | 10 +++- crates/fuel-core/src/query/subscriptions.rs | 2 +- .../fuel-core/src/query/subscriptions/test.rs | 1 - crates/fuel-core/src/schema/tx.rs | 12 ++-- .../src/service/adapters/graphql_api.rs | 12 ++-- crates/fuel-core/src/service/config.rs | 2 + crates/fuel-core/src/service/query.rs | 16 +++--- crates/fuel-core/src/service/sub_services.rs | 1 + crates/services/txpool/src/service.rs | 25 ++++++--- crates/services/txpool/src/service/tests.rs | 4 +- .../services/txpool/src/service/tests_p2p.rs | 6 +- .../txpool/src/service/update_sender.rs | 55 ++++++++----------- .../service/update_sender/tests/test_e2e.rs | 2 +- .../update_sender/tests/test_permits.rs | 2 + .../update_sender/tests/test_sending.rs | 1 + .../src/service/update_sender/tests/utils.rs | 3 + 21 files changed, 95 insertions(+), 77 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 620e23873ed..8afb800d472 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ Description of the upcoming release here. ### Added +- [#1432](https://github.com/FuelLabs/fuel-core/pull/1432): Add a new `--api-request-timeout` argument to control TTL for GraphQL requests. - [#1419](https://github.com/FuelLabs/fuel-core/pull/1419): Add additional "sanity" benchmarks for arithmetic op code instructions. - [#1411](https://github.com/FuelLabs/fuel-core/pull/1411): Added WASM and `no_std` compatibility - [#1371](https://github.com/FuelLabs/fuel-core/pull/1371): Add new client function for querying the `MessageStatus` for a specific message (by `Nonce`) @@ -50,6 +51,7 @@ Description of the upcoming release here. - [#1395](https://github.com/FuelLabs/fuel-core/pull/1395): Add DependentCost benchmarks for `k256`, `s256` and `mcpi` instructions. #### Breaking +- [#1432](https://github.com/FuelLabs/fuel-core/pull/1432): All subscriptions and requests have a TTL now. So each subscription lifecycle is limited in time. If the subscription is closed because of TTL, it means that you subscribed after your transaction had been dropped by the network. - [#1407](https://github.com/FuelLabs/fuel-core/pull/1407): The recipient is a `ContractId` instead of `Address`. The block producer should deploy its contract to receive the transaction fee. The collected fee is zero until the recipient contract is set. - [#1407](https://github.com/FuelLabs/fuel-core/pull/1407): The `Mint` transaction is reworked with new fields to support the account-base model. It affects serialization and deserialization of the transaction and also affects GraphQL schema. - [#1407](https://github.com/FuelLabs/fuel-core/pull/1407): The `Mint` transaction is the last transaction in the block instead of the first. diff --git a/Cargo.lock b/Cargo.lock index 4be3e2622bc..700302fe7aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8153,6 +8153,7 @@ dependencies = [ "http-body", "http-range-header", "pin-project-lite 0.2.13", + "tokio", "tower", "tower-layer", "tower-service", diff --git a/bin/fuel-core/src/cli/run.rs b/bin/fuel-core/src/cli/run.rs index 1e48901c967..701b502c744 100644 --- a/bin/fuel-core/src/cli/run.rs +++ b/bin/fuel-core/src/cli/run.rs @@ -202,6 +202,10 @@ pub struct Command { #[clap(long = "query-log-threshold-time", default_value = "2s", env)] pub query_log_threshold_time: humantime::Duration, + /// Timeout before drop the request. + #[clap(long = "api-request-timeout", default_value = "30m", env)] + pub api_request_timeout: humantime::Duration, + #[clap(flatten)] pub profiling: profiling::ProfilingArgs, } @@ -240,6 +244,7 @@ impl Command { min_connected_reserved_peers, time_until_synced, query_log_threshold_time, + api_request_timeout, profiling: _, } = self; @@ -297,6 +302,7 @@ impl Command { let config = Config { addr, + api_request_timeout: api_request_timeout.into(), max_database_cache_size, database_path, database_type, diff --git a/crates/fuel-core/Cargo.toml b/crates/fuel-core/Cargo.toml index 10aab388a4c..01b7b7de5b3 100644 --- a/crates/fuel-core/Cargo.toml +++ b/crates/fuel-core/Cargo.toml @@ -57,7 +57,7 @@ tempfile = { workspace = true, optional = true } thiserror = "1.0" tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } tokio-stream = { workspace = true, features = ["sync"] } -tower-http = { version = "0.3", features = ["set-header", "trace"] } +tower-http = { version = "0.3", features = ["set-header", "trace", "timeout"] } tracing = { workspace = true } uuid = { version = "1.1", features = ["v4"] } diff --git a/crates/fuel-core/src/graphql_api/ports.rs b/crates/fuel-core/src/graphql_api/ports.rs index d270e424843..8edfcb42f35 100644 --- a/crates/fuel-core/src/graphql_api/ports.rs +++ b/crates/fuel-core/src/graphql_api/ports.rs @@ -1,8 +1,5 @@ use async_trait::async_trait; -use fuel_core_services::stream::{ - BoxFuture, - BoxStream, -}; +use fuel_core_services::stream::BoxStream; use fuel_core_storage::{ iter::{ BoxedIter, @@ -172,7 +169,7 @@ pub trait TxPoolPort: Send + Sync { fn tx_update_subscribe( &self, tx_id: TxId, - ) -> BoxFuture<'_, BoxStream>; + ) -> anyhow::Result>; } #[async_trait] diff --git a/crates/fuel-core/src/graphql_api/service.rs b/crates/fuel-core/src/graphql_api/service.rs index 6900a7b6d80..2ecb2c1457f 100644 --- a/crates/fuel-core/src/graphql_api/service.rs +++ b/crates/fuel-core/src/graphql_api/service.rs @@ -68,6 +68,7 @@ use std::{ use tokio_stream::StreamExt; use tower_http::{ set_header::SetResponseHeaderLayer, + timeout::TimeoutLayer, trace::TraceLayer, }; @@ -155,7 +156,8 @@ impl RunnableTask for Task { } } -// Need a separate Data Object for each Query endpoint, cannot be avoided +// Need a seperate Data Object for each Query endpoint, cannot be avoided +#[allow(clippy::too_many_arguments)] pub fn new_service( config: Config, schema: CoreSchemaBuilder, @@ -163,7 +165,8 @@ pub fn new_service( txpool: TxPool, producer: BlockProducer, consensus_module: ConsensusModule, - _log_threshold_ms: Duration, + log_threshold_ms: Duration, + request_timeout: Duration, ) -> anyhow::Result { let network_addr = config.addr; @@ -174,7 +177,7 @@ pub fn new_service( .data(producer) .data(consensus_module) .extension(async_graphql::extensions::Tracing) - .extension(MetricsExtension::new(_log_threshold_ms)) + .extension(MetricsExtension::new(log_threshold_ms)) .finish(); let router = Router::new() @@ -188,6 +191,7 @@ pub fn new_service( .route("/health", get(health)) .layer(Extension(schema)) .layer(TraceLayer::new_for_http()) + .layer(TimeoutLayer::new(request_timeout)) .layer(SetResponseHeaderLayer::<_>::overriding( ACCESS_CONTROL_ALLOW_ORIGIN, HeaderValue::from_static("*"), diff --git a/crates/fuel-core/src/query/subscriptions.rs b/crates/fuel-core/src/query/subscriptions.rs index 87dfbf300fa..e3be265aa31 100644 --- a/crates/fuel-core/src/query/subscriptions.rs +++ b/crates/fuel-core/src/query/subscriptions.rs @@ -30,7 +30,7 @@ where } #[tracing::instrument(skip(state, stream), fields(transaction_id = %transaction_id))] -pub(crate) async fn transaction_status_change<'a, State>( +pub(crate) fn transaction_status_change<'a, State>( state: State, stream: BoxStream<'a, TxStatusMessage>, transaction_id: Bytes32, diff --git a/crates/fuel-core/src/query/subscriptions/test.rs b/crates/fuel-core/src/query/subscriptions/test.rs index 939952416b0..135919fa7e9 100644 --- a/crates/fuel-core/src/query/subscriptions/test.rs +++ b/crates/fuel-core/src/query/subscriptions/test.rs @@ -246,7 +246,6 @@ fn test_tsc_inner( let stream = futures::stream::iter(stream).boxed(); super::transaction_status_change(mock_state, stream, txn_id(0)) - .await .collect::>() .await }) diff --git a/crates/fuel-core/src/schema/tx.rs b/crates/fuel-core/src/schema/tx.rs index b8a6b4d151e..248e72769f2 100644 --- a/crates/fuel-core/src/schema/tx.rs +++ b/crates/fuel-core/src/schema/tx.rs @@ -302,12 +302,13 @@ impl TxStatusSubscription { &self, ctx: &Context<'a>, #[graphql(desc = "The ID of the transaction")] id: TransactionId, - ) -> impl Stream> + 'a { + ) -> anyhow::Result> + 'a> + { let txpool = ctx.data_unchecked::(); let db = ctx.data_unchecked::(); - let rx = txpool.tx_update_subscribe(id.into()).await; + let rx = txpool.tx_update_subscribe(id.into())?; - transaction_status_change( + Ok(transaction_status_change( move |id| match db.tx_status(&id) { Ok(status) => Ok(Some(status)), Err(StorageError::NotFound(_, _)) => { @@ -322,8 +323,7 @@ impl TxStatusSubscription { rx, id.into(), ) - .await - .map_err(async_graphql::Error::from) + .map_err(async_graphql::Error::from)) } /// Submits transaction to the `TxPool` and await either confirmation or failure. @@ -338,7 +338,7 @@ impl TxStatusSubscription { let config = ctx.data_unchecked::(); let tx = FuelTx::from_bytes(&tx.0)?; let tx_id = tx.id(&config.consensus_parameters.chain_id); - let subscription = txpool.tx_update_subscribe(tx_id).await; + let subscription = txpool.tx_update_subscribe(tx_id)?; let _: Vec<_> = txpool .insert(vec![Arc::new(tx)]) diff --git a/crates/fuel-core/src/service/adapters/graphql_api.rs b/crates/fuel-core/src/service/adapters/graphql_api.rs index ffbd649fb10..e55caf2c8c1 100644 --- a/crates/fuel-core/src/service/adapters/graphql_api.rs +++ b/crates/fuel-core/src/service/adapters/graphql_api.rs @@ -19,10 +19,7 @@ use crate::{ service::adapters::TxPoolAdapter, }; use async_trait::async_trait; -use fuel_core_services::stream::{ - BoxFuture, - BoxStream, -}; +use fuel_core_services::stream::BoxStream; use fuel_core_storage::{ iter::{ BoxedIter, @@ -233,8 +230,11 @@ impl TxPoolPort for TxPoolAdapter { self.service.insert(txs).await } - fn tx_update_subscribe(&self, id: TxId) -> BoxFuture> { - Box::pin(self.service.tx_update_subscribe(id)) + fn tx_update_subscribe( + &self, + id: TxId, + ) -> anyhow::Result> { + self.service.tx_update_subscribe(id) } } diff --git a/crates/fuel-core/src/service/config.rs b/crates/fuel-core/src/service/config.rs index 3343232918d..f0cabfda032 100644 --- a/crates/fuel-core/src/service/config.rs +++ b/crates/fuel-core/src/service/config.rs @@ -35,6 +35,7 @@ pub use fuel_core_poa::Trigger; #[derive(Clone, Debug)] pub struct Config { pub addr: SocketAddr, + pub api_request_timeout: Duration, pub max_database_cache_size: usize, pub database_path: PathBuf, pub database_type: DbType, @@ -77,6 +78,7 @@ impl Config { Self { addr: SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 0), + api_request_timeout: Duration::from_secs(60), // Set the cache for tests = 10MB max_database_cache_size: 10 * 1024 * 1024, database_path: Default::default(), diff --git a/crates/fuel-core/src/service/query.rs b/crates/fuel-core/src/service/query.rs index 5586693e70c..b53d2db0fea 100644 --- a/crates/fuel-core/src/service/query.rs +++ b/crates/fuel-core/src/service/query.rs @@ -1,5 +1,4 @@ //! Queries we can run directly on `FuelService`. - use std::sync::Arc; use fuel_core_types::{ @@ -44,7 +43,7 @@ impl FuelService { tx: Transaction, ) -> anyhow::Result>> { let id = tx.id(&self.shared.config.chain_conf.consensus_parameters.chain_id); - let stream = self.transaction_status_change(id).await; + let stream = self.transaction_status_change(id)?; self.submit(tx).await?; Ok(stream) } @@ -55,7 +54,7 @@ impl FuelService { tx: Transaction, ) -> anyhow::Result { let id = tx.id(&self.shared.config.chain_conf.consensus_parameters.chain_id); - let stream = self.transaction_status_change(id).await.filter(|status| { + let stream = self.transaction_status_change(id)?.filter(|status| { futures::future::ready(!matches!(status, Ok(TransactionStatus::Submitted(_)))) }); futures::pin_mut!(stream); @@ -67,21 +66,20 @@ impl FuelService { } /// Return a stream of status changes for a transaction. - pub async fn transaction_status_change( + pub fn transaction_status_change( &self, id: Bytes32, - ) -> impl Stream> { + ) -> anyhow::Result>> { let txpool = self.shared.txpool.clone(); let db = self.shared.database.clone(); - let rx = Box::pin(txpool.tx_update_subscribe(id).await); - transaction_status_change( + let rx = txpool.tx_update_subscribe(id)?; + Ok(transaction_status_change( move |id| match db.get_tx_status(&id)? { Some(status) => Ok(Some(status)), None => Ok(txpool.find_one(id).map(Into::into)), }, rx, id, - ) - .await + )) } } diff --git a/crates/fuel-core/src/service/sub_services.rs b/crates/fuel-core/src/service/sub_services.rs index f5fb716f51a..56238449416 100644 --- a/crates/fuel-core/src/service/sub_services.rs +++ b/crates/fuel-core/src/service/sub_services.rs @@ -208,6 +208,7 @@ pub fn init_sub_services( Box::new(producer_adapter), Box::new(poa_adapter), config.query_log_threshold_time, + config.api_request_timeout, )?; let shared = SharedState { diff --git a/crates/services/txpool/src/service.rs b/crates/services/txpool/src/service.rs index fdb7f4d0dca..d93ec166e04 100644 --- a/crates/services/txpool/src/service.rs +++ b/crates/services/txpool/src/service.rs @@ -51,8 +51,12 @@ use fuel_core_types::{ tai64::Tai64, }; +use anyhow::anyhow; use parking_lot::Mutex as ParkingMutex; -use std::sync::Arc; +use std::{ + sync::Arc, + time::Duration, +}; use tokio::{ sync::broadcast, time::MissedTickBehavior, @@ -76,9 +80,9 @@ pub struct TxStatusChange { } impl TxStatusChange { - pub fn new(capacity: usize) -> Self { + pub fn new(capacity: usize, ttl: Duration) -> Self { let (new_tx_notification_sender, _) = broadcast::channel(capacity); - let update_sender = UpdateSender::new(capacity); + let update_sender = UpdateSender::new(capacity, ttl); Self { new_tx_notification_sender, update_sender, @@ -326,11 +330,11 @@ where self.tx_status_sender.new_tx_notification_sender.subscribe() } - pub async fn tx_update_subscribe(&self, tx_id: Bytes32) -> TxStatusStream { + pub fn tx_update_subscribe(&self, tx_id: Bytes32) -> anyhow::Result { self.tx_status_sender .update_sender - .subscribe::(tx_id) - .await + .try_subscribe::(tx_id) + .ok_or(anyhow!("Maximum number of subscriptions reached")) } } @@ -450,7 +454,14 @@ where gossiped_tx_stream, committed_block_stream, shared: SharedState { - tx_status_sender: TxStatusChange::new(number_of_active_subscription), + tx_status_sender: TxStatusChange::new( + number_of_active_subscription, + // The connection should be closed automatically after the `SqueezedOut` event. + // But because of slow/malicious consumers, the subscriber can still be occupied. + // We allow the subscriber to receive the event produced by TxPool's TTL. + // But we still want to drop subscribers after `2 * TxPool_TTL`. + 2 * config.transaction_ttl, + ), txpool, p2p, consensus_params, diff --git a/crates/services/txpool/src/service/tests.rs b/crates/services/txpool/src/service/tests.rs index e5d047d4425..85da7a46c70 100644 --- a/crates/services/txpool/src/service/tests.rs +++ b/crates/services/txpool/src/service/tests.rs @@ -198,11 +198,11 @@ async fn simple_insert_removal_subscription() { let mut tx1_subscribe_updates = service .shared .tx_update_subscribe(tx1.cached_id().unwrap()) - .await; + .unwrap(); let mut tx2_subscribe_updates = service .shared .tx_update_subscribe(tx2.cached_id().unwrap()) - .await; + .unwrap(); let out = service.shared.insert(vec![tx1.clone(), tx2.clone()]).await; diff --git a/crates/services/txpool/src/service/tests_p2p.rs b/crates/services/txpool/src/service/tests_p2p.rs index 43e82631a2f..3a21e5156f1 100644 --- a/crates/services/txpool/src/service/tests_p2p.rs +++ b/crates/services/txpool/src/service/tests_p2p.rs @@ -26,7 +26,7 @@ async fn can_insert_from_p2p() { let mut receiver = service .shared .tx_update_subscribe(tx1.id(&Default::default())) - .await; + .unwrap(); service.start_and_await().await.unwrap(); @@ -66,7 +66,7 @@ async fn insert_from_local_broadcasts_to_p2p() { let mut subscribe_update = service .shared .tx_update_subscribe(tx1.cached_id().unwrap()) - .await; + .unwrap(); let out = service.shared.insert(vec![Arc::new(tx1.clone())]).await; @@ -115,7 +115,7 @@ async fn test_insert_from_p2p_does_not_broadcast_to_p2p() { let mut receiver = service .shared .tx_update_subscribe(tx1.id(&Default::default())) - .await; + .unwrap(); service.start_and_await().await.unwrap(); diff --git a/crates/services/txpool/src/service/update_sender.rs b/crates/services/txpool/src/service/update_sender.rs index 349e34b8fcb..588df172772 100644 --- a/crates/services/txpool/src/service/update_sender.rs +++ b/crates/services/txpool/src/service/update_sender.rs @@ -2,17 +2,21 @@ use std::{ collections::HashMap, future::Future, pin::Pin, + time::Duration, }; use super::*; use parking_lot::Mutex; -use tokio::sync::{ - mpsc::{ - self, - error::TrySendError, +use tokio::{ + sync::{ + mpsc::{ + self, + error::TrySendError, + }, + OwnedSemaphorePermit, + Semaphore, }, - OwnedSemaphorePermit, - Semaphore, + time::Instant, }; use tokio_stream::{ wrappers::ReceiverStream, @@ -40,6 +44,8 @@ pub struct UpdateSender { senders: Arc>>, /// Semaphore used to limit the number of concurrent subscribers. permits: GetPermit, + /// TTL for senders + ttl: Duration, } /// Error returned when a transaction status update cannot be sent. @@ -78,6 +84,8 @@ struct Sender

> { stream: TxUpdateStream, /// The sending end of the subscriber channel. tx: Tx, + /// time that this sender was created + created: Instant, } /// A trait for sending transaction status updates. @@ -205,21 +213,21 @@ impl SendStatus for mpsc::Sender { impl UpdateSender { /// Create a new UpdateSender with a specified capacity for the semaphore - pub fn new(capacity: usize) -> UpdateSender { + pub fn new(capacity: usize, ttl: Duration) -> UpdateSender { UpdateSender { senders: Default::default(), permits: Arc::new(Semaphore::new(capacity)), + ttl, } } /// Try to subscribe for updates, returns a TxStatusStream if successful - #[cfg(test)] pub fn try_subscribe(&self, tx_id: Bytes32) -> Option where C: CreateChannel, { // Remove closed senders from the list - remove_closed(&mut self.senders.lock()); + remove_closed_and_expired(&mut self.senders.lock(), self.ttl); // Try to acquire a permit from the semaphore let permit = Arc::clone(&self.permits).try_acquire()?; @@ -228,25 +236,6 @@ impl UpdateSender { Some(self.subscribe_inner::(tx_id, permit)) } - /// Asynchronously await a permit to subscribe - /// returns a TxStatusStream on completion - pub async fn subscribe(&self, tx_id: Bytes32) -> TxStatusStream - where - C: CreateChannel, - { - // Remove closed senders from the list. - // Careful not to hold the lock while awaiting. - { - remove_closed(&mut self.senders.lock()); - } - - // Acquire a permit from the semaphore asynchronously. - let permit = Arc::clone(&self.permits).acquire().await; - - // Call subscribe_inner with the acquired permit. - self.subscribe_inner::(tx_id, permit) - } - /// Subscribe to updates with the given transaction id and a permit. fn subscribe_inner(&self, tx_id: Bytes32, permit: Permit) -> TxStatusStream where @@ -256,7 +245,7 @@ impl UpdateSender { let mut senders = self.senders.lock(); // Remove closed senders from the list - remove_closed(&mut senders); + remove_closed_and_expired(&mut senders, self.ttl); // Call the subscribe function with the tx_id, senders, and permit subscribe::<_, C>(tx_id, &mut (*senders), permit) @@ -268,7 +257,7 @@ impl UpdateSender { let mut senders = self.senders.lock(); // Remove closed senders from the list - remove_closed(&mut senders); + remove_closed_and_expired(&mut senders, self.ttl); // Initialize a flag to check if there are no senders // left for a given tx_id. @@ -308,6 +297,7 @@ where _permit: permit, stream: TxUpdateStream::new(), tx, + created: Instant::now(), }); // Return the receiver part of the channel @@ -315,13 +305,13 @@ where } // Remove closed senders from the senders map -fn remove_closed(senders: &mut SenderMap) +fn remove_closed_and_expired(senders: &mut SenderMap, ttl: Duration) where Tx: SendStatus, { // Iterate over the senders map, retaining only the senders that are not closed senders.retain(|_, senders| { - senders.retain(|sender| !sender.is_closed()); + senders.retain(|sender| !sender.is_closed() && sender.created.elapsed() < ttl); // Continue retaining if the senders list is not empty !senders.is_empty() }); @@ -349,6 +339,7 @@ impl Clone for UpdateSender { Self { senders: self.senders.clone(), permits: self.permits.clone(), + ttl: self.ttl, } } } diff --git a/crates/services/txpool/src/service/update_sender/tests/test_e2e.rs b/crates/services/txpool/src/service/update_sender/tests/test_e2e.rs index f178f11d0f7..b3871b06e86 100644 --- a/crates/services/txpool/src/service/update_sender/tests/test_e2e.rs +++ b/crates/services/txpool/src/service/update_sender/tests/test_e2e.rs @@ -74,7 +74,7 @@ fn test_update_sender_inner(ops: Vec) { } // Initialize test variables - let update = UpdateSender::new(CAPACITY); + let update = UpdateSender::new(CAPACITY, Duration::from_secs(5)); let mut receivers: Vec = Vec::new(); let mut model_receivers: Vec<(u8, usize, [Option; 2])> = Vec::new(); let mut sender_id = 0usize; diff --git a/crates/services/txpool/src/service/update_sender/tests/test_permits.rs b/crates/services/txpool/src/service/update_sender/tests/test_permits.rs index 491cf4ee8c8..e819a12083e 100644 --- a/crates/services/txpool/src/service/update_sender/tests/test_permits.rs +++ b/crates/services/txpool/src/service/update_sender/tests/test_permits.rs @@ -68,6 +68,7 @@ fn test_try_subscribe_inner(senders: HashMap>(), )