From f118b515414dc4730afd553da5935602a350a637 Mon Sep 17 00:00:00 2001 From: Lucas B Date: Sat, 20 Jan 2024 14:31:36 -0600 Subject: [PATCH] Run MEV claims + reclaiming rent-exempt amounts in parallel. Be more aggressive about sending transactions. --- Cargo.lock | 1 + ledger/src/bank_forks_utils.rs | 2 +- tip-distributor/Cargo.toml | 1 + tip-distributor/src/bin/claim-mev-tips.rs | 199 ++++--- tip-distributor/src/claim_mev_workflow.rs | 560 +++++++++---------- tip-distributor/src/lib.rs | 281 +++++----- tip-distributor/src/reclaim_rent_workflow.rs | 407 ++++++++------ 7 files changed, 720 insertions(+), 731 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8cfe08d650..49d6def0d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7798,6 +7798,7 @@ dependencies = [ "solana-runtime", "solana-sdk", "solana-stake-program", + "solana-transaction-status", "solana-vote", "thiserror", "tokio", diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs index 65cdc8aaa9..46b24cbf38 100644 --- a/ledger/src/bank_forks_utils.rs +++ b/ledger/src/bank_forks_utils.rs @@ -167,7 +167,7 @@ pub fn load_bank_forks( snapshot_utils::get_highest_incremental_snapshot_archive_info( &snapshot_config.incremental_snapshot_archives_dir, full_snapshot_archive_info.slot(), - None, + halt_at_slot, ); Some(( diff --git a/tip-distributor/Cargo.toml b/tip-distributor/Cargo.toml index f15085a262..76682d220a 100644 --- a/tip-distributor/Cargo.toml +++ b/tip-distributor/Cargo.toml @@ -35,6 +35,7 @@ solana-rpc-client-api = { workspace = true } solana-runtime = { workspace = true } solana-sdk = { workspace = true } solana-stake-program = { workspace = true } +solana-transaction-status = { workspace = true } solana-vote = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } diff --git a/tip-distributor/src/bin/claim-mev-tips.rs b/tip-distributor/src/bin/claim-mev-tips.rs index 833a5da27c..dd57db9231 100644 --- a/tip-distributor/src/bin/claim-mev-tips.rs +++ b/tip-distributor/src/bin/claim-mev-tips.rs @@ -1,10 +1,14 @@ //! This binary claims MEV tips. use { clap::Parser, + futures::future::join_all, gethostname::gethostname, log::*, solana_metrics::{datapoint_error, datapoint_info, set_host_id}, - solana_sdk::{pubkey::Pubkey, signature::read_keypair_file}, + solana_sdk::{ + pubkey::Pubkey, + signature::{read_keypair_file, Keypair}, + }, solana_tip_distributor::{ claim_mev_workflow::{claim_mev_tips, ClaimMevError}, read_json_from_file, @@ -37,21 +41,9 @@ struct Args { #[arg(long, env)] keypair_path: PathBuf, - /// Number of unique connections to the RPC server for sending txns - #[arg(long, env, default_value_t = 128)] - rpc_send_connection_count: u64, - - /// Rate-limits the maximum number of GET requests per RPC connection - #[arg(long, env, default_value_t = 256)] - max_concurrent_rpc_get_reqs: usize, - - /// Number of retries for main claim send loop. Loop is time bounded. - #[arg(long, env, default_value_t = 5)] - max_loop_retries: u64, - - /// Limits how long before send loop runs before stopping. Defaults to 10 mins - #[arg(long, env, default_value_t = 10 * 60)] - max_loop_duration_secs: u64, + /// Limits how long before send loop runs before stopping + #[arg(long, env, default_value_t = 60 * 60)] + max_retry_duration_secs: u64, /// Specifies whether to reclaim any rent. #[arg(long, env, default_value_t = true)] @@ -61,40 +53,28 @@ struct Args { #[arg(long, env)] should_reclaim_tdas: bool, - /// The price to pay per compute unit aka "Priority Fee". + /// The price to pay for priority fee #[arg(long, env, default_value_t = 1)] - micro_lamports_per_compute_unit: u64, + micro_lamports: u64, } -#[tokio::main] -async fn main() -> Result<(), ClaimMevError> { - env_logger::init(); - gethostname() - .into_string() - .map(set_host_id) - .expect("set hostname"); - let args: Args = Args::parse(); - let keypair = Arc::new(read_keypair_file(&args.keypair_path).expect("read keypair file")); - let merkle_trees: GeneratedMerkleTreeCollection = - read_json_from_file(&args.merkle_trees_path).expect("read GeneratedMerkleTreeCollection"); - let max_loop_duration = Duration::from_secs(args.max_loop_duration_secs); - - info!( - "Starting to claim mev tips for epoch: {}", - merkle_trees.epoch - ); +async fn start_mev_claim_process( + merkle_trees: GeneratedMerkleTreeCollection, + rpc_url: String, + tip_distribution_program_id: Pubkey, + signer: Arc, + max_loop_duration: Duration, + micro_lamports: u64, +) -> Result<(), ClaimMevError> { let start = Instant::now(); match claim_mev_tips( - merkle_trees.clone(), - args.rpc_url.clone(), - args.rpc_send_connection_count, - args.max_concurrent_rpc_get_reqs, - &args.tip_distribution_program_id, - keypair.clone(), - args.max_loop_retries, + &merkle_trees, + rpc_url, + tip_distribution_program_id, + signer, max_loop_duration, - args.micro_lamports_per_compute_unit, + micro_lamports, ) .await { @@ -104,11 +84,6 @@ async fn main() -> Result<(), ClaimMevError> { ("epoch", merkle_trees.epoch, i64), ("error", 1, i64), ("err_str", e.to_string(), String), - ( - "merkle_trees_path", - args.merkle_trees_path.to_string_lossy(), - String - ), ("elapsed_us", start.elapsed().as_micros(), i64), ); Err(e) @@ -117,61 +92,99 @@ async fn main() -> Result<(), ClaimMevError> { datapoint_info!( "claim_mev_workflow-claim_completion", ("epoch", merkle_trees.epoch, i64), - ( - "merkle_trees_path", - args.merkle_trees_path.to_string_lossy(), - String - ), ("elapsed_us", start.elapsed().as_micros(), i64), ); Ok(()) } - }?; + } +} + +async fn start_rent_claim( + rpc_url: String, + tip_distribution_program_id: Pubkey, + signer: Arc, + max_loop_duration: Duration, + should_reclaim_tdas: bool, + micro_lamports: u64, + epoch: u64, +) -> Result<(), ClaimMevError> { + let start = Instant::now(); + match reclaim_rent( + rpc_url, + tip_distribution_program_id, + signer, + max_loop_duration, + should_reclaim_tdas, + micro_lamports, + ) + .await + { + Err(e) => { + datapoint_error!( + "claim_mev_workflow-reclaim_rent_error", + ("epoch", epoch, i64), + ("error", 1, i64), + ("err_str", e.to_string(), String), + ("elapsed_us", start.elapsed().as_micros(), i64), + ); + Err(e) + } + Ok(()) => { + datapoint_info!( + "claim_mev_workflow-reclaim_rent_completion", + ("epoch", epoch, i64), + ("elapsed_us", start.elapsed().as_micros(), i64), + ); + Ok(()) + } + } +} +#[tokio::main] +async fn main() -> Result<(), ClaimMevError> { + env_logger::init(); + + gethostname() + .into_string() + .map(set_host_id) + .expect("set hostname"); + + let args: Args = Args::parse(); + let keypair = Arc::new(read_keypair_file(&args.keypair_path).expect("read keypair file")); + let merkle_trees: GeneratedMerkleTreeCollection = + read_json_from_file(&args.merkle_trees_path).expect("read GeneratedMerkleTreeCollection"); + let max_loop_duration = Duration::from_secs(args.max_retry_duration_secs); + + info!( + "Starting to claim mev tips for epoch: {}", + merkle_trees.epoch + ); + let epoch = merkle_trees.epoch; + + let mut futs = vec![]; + futs.push(tokio::spawn(start_mev_claim_process( + merkle_trees, + args.rpc_url.clone(), + args.tip_distribution_program_id, + keypair.clone(), + max_loop_duration, + args.micro_lamports, + ))); if args.should_reclaim_rent { - let start = Instant::now(); - match reclaim_rent( - args.rpc_url, - args.rpc_send_connection_count, + futs.push(tokio::spawn(start_rent_claim( + args.rpc_url.clone(), args.tip_distribution_program_id, - keypair, - args.max_loop_retries, + keypair.clone(), max_loop_duration, args.should_reclaim_tdas, - args.micro_lamports_per_compute_unit, - ) - .await - { - Err(e) => { - datapoint_error!( - "claim_mev_workflow-reclaim_rent_error", - ("epoch", merkle_trees.epoch, i64), - ("error", 1, i64), - ("err_str", e.to_string(), String), - ( - "merkle_trees_path", - args.merkle_trees_path.to_string_lossy(), - String - ), - ("elapsed_us", start.elapsed().as_micros(), i64), - ); - Err(e) - } - Ok(()) => { - datapoint_info!( - "claim_mev_workflow-reclaim_rent_completion", - ("epoch", merkle_trees.epoch, i64), - ( - "merkle_trees_path", - args.merkle_trees_path.to_string_lossy(), - String - ), - ("elapsed_us", start.elapsed().as_micros(), i64), - ); - Ok(()) - } - }?; + args.micro_lamports, + epoch, + ))); } + let results = join_all(futs).await; solana_metrics::flush(); // sometimes last datapoint doesn't get emitted. this increases likelihood. + for r in results { + r.map_err(|e| ClaimMevError::UncaughtError { e: e.to_string() })??; + } Ok(()) } diff --git a/tip-distributor/src/claim_mev_workflow.rs b/tip-distributor/src/claim_mev_workflow.rs index 22d2cebeaa..929b64fe3b 100644 --- a/tip-distributor/src/claim_mev_workflow.rs +++ b/tip-distributor/src/claim_mev_workflow.rs @@ -1,19 +1,17 @@ use { - crate::{ - claim_mev_workflow::ClaimMevError::{ClaimantNotFound, InsufficientBalance, TDANotFound}, - minimum_balance, sign_and_send_transactions_with_retries_multi_rpc, - GeneratedMerkleTreeCollection, TreeNode, - }, + crate::{send_until_blockhash_expires, GeneratedMerkleTreeCollection}, anchor_lang::{AccountDeserialize, InstructionData, ToAccountMetas}, itertools::Itertools, jito_tip_distribution::state::{ClaimStatus, Config, TipDistributionAccount}, - log::{debug, error, info}, + log::{error, info, warn}, + rand::{prelude::SliceRandom, thread_rng}, solana_client::nonblocking::rpc_client::RpcClient, - solana_metrics::{datapoint_info, datapoint_warn}, + solana_metrics::datapoint_info, solana_program::{ fee_calculator::DEFAULT_TARGET_LAMPORTS_PER_SIGNATURE, native_token::LAMPORTS_PER_SOL, system_program, }, + solana_rpc_client_api::config::RpcSimulateTransactionConfig, solana_sdk::{ account::Account, commitment_config::CommitmentConfig, @@ -42,24 +40,8 @@ pub enum ClaimMevError { #[error(transparent)] AnchorError(anchor_lang::error::Error), - #[error("TDA not found for pubkey: {0:?}")] - TDANotFound(Pubkey), - - #[error("Claim Status not found for pubkey: {0:?}")] - ClaimStatusNotFound(Pubkey), - - #[error("Claimant not found for pubkey: {0:?}")] - ClaimantNotFound(Pubkey), - #[error(transparent)] - MaxFetchRetriesExceeded(#[from] solana_rpc_client_api::client_error::Error), - - #[error("Failed after {attempts} retries. {remaining_transaction_count} remaining mev claim transactions, {failed_transaction_count} failed requests.",)] - MaxSendTransactionRetriesExceeded { - attempts: u64, - remaining_transaction_count: usize, - failed_transaction_count: usize, - }, + RpcError(#[from] solana_rpc_client_api::client_error::Error), #[error("Expected to have at least {desired_balance} lamports in {payer:?}. Current balance is {start_balance} lamports. Deposit {sol_to_deposit} SOL to continue.")] InsufficientBalance { @@ -68,318 +50,283 @@ pub enum ClaimMevError { start_balance: u64, sol_to_deposit: u64, }, -} -pub async fn claim_mev_tips( - merkle_trees: GeneratedMerkleTreeCollection, - rpc_url: String, - rpc_send_connection_count: u64, - max_concurrent_rpc_get_reqs: usize, - tip_distribution_program_id: &Pubkey, - keypair: Arc, - max_loop_retries: u64, - max_loop_duration: Duration, - micro_lamports_per_compute_unit: u64, -) -> Result<(), ClaimMevError> { - let payer_pubkey = keypair.pubkey(); - let blockhash_rpc_client = Arc::new(RpcClient::new_with_commitment( - rpc_url.clone(), - CommitmentConfig::finalized(), - )); - let rpc_clients = Arc::new( - (0..rpc_send_connection_count) - .map(|_| { - Arc::new(RpcClient::new_with_commitment( - rpc_url.clone(), - CommitmentConfig::confirmed(), - )) - }) - .collect_vec(), - ); + #[error("Not finished with job, transactions left {transactions_left}")] + NotFinished { transactions_left: usize }, + + #[error("UncaughtError {e:?}")] + UncaughtError { e: String }, +} +pub async fn get_claim_transactions_for_valid_unclaimed( + rpc_client: &RpcClient, + merkle_trees: &GeneratedMerkleTreeCollection, + tip_distribution_program_id: Pubkey, + micro_lamports: u64, + payer_pubkey: Pubkey, +) -> Result, ClaimMevError> { let tree_nodes = merkle_trees .generated_merkle_trees .iter() .flat_map(|tree| &tree.tree_nodes) .collect_vec(); - // fetch all accounts up front info!( - "Starting to fetch accounts for epoch {}", + "reading tip distribution related accounts for epoch {}", merkle_trees.epoch ); - let tdas = crate::get_batched_accounts( - &blockhash_rpc_client, - max_concurrent_rpc_get_reqs, - merkle_trees - .generated_merkle_trees - .iter() - .map(|tree| tree.tip_distribution_account) - .collect_vec(), - ) - .await - .map_err(ClaimMevError::MaxFetchRetriesExceeded)? - .into_iter() - .filter_map(|(pubkey, maybe_account)| { - let Some(account) = maybe_account else { - datapoint_warn!( - "claim_mev_workflow-account_error", - ("epoch", merkle_trees.epoch, i64), - ("pubkey", pubkey.to_string(), String), - ("account_type", "tip_distribution_account", String), - ("error", 1, i64), - ("err_type", "fetch", String), - ("err_str", "Failed to fetch TipDistributionAccount", String) - ); - return None; - }; - - let account = match TipDistributionAccount::try_deserialize(&mut account.data.as_slice()) { - Ok(a) => a, - Err(e) => { - datapoint_warn!( - "claim_mev_workflow-account_error", - ("epoch", merkle_trees.epoch, i64), - ("pubkey", pubkey.to_string(), String), - ("account_type", "tip_distribution_account", String), - ("error", 1, i64), - ("err_type", "deserialize_tip_distribution_account", String), - ("err_str", e.to_string(), String) - ); - return None; - } - }; - Some((pubkey, account)) - }) - .collect::>(); - - // track balances and account len to make sure account is rent-exempt after transfer - let claimants = crate::get_batched_accounts( - &blockhash_rpc_client, - max_concurrent_rpc_get_reqs, - tree_nodes - .iter() - .map(|tree_node| tree_node.claimant) - .collect_vec(), - ) - .await - .map_err(ClaimMevError::MaxFetchRetriesExceeded)? - .into_iter() - .map(|(pubkey, maybe_account)| { - ( - pubkey, - maybe_account - .map(|account| (account.lamports, account.data.len())) - .unwrap_or_default(), - ) - }) - .collect::>(); - - // Refresh claimants + Try sending txns to RPC - let mut retries = 0; - let mut failed_transaction_count = 0usize; - loop { - let start = Instant::now(); - let claim_statuses = crate::get_batched_accounts( - &blockhash_rpc_client, - max_concurrent_rpc_get_reqs, - tree_nodes - .iter() - .map(|tree_node| tree_node.claim_status_pubkey) - .collect_vec(), - ) - .await - .map_err(ClaimMevError::MaxFetchRetriesExceeded)?; - let account_fetch_elapsed = start.elapsed(); - - let ( - skipped_merkle_root_count, - zero_lamports_count, - already_claimed_count, - below_min_rent_count, - transactions, - ) = build_transactions( + + let start = Instant::now(); + + let tda_pubkeys = merkle_trees + .generated_merkle_trees + .iter() + .map(|tree| tree.tip_distribution_account) + .collect_vec(); + let tdas: HashMap = crate::get_batched_accounts(rpc_client, &tda_pubkeys) + .await? + .into_iter() + .filter_map(|(pubkey, a)| Some((pubkey, a?))) + .collect(); + + let claimant_pubkeys = tree_nodes + .iter() + .map(|tree_node| tree_node.claimant) + .collect_vec(); + let claimants: HashMap = + crate::get_batched_accounts(rpc_client, &claimant_pubkeys) + .await? + .into_iter() + .filter_map(|(pubkey, a)| Some((pubkey, a?))) + .collect(); + + let claim_status_pubkeys = tree_nodes + .iter() + .map(|tree_node| tree_node.claim_status_pubkey) + .collect_vec(); + let claim_statuses: HashMap = + crate::get_batched_accounts(rpc_client, &claim_status_pubkeys) + .await? + .into_iter() + .filter_map(|(pubkey, a)| Some((pubkey, a?))) + .collect(); + + let elapsed_us = start.elapsed().as_micros(); + + // can be helpful for determining mismatch in state between requested and read + datapoint_info!( + "claim_mev-get_claim_transactions_account_data", + ("elapsed_us", elapsed_us, i64), + ("tdas", tda_pubkeys.len(), i64), + ("tdas_onchain", tdas.len(), i64), + ("claimants", claimant_pubkeys.len(), i64), + ("claimants_onchain", claimants.len(), i64), + ("claim_statuses", claim_status_pubkeys.len(), i64), + ("claim_statuses_onchain", claim_statuses.len(), i64), + ); + + let transactions = build_mev_claim_transactions( + tip_distribution_program_id, + merkle_trees, + tdas, + claimants, + claim_statuses, + micro_lamports, + payer_pubkey, + ); + + Ok(transactions) +} + +pub async fn claim_mev_tips( + merkle_trees: &GeneratedMerkleTreeCollection, + rpc_url: String, + tip_distribution_program_id: Pubkey, + keypair: Arc, + max_loop_duration: Duration, + micro_lamports: u64, +) -> Result<(), ClaimMevError> { + let rpc_client = RpcClient::new_with_timeout_and_commitment( + rpc_url, + Duration::from_secs(300), + CommitmentConfig::confirmed(), + ); + + let start = Instant::now(); + while start.elapsed() <= max_loop_duration { + let mut all_claim_transactions = get_claim_transactions_for_valid_unclaimed( + &rpc_client, + merkle_trees, tip_distribution_program_id, - &merkle_trees, - &payer_pubkey, - &tree_nodes, - &tdas, - &claimants, - &claim_statuses, - micro_lamports_per_compute_unit, - )?; + micro_lamports, + keypair.pubkey(), + ) + .await?; + datapoint_info!( - "claim_mev_workflow-prepare_transactions", - ("epoch", merkle_trees.epoch, i64), - ("attempt", retries, i64), - ("tree_node_count", tree_nodes.len(), i64), - ("tda_count", tdas.len(), i64), - ("claimant_count", claimants.len(), i64), - ("claim_status_count", claim_statuses.len(), i64), - ("skipped_merkle_root_count", skipped_merkle_root_count, i64), - ("zero_lamports_count", zero_lamports_count, i64), - ("already_claimed_count", already_claimed_count, i64), - ("below_min_rent_count", below_min_rent_count, i64), - ("transaction_count", transactions.len(), i64), - ( - "account_fetch_latency_us", - account_fetch_elapsed.as_micros(), - i64 - ), - ( - "transaction_prepare_latency_us", - start.elapsed().as_micros(), - i64 - ), + "claim_mev_tips-send_summary", + ("claim_transactions_left", all_claim_transactions.len(), i64), ); - if transactions.is_empty() { - info!("Finished claiming tips after {retries} retries, {failed_transaction_count} failed requests."); + if all_claim_transactions.is_empty() { return Ok(()); } - if let Some((start_balance, desired_balance, sol_to_deposit)) = is_sufficient_balance( - &payer_pubkey, - &blockhash_rpc_client, - transactions.len() as u64, - ) - .await + all_claim_transactions.shuffle(&mut thread_rng()); + let transactions: Vec<_> = all_claim_transactions.into_iter().take(10_000).collect(); + + // only check balance for the ones we need to currently send since reclaim rent running in parallel + if let Some((start_balance, desired_balance, sol_to_deposit)) = + is_sufficient_balance(&keypair.pubkey(), &rpc_client, transactions.len() as u64).await { - return Err(InsufficientBalance { + return Err(ClaimMevError::InsufficientBalance { desired_balance, - payer: payer_pubkey, + payer: keypair.pubkey(), start_balance, sol_to_deposit, }); } - let transactions_len = transactions.len(); - - info!("Sending {} tip claim transactions. {zero_lamports_count} would transfer zero lamports, {below_min_rent_count} would be below minimum rent", transactions.len()); - let send_start = Instant::now(); - let (remaining_transaction_count, new_failed_transaction_count) = - sign_and_send_transactions_with_retries_multi_rpc( - &keypair, - &blockhash_rpc_client, - &rpc_clients, - transactions, - max_loop_duration, - ) - .await; - failed_transaction_count = - failed_transaction_count.saturating_add(new_failed_transaction_count); - datapoint_info!( - "claim_mev_workflow-send_transactions", - ("epoch", merkle_trees.epoch, i64), - ("attempt", retries, i64), - ("transaction_count", transactions_len, i64), - ( - "successful_transaction_count", - transactions_len.saturating_sub(remaining_transaction_count), - i64 - ), - ( - "remaining_transaction_count", - remaining_transaction_count, - i64 - ), - ( - "failed_transaction_count", - new_failed_transaction_count, - i64 - ), - ("send_latency_us", send_start.elapsed().as_micros(), i64), - ); + let blockhash = rpc_client.get_latest_blockhash().await?; + let _ = send_until_blockhash_expires(&rpc_client, transactions, blockhash, &keypair).await; + } - if retries >= max_loop_retries { - return Err(ClaimMevError::MaxSendTransactionRetriesExceeded { - attempts: max_loop_retries, - remaining_transaction_count, - failed_transaction_count, - }); + let transactions = get_claim_transactions_for_valid_unclaimed( + &rpc_client, + merkle_trees, + tip_distribution_program_id, + micro_lamports, + keypair.pubkey(), + ) + .await?; + if transactions.is_empty() { + return Ok(()); + } + + // if more transactions left, we'll simulate them all to make sure its not an uncaught error + let mut is_error = false; + let mut error_str = String::new(); + for tx in &transactions { + match rpc_client + .simulate_transaction_with_config( + tx, + RpcSimulateTransactionConfig { + sig_verify: false, + replace_recent_blockhash: true, + commitment: Some(CommitmentConfig::processed()), + ..RpcSimulateTransactionConfig::default() + }, + ) + .await + { + Ok(_) => {} + Err(e) => { + error_str = e.to_string(); + is_error = true; + + match e.get_transaction_error() { + None => { + break; + } + Some(e) => { + warn!("transaction error. tx: {:?} error: {:?}", tx, e); + break; + } + } + } } - retries = retries.saturating_add(1); + } + + if is_error { + Err(ClaimMevError::UncaughtError { e: error_str }) + } else { + Err(ClaimMevError::NotFinished { + transactions_left: transactions.len(), + }) } } -#[allow(clippy::result_large_err)] -fn build_transactions( - tip_distribution_program_id: &Pubkey, +/// Returns a list of claim transactions for valid, unclaimed MEV tips +/// A valid, unclaimed transaction consists of the following: +/// - there must be lamports to claim for the tip distribution account. +/// - there must be a merkle root. +/// - the claimant (typically a stake account) must exist. +/// - the claimant (typically a stake account) must have a non-zero amount of tips to claim +/// - the claimant must have enough lamports post-claim to be rent-exempt. +/// - note: there aren't any rent exempt accounts on solana mainnet anymore. +/// - it must not have already been claimed. +fn build_mev_claim_transactions( + tip_distribution_program_id: Pubkey, merkle_trees: &GeneratedMerkleTreeCollection, - payer_pubkey: &Pubkey, - tree_nodes: &[&TreeNode], - tdas: &HashMap, - claimants: &HashMap, - claim_statuses: &HashMap>, - micro_lamports_per_compute_unit: u64, -) -> Result< - ( - usize, /* skipped_merkle_root_count */ - usize, /* zero_lamports_count */ - usize, /* already_claimed_count */ - usize, /* below_min_rent_count */ - Vec, - ), - ClaimMevError, -> { + tdas: HashMap, + claimants: HashMap, + claim_status: HashMap, + micro_lamports: u64, + payer_pubkey: Pubkey, +) -> Vec { + let tip_distribution_accounts: HashMap = tdas + .iter() + .filter_map(|(pubkey, account)| { + Some(( + *pubkey, + TipDistributionAccount::try_deserialize(&mut account.data.as_slice()).ok()?, + )) + }) + .collect(); + + let claim_statuses: HashMap = claim_status + .iter() + .filter_map(|(pubkey, account)| { + Some(( + *pubkey, + ClaimStatus::try_deserialize(&mut account.data.as_slice()).ok()?, + )) + }) + .collect(); + + datapoint_info!( + "build_mev_claim_transactions", + ( + "tip_distribution_accounts", + tip_distribution_accounts.len(), + i64 + ), + ("claim_statuses", claim_statuses.len(), i64), + ); + let tip_distribution_config = - Pubkey::find_program_address(&[Config::SEED], tip_distribution_program_id).0; - let mut skipped_merkle_root_count: usize = 0; - let mut zero_lamports_count: usize = 0; - let mut already_claimed_count: usize = 0; - let mut below_min_rent_count: usize = 0; - let mut instructions = - Vec::with_capacity(tree_nodes.iter().filter(|node| node.amount > 0).count()); - - // prepare instructions to transfer to all claimants + Pubkey::find_program_address(&[Config::SEED], &tip_distribution_program_id).0; + + let mut instructions = Vec::with_capacity(claimants.len()); for tree in &merkle_trees.generated_merkle_trees { - let Some(fetched_tip_distribution_account) = tdas.get(&tree.tip_distribution_account) - else { - return Err(TDANotFound(tree.tip_distribution_account)); - }; - // only claim for ones that have merkle root on-chain - if fetched_tip_distribution_account.merkle_root.is_none() { - info!( - "Merkle root has not uploaded yet. Skipped {} claimants for TDA: {:?}", - tree.tree_nodes.len(), - tree.tip_distribution_account - ); - skipped_merkle_root_count = skipped_merkle_root_count.checked_add(1).unwrap(); + if tree.max_total_claim == 0 { + continue; + } + + // if unwrap panics, there's a bug in the merkle tree code because the merkle tree code relies on the state + // of the chain to claim. + let tip_distribution_account = tip_distribution_accounts + .get(&tree.tip_distribution_account) + .unwrap(); + + // can continue here, as there might be tip distribution accounts this account doesn't upload for + if tip_distribution_account.merkle_root.is_none() { continue; } + for node in &tree.tree_nodes { - if node.amount == 0 { - zero_lamports_count = zero_lamports_count.checked_add(1).unwrap(); + // doesn't make sense to claim for claimants that don't exist anymore + // can't claim for something already claimed + // don't need to claim for claimants that get 0 MEV + if claimants.get(&node.claimant).is_none() + || claim_statuses.get(&node.claim_status_pubkey).is_some() + || node.amount == 0 + { continue; } - // make sure not previously claimed - match claim_statuses.get(&node.claim_status_pubkey) { - Some(None) => {} // expected to not find ClaimStatus account, don't skip - Some(Some(_account)) => { - debug!( - "Claim status account already exists (already paid out). Skipping pubkey: {:?}.", node.claim_status_pubkey, - ); - already_claimed_count = already_claimed_count.checked_add(1).unwrap(); - continue; - } - None => return Err(ClaimantNotFound(node.claim_status_pubkey)), - }; - let Some((current_balance, allocated_bytes)) = claimants.get(&node.claimant) else { - return Err(ClaimantNotFound(node.claimant)); - }; - - // some older accounts can be rent-paying - // any new transfers will need to make the account rent-exempt (runtime enforced) - let new_balance = current_balance.checked_add(node.amount).unwrap(); - let minimum_rent = minimum_balance(*allocated_bytes); - if new_balance < minimum_rent { - debug!("Current balance + claim amount of {new_balance} is less than required rent-exempt of {minimum_rent} for pubkey: {}. Skipping.", node.claimant); - below_min_rent_count = below_min_rent_count.checked_add(1).unwrap(); - continue; - } instructions.push(Instruction { - program_id: *tip_distribution_program_id, + program_id: tip_distribution_program_id, data: jito_tip_distribution::instruction::Claim { proof: node.proof.clone().unwrap(), amount: node.amount, @@ -391,7 +338,7 @@ fn build_transactions( tip_distribution_account: tree.tip_distribution_account, claimant: node.claimant, claim_status: node.claim_status_pubkey, - payer: *payer_pubkey, + payer: payer_pubkey, system_program: system_program::id(), } .to_account_metas(None), @@ -399,21 +346,16 @@ fn build_transactions( } } - let transactions = instructions + // TODO (LB): see if we can do >1 claim here + let transactions: Vec = instructions .into_iter() .map(|claim_ix| { - let priority_fee_ix = - ComputeBudgetInstruction::set_compute_unit_price(micro_lamports_per_compute_unit); - Transaction::new_with_payer(&[priority_fee_ix, claim_ix], Some(payer_pubkey)) + let priority_fee_ix = ComputeBudgetInstruction::set_compute_unit_price(micro_lamports); + Transaction::new_with_payer(&[priority_fee_ix, claim_ix], Some(&payer_pubkey)) }) - .collect::>(); - Ok(( - skipped_merkle_root_count, - zero_lamports_count, - already_claimed_count, - below_min_rent_count, - transactions, - )) + .collect(); + + transactions } /// heuristic to make sure we have enough funds to cover the rent costs if epoch has many validators diff --git a/tip-distributor/src/lib.rs b/tip-distributor/src/lib.rs index c914adb376..8ee6b50f5d 100644 --- a/tip-distributor/src/lib.rs +++ b/tip-distributor/src/lib.rs @@ -10,7 +10,6 @@ use { stake_meta_generator_workflow::StakeMetaGeneratorError::CheckedMathError, }, anchor_lang::Id, - itertools::Itertools, jito_tip_distribution::{ program::JitoTipDistribution, state::{ClaimStatus, TipDistributionAccount}, @@ -21,9 +20,11 @@ use { TIP_ACCOUNT_SEED_7, }, log::*, - rand::prelude::SliceRandom, serde::{de::DeserializeOwned, Deserialize, Serialize}, - solana_client::{nonblocking::rpc_client::RpcClient, rpc_client::RpcClient as SyncRpcClient}, + solana_client::{ + nonblocking::rpc_client::RpcClient, + rpc_client::{RpcClient as SyncRpcClient, SerializableTransaction}, + }, solana_merkle_tree::MerkleTree, solana_metrics::{datapoint_error, datapoint_warn}, solana_program::{ @@ -34,12 +35,14 @@ use { }, solana_rpc_client_api::{ client_error::{Error, ErrorKind}, + config::RpcSendTransactionConfig, request::{RpcError, RpcResponseErrorData, MAX_MULTIPLE_ACCOUNTS}, response::RpcSimulateTransactionResult, }, solana_sdk::{ account::{Account, AccountSharedData, ReadableAccount}, clock::Slot, + commitment_config::{CommitmentConfig, CommitmentLevel}, hash::{Hash, Hasher}, pubkey::Pubkey, signature::{Keypair, Signature}, @@ -49,18 +52,16 @@ use { TransactionError::{self}, }, }, + solana_transaction_status::TransactionStatus, std::{ - collections::HashMap, + collections::{HashMap, HashSet}, fs::File, io::BufReader, path::PathBuf, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, + sync::Arc, time::{Duration, Instant}, }, - tokio::sync::{RwLock, Semaphore}, + tokio::{sync::Semaphore, time::sleep}, }; #[derive(Clone, Deserialize, Serialize, Debug)] @@ -478,108 +479,6 @@ pub fn derive_tip_distribution_account_address( pub const MAX_RETRIES: usize = 5; pub const FAIL_DELAY: Duration = Duration::from_millis(100); -/// Returns unprocessed transactions, along with fail count -pub async fn sign_and_send_transactions_with_retries_multi_rpc( - signer: &Arc, - blockhash_rpc_client: &Arc, - rpc_clients: &Arc>>, - mut transactions: Vec, - max_loop_duration: Duration, -) -> ( - usize, /* remaining txn count */ - usize, /* failed txn count */ -) { - let error_count = Arc::new(AtomicUsize::default()); - let blockhash = Arc::new(RwLock::new( - blockhash_rpc_client - .get_latest_blockhash() - .await - .expect("fetch latest blockhash"), - )); - let transactions_receiver = { - let (transactions_sender, transactions_receiver) = crossbeam_channel::unbounded(); - let mut rng = rand::thread_rng(); - transactions.shuffle(&mut rng); // shuffle to avoid racing for the same order of txns as other claim-tip processes - transactions - .into_iter() - .for_each(|txn| transactions_sender.send(txn).unwrap()); - transactions_receiver - }; - let blockhash_refresh_handle = { - let blockhash_rpc_client = blockhash_rpc_client.clone(); - let blockhash = blockhash.clone(); - let transactions_receiver = transactions_receiver.clone(); - tokio::spawn(async move { - let start = Instant::now(); - let mut last_blockhash_update = Instant::now(); - while start.elapsed() < max_loop_duration && !transactions_receiver.is_empty() { - // ensure we always have a recent blockhash - if last_blockhash_update.elapsed() > Duration::from_secs(2) { - let hash = blockhash_rpc_client - .get_latest_blockhash() - .await - .expect("fetch latest blockhash"); - info!( - "Got blockhash {hash:?}. Sending {} transactions to claim mev tips.", - transactions_receiver.len() - ); - *blockhash.write().await = hash; - last_blockhash_update = Instant::now(); - } - } - - info!( - "Exited blockhash refresh thread. {} transactions remain.", - transactions_receiver.len() - ); - transactions_receiver.len() - }) - }; - let send_handles = rpc_clients - .iter() - .map(|rpc_client| { - let signer = signer.clone(); - let transactions_receiver = transactions_receiver.clone(); - let rpc_client = rpc_client.clone(); - let error_count = error_count.clone(); - let blockhash = blockhash.clone(); - tokio::spawn(async move { - let mut iterations = 0usize; - while let Ok(txn) = transactions_receiver.recv() { - let mut retries = 0usize; - while retries < MAX_RETRIES { - iterations = iterations.saturating_add(1); - let (_signed_txn, res) = - signed_send(&signer, &rpc_client, *blockhash.read().await, txn.clone()) - .await; - match res { - Ok(_) => break, - Err(_) => { - retries = retries.saturating_add(1); - error_count.fetch_add(1, Ordering::Relaxed); - tokio::time::sleep(FAIL_DELAY).await; - } - } - } - } - - info!("Exited send thread. Ran {iterations} times."); - }) - }) - .collect_vec(); - - for handle in send_handles { - if let Err(e) = handle.await { - warn!("Error joining handle: {e:?}") - } - } - let remaining_transaction_count = blockhash_refresh_handle.await.unwrap(); - ( - remaining_transaction_count, - error_count.load(Ordering::Relaxed), - ) -} - pub async fn sign_and_send_transactions_with_retries( signer: &Keypair, rpc_client: &RpcClient, @@ -648,6 +547,119 @@ pub async fn sign_and_send_transactions_with_retries( (transactions_to_process.values().cloned().collect(), errors) } +pub async fn send_until_blockhash_expires( + rpc_client: &RpcClient, + transactions: Vec, + blockhash: Hash, + keypair: &Arc, +) -> solana_rpc_client_api::client_error::Result<()> { + let mut claim_transactions: HashMap = transactions + .into_iter() + .map(|mut tx| { + tx.sign(&[&keypair], blockhash); + (*tx.get_signature(), tx) + }) + .collect(); + + let txs_requesting_send = claim_transactions.len(); + + while rpc_client + .is_blockhash_valid(&blockhash, CommitmentConfig::processed()) + .await? + { + let mut check_signatures = HashSet::with_capacity(claim_transactions.len()); + let mut already_processed = HashSet::with_capacity(claim_transactions.len()); + let mut is_blockhash_not_found = false; + + for (signature, tx) in &claim_transactions { + match rpc_client + .send_transaction_with_config( + tx, + RpcSendTransactionConfig { + skip_preflight: false, + preflight_commitment: Some(CommitmentLevel::Confirmed), + max_retries: Some(2), + ..RpcSendTransactionConfig::default() + }, + ) + .await + { + Ok(_) => { + check_signatures.insert(*signature); + } + Err(e) => match e.get_transaction_error() { + Some(TransactionError::BlockhashNotFound) => { + is_blockhash_not_found = true; + break; + } + Some(TransactionError::AlreadyProcessed) => { + already_processed.insert(*tx.get_signature()); + } + Some(e) => { + warn!( + "TransactionError sending signature: {} error: {:?} tx: {:?}", + tx.get_signature(), + e, + tx + ); + } + None => { + warn!( + "Unknown error sending transaction signature: {} error: {:?}", + tx.get_signature(), + e + ); + } + }, + } + } + + sleep(Duration::from_secs(10)).await; + + let signatures: Vec = check_signatures.iter().cloned().collect(); + let statuses = get_batched_signatures_statuses(rpc_client, &signatures).await?; + + for (signature, maybe_status) in &statuses { + if let Some(_status) = maybe_status { + claim_transactions.remove(signature); + check_signatures.remove(signature); + } + } + + for signature in already_processed { + claim_transactions.remove(&signature); + } + + if claim_transactions.is_empty() || is_blockhash_not_found { + break; + } + } + + let num_landed = txs_requesting_send + .checked_sub(claim_transactions.len()) + .unwrap(); + info!("num_landed: {:?}", num_landed); + + Ok(()) +} + +pub async fn get_batched_signatures_statuses( + rpc_client: &RpcClient, + signatures: &[Signature], +) -> solana_rpc_client_api::client_error::Result)>> { + let mut signature_statuses = Vec::new(); + + for signatures_batch in signatures.chunks(100) { + // was using get_signature_statuses_with_history, but it blocks if the signatures don't exist + // bigtable calls to read signatures that don't exist block forever w/o --rpc-bigtable-timeout argument set + // get_signature_statuses looks in status_cache, which only has a 150 block history + // may have false negative, but for this workflow it doesn't matter + let statuses = rpc_client.get_signature_statuses(signatures_batch).await?; + signature_statuses.extend(signatures_batch.iter().cloned().zip(statuses.value)); + } + Ok(signature_statuses) +} + /// Just in time sign and send transaction to RPC async fn signed_send( signer: &Keypair, @@ -699,50 +711,17 @@ async fn signed_send( (txn, res) } -/// Fetch accounts in parallel batches with retries. async fn get_batched_accounts( rpc_client: &RpcClient, - max_concurrent_rpc_get_reqs: usize, - pubkeys: Vec, + pubkeys: &[Pubkey], ) -> solana_rpc_client_api::client_error::Result>> { - let semaphore = Arc::new(Semaphore::new(max_concurrent_rpc_get_reqs)); - let futs = pubkeys.chunks(MAX_MULTIPLE_ACCOUNTS).map(|pubkeys| { - let semaphore = semaphore.clone(); - - async move { - let _permit = semaphore.acquire_owned().await.unwrap(); // wait until our turn - let mut retries = 0usize; - loop { - match rpc_client.get_multiple_accounts(pubkeys).await { - Ok(accts) => return Ok(accts), - Err(e) => { - retries = retries.saturating_add(1); - if retries == MAX_RETRIES { - datapoint_error!( - "claim_mev_workflow-get_batched_accounts_error", - ("pubkeys", format!("{pubkeys:?}"), String), - ("error", 1, i64), - ("err_type", "fetch_account", String), - ("err_str", e.to_string(), String) - ); - return Err(e); - } - tokio::time::sleep(FAIL_DELAY).await; - } - } - } - } - }); + let mut batched_accounts = HashMap::new(); - let claimant_accounts = futures::future::join_all(futs) - .await - .into_iter() - .collect::>>>>()? // fail on single error - .into_iter() - .flatten() - .collect_vec(); - - Ok(pubkeys.into_iter().zip(claimant_accounts).collect()) + for pubkeys_chunk in pubkeys.chunks(MAX_MULTIPLE_ACCOUNTS) { + let accounts = rpc_client.get_multiple_accounts(pubkeys_chunk).await?; + batched_accounts.extend(pubkeys_chunk.iter().cloned().zip(accounts)); + } + Ok(batched_accounts) } /// Calculates the minimum balance needed to be rent-exempt diff --git a/tip-distributor/src/reclaim_rent_workflow.rs b/tip-distributor/src/reclaim_rent_workflow.rs index 6775a02041..fc48c89d61 100644 --- a/tip-distributor/src/reclaim_rent_workflow.rs +++ b/tip-distributor/src/reclaim_rent_workflow.rs @@ -1,10 +1,9 @@ use { crate::{ - claim_mev_workflow::ClaimMevError, reclaim_rent_workflow::ClaimMevError::AnchorError, - sign_and_send_transactions_with_retries_multi_rpc, + claim_mev_workflow::ClaimMevError, get_batched_accounts, + reclaim_rent_workflow::ClaimMevError::AnchorError, send_until_blockhash_expires, }, anchor_lang::AccountDeserialize, - itertools::Itertools, jito_tip_distribution::{ sdk::{ derive_config_account_address, @@ -16,12 +15,14 @@ use { }, state::{ClaimStatus, Config, TipDistributionAccount}, }, - log::info, + log::{info, warn}, + rand::{prelude::SliceRandom, thread_rng}, solana_client::nonblocking::rpc_client::RpcClient, - solana_measure::measure, solana_metrics::datapoint_info, - solana_program::pubkey::Pubkey, + solana_program::{clock::Epoch, pubkey::Pubkey}, + solana_rpc_client_api::config::RpcSimulateTransactionConfig, solana_sdk::{ + account::Account, commitment_config::CommitmentConfig, compute_budget::ComputeBudgetInstruction, signature::{Keypair, Signer}, @@ -36,165 +37,228 @@ use { /// Clear old ClaimStatus accounts pub async fn reclaim_rent( rpc_url: String, - rpc_send_connection_count: u64, tip_distribution_program_id: Pubkey, signer: Arc, - max_loop_retries: u64, max_loop_duration: Duration, // Optionally reclaim TipDistributionAccount rents on behalf of validators. should_reclaim_tdas: bool, - micro_lamports_per_compute_unit: u64, + micro_lamports: u64, ) -> Result<(), ClaimMevError> { - let blockhash_rpc_client = Arc::new(RpcClient::new_with_timeout_and_commitment( + let rpc_client = RpcClient::new_with_timeout_and_commitment( rpc_url.clone(), - Duration::from_secs(180), // 3 mins - CommitmentConfig::finalized(), - )); - let rpc_clients = Arc::new( - (0..rpc_send_connection_count) - .map(|_| { - Arc::new(RpcClient::new_with_commitment( - rpc_url.clone(), - CommitmentConfig::confirmed(), - )) - }) - .collect_vec(), + Duration::from_secs(300), + CommitmentConfig::processed(), ); - let mut retries = 0; - let mut failed_transaction_count = 0usize; - let signer_pubkey = signer.pubkey(); - loop { - let (transactions, get_pa_elapsed, transaction_prepare_elaspsed) = build_transactions( - blockhash_rpc_client.clone(), - &tip_distribution_program_id, - &signer_pubkey, - should_reclaim_tdas, - micro_lamports_per_compute_unit, - ) + + let start = Instant::now(); + + let accounts = rpc_client + .get_program_accounts(&tip_distribution_program_id) .await?; + + let config_pubkey = derive_config_account_address(&tip_distribution_program_id).0; + let config_account = rpc_client.get_account(&config_pubkey).await?; + let config_account = + Config::try_deserialize(&mut config_account.data.as_slice()).map_err(AnchorError)?; + + let epoch = rpc_client.get_epoch_info().await?.epoch; + let mut claim_status_pubkeys_to_expire = + find_expired_claim_status_accounts(&accounts, epoch, signer.pubkey()); + let mut tda_pubkeys_to_expire = find_expired_tda_accounts(&accounts, epoch); + + while start.elapsed() <= max_loop_duration { + let mut transactions = build_close_claim_status_transactions( + &claim_status_pubkeys_to_expire, + tip_distribution_program_id, + config_pubkey, + micro_lamports, + signer.pubkey(), + ); + if should_reclaim_tdas { + transactions.extend(build_close_tda_transactions( + &tda_pubkeys_to_expire, + tip_distribution_program_id, + config_pubkey, + &config_account, + signer.pubkey(), + )); + } + datapoint_info!( "claim_mev_workflow-prepare_rent_reclaim_transactions", - ("attempt", retries, i64), ("transaction_count", transactions.len(), i64), - ("account_fetch_latency_us", get_pa_elapsed.as_micros(), i64), - ( - "transaction_prepare_latency_us", - transaction_prepare_elaspsed.as_micros(), - i64 - ), ); - let transactions_len = transactions.len(); + if transactions.is_empty() { - info!("Finished reclaim rent after {retries} retries, {failed_transaction_count} failed requests."); + info!("Finished reclaim rent!"); return Ok(()); } - info!("Sending {} rent reclaim transactions", transactions.len()); - let send_start = Instant::now(); - let (remaining_transaction_count, new_failed_transaction_count) = - sign_and_send_transactions_with_retries_multi_rpc( - &signer, - &blockhash_rpc_client, - &rpc_clients, - transactions, - max_loop_duration, - ) - .await; - failed_transaction_count = - failed_transaction_count.saturating_add(new_failed_transaction_count); + transactions.shuffle(&mut thread_rng()); + let transactions: Vec<_> = transactions.into_iter().take(10_000).collect(); + let blockhash = rpc_client.get_latest_blockhash().await?; + send_until_blockhash_expires(&rpc_client, transactions, blockhash, &signer).await?; - datapoint_info!( - "claim_mev_workflow-send_reclaim_rent_transactions", - ("attempt", retries, i64), - ("transaction_count", transactions_len, i64), - ( - "successful_transaction_count", - transactions_len.saturating_sub(remaining_transaction_count), - i64 - ), - ( - "remaining_transaction_count", - remaining_transaction_count, - i64 - ), - ( - "failed_transaction_count", - new_failed_transaction_count, - i64 - ), - ("send_latency_us", send_start.elapsed().as_micros(), i64), - ); + // can just refresh calling get_multiple_accounts since these operations should be subtractive and not additive + let claim_status_pubkeys: Vec<_> = claim_status_pubkeys_to_expire + .iter() + .map(|(pubkey, _)| *pubkey) + .collect(); + claim_status_pubkeys_to_expire = get_batched_accounts(&rpc_client, &claim_status_pubkeys) + .await? + .into_iter() + .filter_map(|(pubkey, account)| Some((pubkey, account?))) + .collect(); - if retries >= max_loop_retries { - return Err(ClaimMevError::MaxSendTransactionRetriesExceeded { - attempts: max_loop_retries, - remaining_transaction_count, - failed_transaction_count, - }); - } - retries = retries.saturating_add(1); + let tda_pubkeys: Vec<_> = tda_pubkeys_to_expire + .iter() + .map(|(pubkey, _)| *pubkey) + .collect(); + tda_pubkeys_to_expire = get_batched_accounts(&rpc_client, &tda_pubkeys) + .await? + .into_iter() + .filter_map(|(pubkey, account)| Some((pubkey, account?))) + .collect(); } -} -async fn build_transactions( - rpc_client: Arc, - tip_distribution_program_id: &Pubkey, - signer_pubkey: &Pubkey, - should_reclaim_tdas: bool, - micro_lamports_per_compute_unit: u64, -) -> Result<(Vec, Duration, Duration), ClaimMevError> { - info!("Fetching program accounts"); - let (accounts, get_pa_elapsed) = measure!( - rpc_client - .get_program_accounts(tip_distribution_program_id) - .await? - ); - info!( - "Fetch get_program_accounts took {:?} and fetched {} accounts", - get_pa_elapsed.as_duration(), - accounts.len() + // one final refresh before double checking everything + let claim_status_pubkeys: Vec<_> = claim_status_pubkeys_to_expire + .iter() + .map(|(pubkey, _)| *pubkey) + .collect(); + claim_status_pubkeys_to_expire = get_batched_accounts(&rpc_client, &claim_status_pubkeys) + .await? + .into_iter() + .filter_map(|(pubkey, account)| Some((pubkey, account?))) + .collect(); + + let tda_pubkeys: Vec<_> = tda_pubkeys_to_expire + .iter() + .map(|(pubkey, _)| *pubkey) + .collect(); + tda_pubkeys_to_expire = get_batched_accounts(&rpc_client, &tda_pubkeys) + .await? + .into_iter() + .filter_map(|(pubkey, account)| Some((pubkey, account?))) + .collect(); + + let mut transactions = build_close_claim_status_transactions( + &claim_status_pubkeys_to_expire, + tip_distribution_program_id, + config_pubkey, + micro_lamports, + signer.pubkey(), ); + if should_reclaim_tdas { + transactions.extend(build_close_tda_transactions( + &tda_pubkeys_to_expire, + tip_distribution_program_id, + config_pubkey, + &config_account, + signer.pubkey(), + )); + } - info!("Fetching current_epoch"); - let current_epoch = rpc_client.get_epoch_info().await?.epoch; - info!("Fetch current_epoch: {current_epoch}"); + if transactions.is_empty() { + return Ok(()); + } - info!("Fetching Config account"); - let config_pubkey = derive_config_account_address(tip_distribution_program_id).0; - let (config_account, elapsed) = measure!(rpc_client.get_account(&config_pubkey).await?); - info!("Fetch Config account took {:?}", elapsed.as_duration()); - let config_account: Config = - Config::try_deserialize(&mut config_account.data.as_slice()).map_err(AnchorError)?; + // if more transactions left, we'll simulate them all to make sure its not an uncaught error + let mut is_error = false; + let mut error_str = String::new(); + for tx in &transactions { + match rpc_client + .simulate_transaction_with_config( + tx, + RpcSimulateTransactionConfig { + sig_verify: false, + replace_recent_blockhash: true, + commitment: Some(CommitmentConfig::processed()), + ..RpcSimulateTransactionConfig::default() + }, + ) + .await + { + Ok(_) => {} + Err(e) => { + error_str = e.to_string(); + is_error = true; + + match e.get_transaction_error() { + None => { + break; + } + Some(e) => { + warn!("transaction error. tx: {:?} error: {:?}", tx, e); + break; + } + } + } + } + } + + if is_error { + Err(ClaimMevError::UncaughtError { e: error_str }) + } else { + Err(ClaimMevError::NotFinished { + transactions_left: transactions.len(), + }) + } +} - info!("Filtering for ClaimStatus accounts"); - let claim_status_accounts: Vec<(Pubkey, ClaimStatus)> = accounts +fn find_expired_claim_status_accounts( + accounts: &[(Pubkey, Account)], + epoch: Epoch, + payer: Pubkey, +) -> Vec<(Pubkey, Account)> { + accounts .iter() .filter_map(|(pubkey, account)| { let claim_status = ClaimStatus::try_deserialize(&mut account.data.as_slice()).ok()?; - Some((*pubkey, claim_status)) + if claim_status.claim_status_payer.eq(&payer) && epoch > claim_status.expires_at { + Some((*pubkey, account.clone())) + } else { + None + } }) - .filter(|(_, claim_status): &(Pubkey, ClaimStatus)| { - // Only return claim statuses that we've paid for and ones that are expired to avoid transaction failures. - claim_status.claim_status_payer.eq(signer_pubkey) - && current_epoch > claim_status.expires_at + .collect() +} + +fn find_expired_tda_accounts( + accounts: &[(Pubkey, Account)], + epoch: Epoch, +) -> Vec<(Pubkey, Account)> { + accounts + .iter() + .filter_map(|(pubkey, account)| { + let tda = TipDistributionAccount::try_deserialize(&mut account.data.as_slice()).ok()?; + if epoch > tda.expires_at { + Some((*pubkey, account.clone())) + } else { + None + } }) - .collect::>(); - info!( - "{} ClaimStatus accounts eligible for rent reclaim", - claim_status_accounts.len() - ); + .collect() +} - info!("Creating CloseClaimStatusAccounts transactions"); - let transaction_now = Instant::now(); - let mut transactions = claim_status_accounts - .into_iter() - .map(|(claim_status_pubkey, claim_status)| { +/// Assumes accounts is already pre-filtered with checks to ensure the account can be closed +fn build_close_claim_status_transactions( + accounts: &[(Pubkey, Account)], + tip_distribution_program_id: Pubkey, + config: Pubkey, + microlamports: u64, + payer: Pubkey, +) -> Vec { + accounts + .iter() + .map(|(claim_status_pubkey, account)| { + let claim_status = ClaimStatus::try_deserialize(&mut account.data.as_slice()).unwrap(); close_claim_status_ix( - *tip_distribution_program_id, + tip_distribution_program_id, CloseClaimStatusArgs, CloseClaimStatusAccounts { - config: config_pubkey, - claim_status: claim_status_pubkey, + config, + claim_status: *claim_status_pubkey, claim_status_payer: claim_status.claim_status_payer, }, ) @@ -203,55 +267,44 @@ async fn build_transactions( .chunks(4) .map(|close_claim_status_instructions| { let mut instructions = vec![ComputeBudgetInstruction::set_compute_unit_price( - micro_lamports_per_compute_unit, + microlamports, )]; instructions.extend(close_claim_status_instructions.to_vec()); - Transaction::new_with_payer(&instructions, Some(signer_pubkey)) + Transaction::new_with_payer(&instructions, Some(&payer)) }) - .collect::>(); - - info!( - "Create CloseClaimStatusAccounts transactions took {:?}", - transaction_now.elapsed() - ); + .collect() +} - if should_reclaim_tdas { - info!("Creating CloseTipDistributionAccounts transactions"); - let now = Instant::now(); - let close_tda_txs = accounts - .into_iter() - .filter_map(|(pubkey, account)| { - let tda = - TipDistributionAccount::try_deserialize(&mut account.data.as_slice()).ok()?; - Some((pubkey, tda)) - }) - .filter(|(_, tda): &(Pubkey, TipDistributionAccount)| current_epoch > tda.expires_at) - .map(|(tip_distribution_account, tda)| { - close_tip_distribution_account_ix( - *tip_distribution_program_id, - CloseTipDistributionAccountArgs { - _epoch: tda.epoch_created_at, - }, - CloseTipDistributionAccounts { - config: config_pubkey, - tip_distribution_account, - validator_vote_account: tda.validator_vote_account, - expired_funds_account: config_account.expired_funds_account, - signer: *signer_pubkey, - }, - ) - }) - .collect::>() - .chunks(4) - .map(|instructions| Transaction::new_with_payer(instructions, Some(signer_pubkey))) - .collect::>(); - info!("Create CloseTipDistributionAccounts transactions took {:?}, closing {} tip distribution accounts", now.elapsed(), close_tda_txs.len()); +fn build_close_tda_transactions( + accounts: &[(Pubkey, Account)], + tip_distribution_program_id: Pubkey, + config_pubkey: Pubkey, + config: &Config, + payer: Pubkey, +) -> Vec { + let instructions: Vec<_> = accounts + .iter() + .map(|(pubkey, account)| { + let tda = + TipDistributionAccount::try_deserialize(&mut account.data.as_slice()).unwrap(); + close_tip_distribution_account_ix( + tip_distribution_program_id, + CloseTipDistributionAccountArgs { + _epoch: tda.epoch_created_at, + }, + CloseTipDistributionAccounts { + config: config_pubkey, + tip_distribution_account: *pubkey, + validator_vote_account: tda.validator_vote_account, + expired_funds_account: config.expired_funds_account, + signer: payer, + }, + ) + }) + .collect(); - transactions.extend(close_tda_txs); - } - Ok(( - transactions, - get_pa_elapsed.as_duration(), - transaction_now.elapsed(), - )) + instructions + .chunks(4) + .map(|ix_chunk| Transaction::new_with_payer(ix_chunk, Some(&payer))) + .collect() }