Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tx subscription cleanup #1422

Merged
merged 3 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions bin/fuel-core/src/cli/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ pub struct Command {
/// Time to wait after submitting a query before debug info will be logged about query.
#[clap(long = "query_log_threshold_time", default_value = "2s", env)]
pub query_log_threshold_time: humantime::Duration,

/// Timeout after which an API request is dropped.
#[clap(long = "api-request-timeout", default_value = "30m", env)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this means 30 minutes? It seems long for a timeout - I would think closer to 30 seconds maybe?

Copy link
Member Author

@Voxelot Voxelot Oct 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

30s is too short, as it may take longer than that for transactions to make it into a block and users will have a subscription open during that whole duration. I agree this could be shorter, but this is a pretty safe default that will address some of the outstanding issues.

pub api_request_timeout: humantime::Duration,
}

impl Command {
Expand Down Expand Up @@ -230,6 +234,7 @@ impl Command {
min_connected_reserved_peers,
time_until_synced,
query_log_threshold_time,
api_request_timeout,
} = self;

let addr = net::SocketAddr::new(ip, port);
Expand Down Expand Up @@ -286,6 +291,7 @@ impl Command {

Ok(Config {
addr,
api_request_timeout: api_request_timeout.into(),
max_database_cache_size,
database_path,
database_type,
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ tempfile = { workspace = true, optional = true }
thiserror = "1.0"
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tokio-stream = { workspace = true, features = ["sync"] }
tower-http = { version = "0.3", features = ["set-header", "trace"] }
tower-http = { version = "0.3", features = ["set-header", "trace", "timeout"] }
tracing = { workspace = true }
uuid = { version = "1.1", features = ["v4"], optional = true }

Expand Down
7 changes: 2 additions & 5 deletions crates/fuel-core/src/graphql_api/ports.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use async_trait::async_trait;
use fuel_core_services::stream::{
BoxFuture,
BoxStream,
};
use fuel_core_services::stream::BoxStream;
use fuel_core_storage::{
iter::{
BoxedIter,
Expand Down Expand Up @@ -172,7 +169,7 @@ pub trait TxPoolPort: Send + Sync {
fn tx_update_subscribe(
&self,
tx_id: TxId,
) -> BoxFuture<'_, BoxStream<TxStatusMessage>>;
) -> anyhow::Result<BoxStream<TxStatusMessage>>;
}

#[async_trait]
Expand Down
8 changes: 6 additions & 2 deletions crates/fuel-core/src/graphql_api/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ use std::{
use tokio_stream::StreamExt;
use tower_http::{
set_header::SetResponseHeaderLayer,
timeout::TimeoutLayer,
trace::TraceLayer,
};

Expand Down Expand Up @@ -156,14 +157,16 @@ impl RunnableTask for Task {
}

// Need a separate Data Object for each Query endpoint, cannot be avoided
#[allow(clippy::too_many_arguments)]
pub fn new_service(
config: Config,
schema: CoreSchemaBuilder,
database: Database,
txpool: TxPool,
producer: BlockProducer,
consensus_module: ConsensusModule,
_log_threshold_ms: Duration,
log_threshold_ms: Duration,
request_timeout: Duration,
) -> anyhow::Result<Service> {
let network_addr = config.addr;

Expand All @@ -174,7 +177,7 @@ pub fn new_service(
.data(producer)
.data(consensus_module)
.extension(async_graphql::extensions::Tracing)
.extension(MetricsExtension::new(_log_threshold_ms))
.extension(MetricsExtension::new(log_threshold_ms))
.finish();

let router = Router::new()
Expand All @@ -188,6 +191,7 @@ pub fn new_service(
.route("/health", get(health))
.layer(Extension(schema))
.layer(TraceLayer::new_for_http())
.layer(TimeoutLayer::new(request_timeout))
.layer(SetResponseHeaderLayer::<_>::overriding(
ACCESS_CONTROL_ALLOW_ORIGIN,
HeaderValue::from_static("*"),
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/query/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ where
}

#[tracing::instrument(skip(state, stream), fields(transaction_id = %transaction_id))]
pub(crate) async fn transaction_status_change<'a, State>(
pub(crate) fn transaction_status_change<'a, State>(
state: State,
stream: BoxStream<'a, TxStatusMessage>,
transaction_id: Bytes32,
Expand Down
1 change: 0 additions & 1 deletion crates/fuel-core/src/query/subscriptions/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ fn test_tsc_inner(

let stream = futures::stream::iter(stream).boxed();
super::transaction_status_change(mock_state, stream, txn_id(0))
.await
.collect::<Vec<_>>()
.await
})
Expand Down
12 changes: 6 additions & 6 deletions crates/fuel-core/src/schema/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,12 +295,13 @@ impl TxStatusSubscription {
&self,
ctx: &Context<'a>,
#[graphql(desc = "The ID of the transaction")] id: TransactionId,
) -> impl Stream<Item = async_graphql::Result<TransactionStatus>> + 'a {
) -> anyhow::Result<impl Stream<Item = async_graphql::Result<TransactionStatus>> + 'a>
{
let txpool = ctx.data_unchecked::<TxPool>();
let db = ctx.data_unchecked::<Database>();
let rx = txpool.tx_update_subscribe(id.into()).await;
let rx = txpool.tx_update_subscribe(id.into())?;

transaction_status_change(
Ok(transaction_status_change(
move |id| match db.tx_status(&id) {
Ok(status) => Ok(Some(status)),
Err(StorageError::NotFound(_, _)) => {
Expand All @@ -315,8 +316,7 @@ impl TxStatusSubscription {
rx,
id.into(),
)
.await
.map_err(async_graphql::Error::from)
.map_err(async_graphql::Error::from))
}

/// Submits transaction to the `TxPool` and await either confirmation or failure.
Expand All @@ -331,7 +331,7 @@ impl TxStatusSubscription {
let config = ctx.data_unchecked::<Config>();
let tx = FuelTx::from_bytes(&tx.0)?;
let tx_id = tx.id(&config.transaction_parameters.chain_id);
let subscription = txpool.tx_update_subscribe(tx_id).await;
let subscription = txpool.tx_update_subscribe(tx_id)?;

let _: Vec<_> = txpool
.insert(vec![Arc::new(tx)])
Expand Down
12 changes: 6 additions & 6 deletions crates/fuel-core/src/service/adapters/graphql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@ use crate::{
service::adapters::TxPoolAdapter,
};
use async_trait::async_trait;
use fuel_core_services::stream::{
BoxFuture,
BoxStream,
};
use fuel_core_services::stream::BoxStream;
use fuel_core_storage::{
iter::{
BoxedIter,
Expand Down Expand Up @@ -233,8 +230,11 @@ impl TxPoolPort for TxPoolAdapter {
self.service.insert(txs).await
}

fn tx_update_subscribe(&self, id: TxId) -> BoxFuture<BoxStream<TxStatusMessage>> {
Box::pin(self.service.tx_update_subscribe(id))
fn tx_update_subscribe(
&self,
id: TxId,
) -> anyhow::Result<BoxStream<TxStatusMessage>> {
self.service.tx_update_subscribe(id)
}
}

Expand Down
2 changes: 2 additions & 0 deletions crates/fuel-core/src/service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub use fuel_core_poa::Trigger;
#[derive(Clone, Debug)]
pub struct Config {
pub addr: SocketAddr,
pub api_request_timeout: Duration,
pub max_database_cache_size: usize,
pub database_path: PathBuf,
pub database_type: DbType,
Expand Down Expand Up @@ -69,6 +70,7 @@ impl Config {
let min_gas_price = 0;
Self {
addr: SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 0),
api_request_timeout: Duration::from_secs(60),
// Set the cache for tests = 10MB
max_database_cache_size: 10 * 1024 * 1024,
database_path: Default::default(),
Expand Down
16 changes: 7 additions & 9 deletions crates/fuel-core/src/service/query.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
//! Queries we can run directly on `FuelService`.

use std::sync::Arc;

use fuel_core_types::{
Expand Down Expand Up @@ -49,7 +48,7 @@ impl FuelService {
.chain_conf
.transaction_parameters
.chain_id);
let stream = self.transaction_status_change(id).await;
let stream = self.transaction_status_change(id)?;
self.submit(tx).await?;
Ok(stream)
}
Expand All @@ -65,7 +64,7 @@ impl FuelService {
.chain_conf
.transaction_parameters
.chain_id);
let stream = self.transaction_status_change(id).await.filter(|status| {
let stream = self.transaction_status_change(id)?.filter(|status| {
futures::future::ready(!matches!(status, Ok(TransactionStatus::Submitted(_))))
});
futures::pin_mut!(stream);
Expand All @@ -77,21 +76,20 @@ impl FuelService {
}

/// Return a stream of status changes for a transaction.
pub async fn transaction_status_change(
pub fn transaction_status_change(
&self,
id: Bytes32,
) -> impl Stream<Item = anyhow::Result<TransactionStatus>> {
) -> anyhow::Result<impl Stream<Item = anyhow::Result<TransactionStatus>>> {
let txpool = self.shared.txpool.clone();
let db = self.shared.database.clone();
let rx = Box::pin(txpool.tx_update_subscribe(id).await);
transaction_status_change(
let rx = txpool.tx_update_subscribe(id)?;
Ok(transaction_status_change(
move |id| match db.get_tx_status(&id)? {
Some(status) => Ok(Some(status)),
None => Ok(txpool.find_one(id).map(Into::into)),
},
rx,
id,
)
.await
))
}
}
1 change: 1 addition & 0 deletions crates/fuel-core/src/service/sub_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ pub fn init_sub_services(
Box::new(producer_adapter),
Box::new(poa_adapter),
config.query_log_threshold_time,
config.api_request_timeout,
)?;

let shared = SharedState {
Expand Down
25 changes: 18 additions & 7 deletions crates/services/txpool/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,12 @@ use fuel_core_types::{
tai64::Tai64,
};

use anyhow::anyhow;
use parking_lot::Mutex as ParkingMutex;
use std::sync::Arc;
use std::{
sync::Arc,
time::Duration,
};
use tokio::{
sync::broadcast,
time::MissedTickBehavior,
Expand All @@ -76,9 +80,9 @@ pub struct TxStatusChange {
}

impl TxStatusChange {
pub fn new(capacity: usize) -> Self {
pub fn new(capacity: usize, ttl: Duration) -> Self {
let (new_tx_notification_sender, _) = broadcast::channel(capacity);
let update_sender = UpdateSender::new(capacity);
let update_sender = UpdateSender::new(capacity, ttl);
Self {
new_tx_notification_sender,
update_sender,
Expand Down Expand Up @@ -326,11 +330,11 @@ where
self.tx_status_sender.new_tx_notification_sender.subscribe()
}

pub async fn tx_update_subscribe(&self, tx_id: Bytes32) -> TxStatusStream {
pub fn tx_update_subscribe(&self, tx_id: Bytes32) -> anyhow::Result<TxStatusStream> {
self.tx_status_sender
.update_sender
.subscribe::<MpscChannel>(tx_id)
.await
.try_subscribe::<MpscChannel>(tx_id)
.ok_or(anyhow!("Maximum number of subscriptions reached"))
}
}

Expand Down Expand Up @@ -450,7 +454,14 @@ where
gossiped_tx_stream,
committed_block_stream,
shared: SharedState {
tx_status_sender: TxStatusChange::new(number_of_active_subscription),
tx_status_sender: TxStatusChange::new(
number_of_active_subscription,
// The connection should be closed automatically after the `SqueezedOut` event.
// However, a slow or malicious consumer can keep its subscription slot occupied.
// We give the subscriber enough time to receive the event produced by the TxPool's TTL,
// but we still want to drop subscribers after `2 * TxPool_TTL`.
Comment on lines +460 to +462
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, the idea is to keep the subscription live long enough for one of:

  • TX to complete/squeeze and report complete/squeeze event
  • TX to timeout and report timeout event

Doubling the Txpool TTL allows us to use the second half of the doubled TTL for reporting the event.

Is that more or less accurate?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It also gives a slow consumer enough time to read the event within some reasonable deadline.

2 * config.transaction_ttl,
),
txpool,
p2p,
consensus_params,
Expand Down
4 changes: 2 additions & 2 deletions crates/services/txpool/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,11 @@ async fn simple_insert_removal_subscription() {
let mut tx1_subscribe_updates = service
.shared
.tx_update_subscribe(tx1.cached_id().unwrap())
.await;
.unwrap();
let mut tx2_subscribe_updates = service
.shared
.tx_update_subscribe(tx2.cached_id().unwrap())
.await;
.unwrap();

let out = service.shared.insert(vec![tx1.clone(), tx2.clone()]).await;

Expand Down
6 changes: 3 additions & 3 deletions crates/services/txpool/src/service/tests_p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async fn can_insert_from_p2p() {
let mut receiver = service
.shared
.tx_update_subscribe(tx1.id(&ConsensusParameters::DEFAULT.chain_id))
.await;
.unwrap();

service.start_and_await().await.unwrap();

Expand Down Expand Up @@ -68,7 +68,7 @@ async fn insert_from_local_broadcasts_to_p2p() {
let mut subscribe_update = service
.shared
.tx_update_subscribe(tx1.cached_id().unwrap())
.await;
.unwrap();

let out = service.shared.insert(vec![Arc::new(tx1.clone())]).await;

Expand Down Expand Up @@ -117,7 +117,7 @@ async fn test_insert_from_p2p_does_not_broadcast_to_p2p() {
let mut receiver = service
.shared
.tx_update_subscribe(tx1.id(&ConsensusParameters::DEFAULT.chain_id))
.await;
.unwrap();

service.start_and_await().await.unwrap();

Expand Down
Loading