Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deploy: Check signature statuses in parallel #1384

Closed
wants to merge 1 commit into from

Conversation

joncinque
Copy link

Problem

During deployment, we need to check the status of sent transactions in order to see if they have been confirmed. If they are confirmed, they are removed from the resend list.

Unfortunately, the code currently checks signature statuses in series, which means it can take a long time to find out the status of thousands of transactions.

Summary of Changes

Check signature statuses concurrently. There isn't a huge impact in testing 1000 self-transfers on testnet, but here are some timings:

  • after: 26s, 17s, 16s
  • before: 25s, 15s, 20s

If you think this isn't worth it, I'm happy to close, but let me know!

@joncinque joncinque requested a review from KirillLykov May 16, 2024 13:12
@codecov-commenter
Copy link

Codecov Report

Attention: Patch coverage is 80.00000% with 5 lines in your changes are missing coverage. Please review.

Project coverage is 82.8%. Comparing base (0214743) to head (1b9913f).
Report is 23 commits behind head on master.

Additional details and impacted files
@@           Coverage Diff           @@
##           master    #1384   +/-   ##
=======================================
  Coverage    82.8%    82.8%           
=======================================
  Files         867      867           
  Lines      368875   368879    +4     
=======================================
+ Hits       305646   305660   +14     
+ Misses      63229    63219   -10     

@KirillLykov
Copy link

KirillLykov commented May 17, 2024

What if you try JoinSet instead? Wonder what would be the performance. Below is the diff for client crate:

diff --git client/Cargo.toml client/Cargo.toml
index ece0b..b0652 100644
--- client/Cargo.toml
+++ client/Cargo.toml
@@ -35,6 +35,7 @@ solana-tpu-client = { workspace = true, features = ["default"] }
 solana-udp-client = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true, features = ["full"] }
+itertools = {workspace = true}
 
 [dev-dependencies]
 crossbeam-channel = { workspace = true }
diff --git client/src/send_and_confirm_transactions_in_parallel.rs client/src/send_and_confirm_transactions_in_parallel.rs
index aa65ef..35e73 100644
--- client/src/send_and_confirm_transactions_in_parallel.rs
+++ client/src/send_and_confirm_transactions_in_parallel.rs
@@ -6,6 +6,7 @@ use {
     bincode::serialize,
     dashmap::DashMap,
     futures_util::future::join_all,
+    itertools::Itertools,
     solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
     solana_rpc_client::spinner::{self, SendTransactionProgress},
     solana_rpc_client_api::{
@@ -16,7 +17,7 @@ use {
     solana_sdk::{
         hash::Hash,
         message::Message,
-        signature::{Signature, SignerError},
+        signature::{self, Signature, SignerError},
         signers::Signers,
         transaction::{Transaction, TransactionError},
     },
@@ -113,11 +114,12 @@ fn create_transaction_confirmation_task(
     tokio::spawn(async move {
         // check transactions that are not expired or have just expired between two checks
         let mut last_block_height = current_block_height.load(Ordering::Relaxed);
+        let mut futures = tokio::task::JoinSet::new();
 
         loop {
             if !unconfirmed_transaction_map.is_empty() {
                 let current_block_height = current_block_height.load(Ordering::Relaxed);
-                let transactions_to_verify: Vec<Signature> = unconfirmed_transaction_map
+                let transactions_to_verify: Vec<Vec<Signature>> = unconfirmed_transaction_map
                     .iter()
                     .filter(|x| {
                         let is_not_expired = current_block_height <= x.last_valid_block_height;
@@ -127,11 +129,23 @@ fn create_transaction_confirmation_task(
                         is_not_expired || is_recently_expired
                     })
                     .map(|x| *x.key())
+                    .chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS)
+                    .into_iter()
+                    .map(|chunk| chunk.collect::<Vec<_>>())
                     .collect();
-                for signatures in
-                    transactions_to_verify.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS)
-                {
-                    if let Ok(result) = rpc_client.get_signature_statuses(signatures).await {
+                while futures.join_next().await.is_some() {}
+
+                for signatures in transactions_to_verify {
+                    let rpc_client = rpc_client.clone();
+                    let errors_map = errors_map.clone();
+                    let num_confirmed_transactions = num_confirmed_transactions.clone();
+                    let unconfirmed_transaction_map = unconfirmed_transaction_map.clone();
+                    //let signatures = signatures.to_vec();
+                    futures.spawn(async move {
+                        let Ok(result) = rpc_client.get_signature_statuses(&signatures).await
+                        else {
+                            return;
+                        };
                         let statuses = result.value;
                         for (signature, status) in signatures.iter().zip(statuses.into_iter()) {
                             if let Some((status, data)) = status
@@ -153,12 +167,12 @@ fn create_transaction_confirmation_task(
                                 }
                             };
                         }
-                    }
+                    });
                 }
 
                 last_block_height = current_block_height;
             }
-            tokio::time::sleep(Duration::from_secs(1)).await;
+            //tokio::time::sleep(Duration::from_secs(1)).await;
         }
     })
 }

Copy link

@KirillLykov KirillLykov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it doesn't improve the performance, I would just simplify the code with let-else without adding join_all

let futures = transactions_to_verify
.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS)
.map(|signatures| async {
if let Ok(result) = rpc_client.get_signature_statuses(signatures).await {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you a rein this code, I would prefer to exit early:

let Ok(result) = rpc_client.get_signature_statuses(signatures).await else {
    return;
};

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

works for me!

@joncinque
Copy link
Author

Sorry, I think Task is a little overkill in this case, since we don't need to detach the execution, we just need to do the network calls concurrently. Is there a reason that normal Future isn't good enough?

@KirillLykov
Copy link

Sorry, I think Task is a little overkill in this case, since we don't need to detach the execution, we just need to do the network calls concurrently. Is there a reason that normal Future isn't good enough?

I think there is some misunderstanding of the patch intention. I was curious if there any performance boost due to JoinSet usage, if not than I would not go for it because it adds some complexity.

Independently on this particular code, I think there are some benefits of using JoinSet vs join_all! when the workload is complex:

  • spawning a task is cheap, if we don't want to consume all the threads from runtime thread pool we can control this by checking the JoinSet size,
  • I feel like JoinSet has a very clean interface in comparison to FuturesOrdered/Unordered.
  • join_all uses FuturesOrdered which has some a bit obscure problems (see Footgun lurking in FuturesUnordered and other concurrency-enabling streams rust-lang/futures-rs#2387). My understanding is that in this particular code the aforementioned problem shouldn't happen. The only hypothetical scenario I see is when response processing takes some time due to waiting for lock inside DashMap for too long time.

@joncinque
Copy link
Author

Closing since the new TPU client will be the best option 😄

@joncinque joncinque closed this Oct 28, 2024
@joncinque joncinque deleted the sigparallel branch October 28, 2024 13:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants