diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 0ff5b9f87de845..4aff9a1629f2ba 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -403,7 +403,10 @@ impl BankingStage { const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000; const MAX_BYTES_BUDGET: usize = MAX_BYTES_PER_INTERVAL * 5; data_budget.update(INTERVAL_MS, |bytes| { - std::cmp::min(bytes + MAX_BYTES_PER_INTERVAL, MAX_BYTES_BUDGET) + std::cmp::min( + bytes.saturating_add(MAX_BYTES_PER_INTERVAL), + MAX_BYTES_BUDGET, + ) }); let packet_vec: Vec<_> = packets @@ -1622,7 +1625,7 @@ mod tests { system_transaction, transaction::{Transaction, TransactionError}, }, - solana_streamer::socket::SocketAddrSpace, + solana_streamer::{recvmmsg::recv_mmsg, socket::SocketAddrSpace}, solana_transaction_status::TransactionWithStatusMeta, solana_vote_program::vote_transaction, std::{ @@ -2879,16 +2882,15 @@ mod tests { fn test_forwarder_budget() { solana_logger::setup(); // Create `PacketBatch` with 1 unprocessed packet - let single_packet_batch = PacketBatch::new(vec![Packet::default()]); - let mut unprocessed_packets: UnprocessedPacketBatches = - vec![(single_packet_batch, vec![0], false)] - .into_iter() - .collect(); - - let cluster_info = new_test_cluster_info(Node::new_localhost().info); + let packet = Packet::from_data(None, &[0]).unwrap(); + let single_packet_batch = PacketBatch::new(vec![packet]); let genesis_config_info = create_slow_genesis_config(10_000); - let GenesisConfigInfo { genesis_config, .. } = &genesis_config_info; + let GenesisConfigInfo { + genesis_config, + validator_pubkey, + .. + } = &genesis_config_info; let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(genesis_config)); let ledger_path = get_tmp_ledger_path!(); @@ -2907,17 +2909,155 @@ mod tests { let (exit, poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank, &blockstore, Some(poh_config)); - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let data_budget = DataBudget::default(); - BankingStage::handle_forwarding( - &ForwardOption::ForwardTransaction, - &cluster_info, - &mut unprocessed_packets, - &poh_recorder, - &socket, - false, - &data_budget, + let local_node = Node::new_localhost_with_pubkey(validator_pubkey); + let cluster_info = new_test_cluster_info(local_node.info); + let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let recv_socket = &local_node.sockets.tpu_forwards[0]; + + let test_cases = vec![ + ("budget-restricted", DataBudget::restricted(), 0), + ("budget-available", DataBudget::default(), 1), + ]; + + for (name, data_budget, expected_num_forwarded) in test_cases { + let mut unprocessed_packet_batches: UnprocessedPacketBatches = + vec![(single_packet_batch.clone(), vec![0], false)] + .into_iter() + .collect(); + BankingStage::handle_forwarding( + &ForwardOption::ForwardTransaction, + &cluster_info, + &mut unprocessed_packet_batches, + &poh_recorder, + &send_socket, + true, + &data_budget, + ); + + recv_socket + .set_nonblocking(expected_num_forwarded == 0) + .unwrap(); + + let mut packets = vec![Packet::default(); 2]; + let (_, num_received) = + recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default(); + assert_eq!(num_received, expected_num_forwarded, "{}", name); + } + + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); + } + Blockstore::destroy(&ledger_path).unwrap(); + } + + #[test] + fn test_handle_forwarding() { + solana_logger::setup(); + + const FWD_PACKET: u8 = 1; + let forwarded_packet = { + let mut packet = Packet::from_data(None, &[FWD_PACKET]).unwrap(); + packet.meta.forwarded = true; + packet + }; + + const NORMAL_PACKET: u8 = 2; + let normal_packet = Packet::from_data(None, &[NORMAL_PACKET]).unwrap(); + + let packet_batch = PacketBatch::new(vec![forwarded_packet, normal_packet]); + let mut unprocessed_packet_batches: UnprocessedPacketBatches = + vec![(packet_batch, vec![0, 1], false)] + .into_iter() + .collect(); + + let genesis_config_info = create_slow_genesis_config(10_000); + let GenesisConfigInfo { + genesis_config, + validator_pubkey, + .. + } = &genesis_config_info; + let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(genesis_config)); + let ledger_path = get_tmp_ledger_path!(); + { + let blockstore = Arc::new( + Blockstore::open(&ledger_path) + .expect("Expected to be able to open database ledger"), ); + let poh_config = PohConfig { + // limit tick count to avoid clearing working_bank at + // PohRecord then PohRecorderError(MaxHeightReached) at BankingStage + target_tick_count: Some(bank.max_tick_height() - 1), + ..PohConfig::default() + }; + + let (exit, poh_recorder, poh_service, _entry_receiver) = + create_test_recorder(&bank, &blockstore, Some(poh_config)); + + let local_node = Node::new_localhost_with_pubkey(validator_pubkey); + let cluster_info = new_test_cluster_info(local_node.info); + let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let recv_socket = &local_node.sockets.tpu_forwards[0]; + + let test_cases = vec![ + ("not-forward", ForwardOption::NotForward, true, vec![], 2), + ( + "fwd-normal", + ForwardOption::ForwardTransaction, + true, + vec![NORMAL_PACKET], + 2, + ), + ( + "fwd-no-op", + ForwardOption::ForwardTransaction, + true, + vec![], + 2, + ), + ( + "fwd-no-hold", + ForwardOption::ForwardTransaction, + false, + vec![], + 0, + ), + ]; + + for (name, forward_option, hold, expected_ids, expected_num_unprocessed) in test_cases { + BankingStage::handle_forwarding( + &forward_option, + &cluster_info, + &mut unprocessed_packet_batches, + &poh_recorder, + &send_socket, + hold, + &DataBudget::default(), + ); + + recv_socket + .set_nonblocking(expected_ids.is_empty()) + .unwrap(); + + let mut packets = vec![Packet::default(); 2]; + let (_, num_received) = + recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default(); + assert_eq!(num_received, expected_ids.len(), "{}", name); + for (i, expected_id) in expected_ids.iter().enumerate() { + assert_eq!(packets[i].meta.size, 1); + assert_eq!(packets[i].data[0], *expected_id, "{}", name); + } + + let num_unprocessed_packets: usize = unprocessed_packet_batches + .iter() + .map(|(b, ..)| b.packets.len()) + .sum(); + assert_eq!( + num_unprocessed_packets, expected_num_unprocessed, + "{}", + name + ); + } + exit.store(true, Ordering::Relaxed); poh_service.join().unwrap(); } diff --git a/core/src/commitment_service.rs b/core/src/commitment_service.rs index fe425fa2cf2cd6..dcdc769f7883b3 100644 --- a/core/src/commitment_service.rs +++ b/core/src/commitment_service.rs @@ -503,11 +503,7 @@ mod tests { let validator_vote_keypairs = ValidatorVoteKeypairs::new_rand(); let validator_keypairs = vec![&validator_vote_keypairs]; - let GenesisConfigInfo { - genesis_config, - mint_keypair: _, - voting_keypair: _, - } = create_genesis_config_with_vote_accounts( + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config_with_vote_accounts( 1_000_000_000, &validator_keypairs, vec![100; 1], diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index ee89cb9f51ddf7..1e78a0ddc447b8 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -121,7 +121,7 @@ impl FetchStage { } fn new_multi_socket( - sockets: Vec>, + tpu_sockets: Vec>, tpu_forwards_sockets: Vec>, tpu_vote_sockets: Vec>, exit: &Arc, @@ -132,7 +132,7 @@ impl FetchStage { ) -> Self { let recycler: PacketBatchRecycler = Recycler::warmed(1000, 1024); - let tpu_threads = sockets.into_iter().map(|socket| { + let tpu_threads = tpu_sockets.into_iter().map(|socket| { streamer::receiver( socket, exit, diff --git a/core/src/vote_simulator.rs b/core/src/vote_simulator.rs index 0c9e4ca7403346..c3cb9286a92bb7 100644 --- a/core/src/vote_simulator.rs +++ b/core/src/vote_simulator.rs @@ -343,7 +343,7 @@ pub fn initialize_state( let GenesisConfigInfo { mut genesis_config, mint_keypair, - voting_keypair: _, + .. } = create_genesis_config_with_vote_accounts( 1_000_000_000, &validator_keypairs, diff --git a/perf/src/data_budget.rs b/perf/src/data_budget.rs index 24eb0bb84ec5cc..4c35fc6ce35caa 100644 --- a/perf/src/data_budget.rs +++ b/perf/src/data_budget.rs @@ -10,6 +10,14 @@ pub struct DataBudget { } impl DataBudget { + /// Create a data budget with max bytes, used for tests + pub fn restricted() -> Self { + Self { + bytes: AtomicUsize::default(), + last_timestamp_ms: AtomicU64::new(u64::MAX), + } + } + // If there are enough bytes in the budget, consumes from // the budget and returns true. Otherwise returns false. #[must_use] diff --git a/program-test/src/lib.rs b/program-test/src/lib.rs index a335c39fee0125..9a2f02a4929cbe 100644 --- a/program-test/src/lib.rs +++ b/program-test/src/lib.rs @@ -812,6 +812,7 @@ impl ProgramTest { genesis_config, mint_keypair, voting_keypair, + validator_pubkey: bootstrap_validator_pubkey, }, ) } diff --git a/rpc/src/cluster_tpu_info.rs b/rpc/src/cluster_tpu_info.rs index cf567bd5a482f1..5421bc7b2b9b3d 100644 --- a/rpc/src/cluster_tpu_info.rs +++ b/rpc/src/cluster_tpu_info.rs @@ -91,11 +91,7 @@ mod test { &validator_vote_keypairs1, &validator_vote_keypairs2, ]; - let GenesisConfigInfo { - genesis_config, - mint_keypair: _, - voting_keypair: _, - } = create_genesis_config_with_vote_accounts( + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config_with_vote_accounts( 1_000_000_000, &validator_keypairs, vec![10_000; 3], diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 64bcd421f829f2..c6874795221b22 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -6275,6 +6275,7 @@ pub mod tests { mut genesis_config, mint_keypair, voting_keypair, + .. } = create_genesis_config(TEST_MINT_LAMPORTS); genesis_config.rent.lamports_per_byte_year = 50; diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index d7e6f243efc8c3..1b85c424f15b27 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -14306,11 +14306,7 @@ pub(crate) mod tests { let validator_vote_keypairs0 = ValidatorVoteKeypairs::new_rand(); let validator_vote_keypairs1 = ValidatorVoteKeypairs::new_rand(); let validator_keypairs = vec![&validator_vote_keypairs0, &validator_vote_keypairs1]; - let GenesisConfigInfo { - genesis_config, - mint_keypair: _, - voting_keypair: _, - } = create_genesis_config_with_vote_accounts( + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config_with_vote_accounts( 1_000_000_000, &validator_keypairs, vec![10_000; 2], diff --git a/runtime/src/bank_forks.rs b/runtime/src/bank_forks.rs index a0b119b6307d76..a34b1777281ef5 100644 --- a/runtime/src/bank_forks.rs +++ b/runtime/src/bank_forks.rs @@ -601,8 +601,8 @@ mod tests { let leader_keypair = Keypair::new(); let GenesisConfigInfo { mut genesis_config, - mint_keypair: _, voting_keypair, + .. } = create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1_000); let slots_in_epoch = 32; genesis_config.epoch_schedule = EpochSchedule::new(slots_in_epoch); diff --git a/runtime/src/genesis_utils.rs b/runtime/src/genesis_utils.rs index 71222fdf441b31..7db7a4f8a07cbd 100644 --- a/runtime/src/genesis_utils.rs +++ b/runtime/src/genesis_utils.rs @@ -52,6 +52,7 @@ pub struct GenesisConfigInfo { pub genesis_config: GenesisConfig, pub mint_keypair: Keypair, pub voting_keypair: Keypair, + pub validator_pubkey: Pubkey, } pub fn create_genesis_config(mint_lamports: u64) -> GenesisConfigInfo { @@ -84,10 +85,11 @@ pub fn create_genesis_config_with_vote_accounts_and_cluster_type( let voting_keypair = Keypair::from_bytes(&voting_keypairs[0].borrow().vote_keypair.to_bytes()).unwrap(); + let validator_pubkey = voting_keypairs[0].borrow().node_keypair.pubkey(); let genesis_config = create_genesis_config_with_leader_ex( mint_lamports, &mint_keypair.pubkey(), - &voting_keypairs[0].borrow().node_keypair.pubkey(), + &validator_pubkey, &voting_keypairs[0].borrow().vote_keypair.pubkey(), &voting_keypairs[0].borrow().stake_keypair.pubkey(), stakes[0], @@ -102,6 +104,7 @@ pub fn create_genesis_config_with_vote_accounts_and_cluster_type( genesis_config, mint_keypair, voting_keypair, + validator_pubkey, }; for (validator_voting_keypairs, stake) in voting_keypairs[1..].iter().zip(&stakes[1..]) { @@ -159,6 +162,7 @@ pub fn create_genesis_config_with_leader( genesis_config, mint_keypair, voting_keypair, + validator_pubkey: *validator_pubkey, } }