Skip to content

Commit

Permalink
client: Timeout resends during send_and_confirm_in_parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
joncinque committed Mar 21, 2024
1 parent b2f4fb3 commit 0055b8b
Showing 1 changed file with 28 additions and 22 deletions.
50 changes: 28 additions & 22 deletions client/src/send_and_confirm_transactions_in_parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use {
tokio::{sync::RwLock, task::JoinHandle, time::Instant},
};

const BLOCKHASH_REFRESH_RATE: Duration = Duration::from_secs(10);
const BLOCKHASH_REFRESH_RATE: Duration = Duration::from_secs(5);
const TPU_RESEND_REFRESH_RATE: Duration = Duration::from_secs(2);
const SEND_INTERVAL: Duration = Duration::from_millis(10);
type QuicTpuClient = TpuClient<QuicPool, QuicConnectionManager, QuicConfig>;
Expand Down Expand Up @@ -326,21 +326,20 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction
);
}

if let Some(progress_bar) = progress_bar {
let progress = progress_from_context_and_block_height(context, max_valid_block_height);
progress.set_message_for_confirmed_transactions(
progress_bar,
"Checking transaction status...",
);
}

// wait till all transactions are confirmed or we have surpassed max processing age for the last sent transaction
while !unconfirmed_transaction_map.is_empty()
&& current_block_height.load(Ordering::Relaxed) <= max_valid_block_height
{
let block_height = current_block_height.load(Ordering::Relaxed);

if let Some(progress_bar) = progress_bar {
let progress =
progress_from_context_and_block_height(context, max_valid_block_height);
progress.set_message_for_confirmed_transactions(
progress_bar,
"Checking transaction status...",
);
}

if let Some(tpu_client) = tpu_client {
let instant = Instant::now();
// retry sending transaction only over TPU port
Expand All @@ -349,10 +348,25 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction
.iter()
.filter(|x| block_height < x.last_valid_block_height)
.map(|x| x.serialized_transaction.clone())
.collect();
let _ = tpu_client
.try_send_wire_transaction_batch(txs_to_resend_over_tpu)
.await;
.collect::<Vec<_>>();
let num_txs_to_resend = txs_to_resend_over_tpu.len();
let message = if tokio::time::timeout(
Duration::from_secs(5),
tpu_client.try_send_wire_transaction_batch(txs_to_resend_over_tpu),
)
.await
.is_err()
{
format!("Timed out resending {num_txs_to_resend} transactions...")
} else {
format!("Resent {num_txs_to_resend} transactions...")
};

if let Some(progress_bar) = progress_bar {
let progress =
progress_from_context_and_block_height(context, max_valid_block_height);
progress.set_message_for_confirmed_transactions(progress_bar, &message);
}

let elapsed = instant.elapsed();
if elapsed < TPU_RESEND_REFRESH_RATE {
Expand All @@ -370,14 +384,6 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction
max_valid_block_height = max_valid_block_height_in_remaining_transaction;
}
}

if let Some(progress_bar) = progress_bar {
let progress = progress_from_context_and_block_height(context, max_valid_block_height);
progress.set_message_for_confirmed_transactions(
progress_bar,
"Checking transaction status...",
);
}
}
}

Expand Down

0 comments on commit 0055b8b

Please sign in to comment.