Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
Don't forward packets received from TPU forwards port (#22078) (#22171)
Browse files Browse the repository at this point in the history
* Don't forward packets received from TPU forwards port

* Add banking stage test

(cherry picked from commit b1d9a2e)

Co-authored-by: Justin Starry <justin@solana.com>
  • Loading branch information
mergify[bot] and jstarry authored Dec 30, 2021
1 parent 9c9d3e8 commit 531f36c
Show file tree
Hide file tree
Showing 13 changed files with 209 additions and 53 deletions.
204 changes: 176 additions & 28 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,24 +398,35 @@ impl BankingStage {
data_budget: &DataBudget,
) -> std::io::Result<()> {
let packets = Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter());
inc_new_counter_info!("banking_stage-forwarded_packets", packets.len());
const INTERVAL_MS: u64 = 100;
const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200;
const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000;
const MAX_BYTES_BUDGET: usize = MAX_BYTES_PER_INTERVAL * 5;
data_budget.update(INTERVAL_MS, |bytes| {
std::cmp::min(bytes + MAX_BYTES_PER_INTERVAL, MAX_BYTES_BUDGET)
std::cmp::min(
bytes.saturating_add(MAX_BYTES_PER_INTERVAL),
MAX_BYTES_BUDGET,
)
});

let mut packet_vec = Vec::with_capacity(packets.len());
for p in packets {
if data_budget.take(p.meta.size) {
packet_vec.push((&p.data[..p.meta.size], tpu_forwards));
let packet_vec: Vec<_> = packets
.iter()
.filter_map(|p| {
if !p.meta.forwarded && data_budget.take(p.meta.size) {
Some((&p.data[..p.meta.size], tpu_forwards))
} else {
None
}
})
.collect();

if !packet_vec.is_empty() {
inc_new_counter_info!("banking_stage-forwarded_packets", packet_vec.len());
if let Err(SendPktsError::IoError(ioerr, _num_failed)) = batch_send(socket, &packet_vec)
{
return Err(ioerr);
}
}
if let Err(SendPktsError::IoError(ioerr, _num_failed)) = batch_send(socket, &packet_vec) {
return Err(ioerr);
}

Ok(())
}
Expand Down Expand Up @@ -1620,7 +1631,7 @@ mod tests {
system_transaction,
transaction::{Transaction, TransactionError},
},
solana_streamer::socket::SocketAddrSpace,
solana_streamer::{recvmmsg::recv_mmsg, socket::SocketAddrSpace},
solana_transaction_status::TransactionWithStatusMeta,
solana_vote_program::vote_transaction,
std::{
Expand Down Expand Up @@ -2877,16 +2888,15 @@ mod tests {
fn test_forwarder_budget() {
solana_logger::setup();
// Create `PacketBatch` with 1 unprocessed packet
let single_packet_batch = PacketBatch::new(vec![Packet::default()]);
let mut unprocessed_packets: UnprocessedPacketBatches =
vec![(single_packet_batch, vec![0], false)]
.into_iter()
.collect();

let cluster_info = new_test_cluster_info(Node::new_localhost().info);
let packet = Packet::from_data(None, &[0]).unwrap();
let single_packet_batch = PacketBatch::new(vec![packet]);

let genesis_config_info = create_slow_genesis_config(10_000);
let GenesisConfigInfo { genesis_config, .. } = &genesis_config_info;
let GenesisConfigInfo {
genesis_config,
validator_pubkey,
..
} = &genesis_config_info;

let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(genesis_config));
let ledger_path = get_tmp_ledger_path!();
Expand All @@ -2905,17 +2915,155 @@ mod tests {
let (exit, poh_recorder, poh_service, _entry_receiver) =
create_test_recorder(&bank, &blockstore, Some(poh_config));

let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let data_budget = DataBudget::default();
BankingStage::handle_forwarding(
&ForwardOption::ForwardTransaction,
&cluster_info,
&mut unprocessed_packets,
&poh_recorder,
&socket,
false,
&data_budget,
let local_node = Node::new_localhost_with_pubkey(validator_pubkey);
let cluster_info = new_test_cluster_info(local_node.info);
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let recv_socket = &local_node.sockets.tpu_forwards[0];

let test_cases = vec![
("budget-restricted", DataBudget::restricted(), 0),
("budget-available", DataBudget::default(), 1),
];

for (name, data_budget, expected_num_forwarded) in test_cases {
let mut unprocessed_packet_batches: UnprocessedPacketBatches =
vec![(single_packet_batch.clone(), vec![0], false)]
.into_iter()
.collect();
BankingStage::handle_forwarding(
&ForwardOption::ForwardTransaction,
&cluster_info,
&mut unprocessed_packet_batches,
&poh_recorder,
&send_socket,
true,
&data_budget,
);

recv_socket
.set_nonblocking(expected_num_forwarded == 0)
.unwrap();

let mut packets = vec![Packet::default(); 2];
let (_, num_received) =
recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default();
assert_eq!(num_received, expected_num_forwarded, "{}", name);
}

exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
}
Blockstore::destroy(&ledger_path).unwrap();
}

#[test]
fn test_handle_forwarding() {
solana_logger::setup();

const FWD_PACKET: u8 = 1;
let forwarded_packet = {
let mut packet = Packet::from_data(None, &[FWD_PACKET]).unwrap();
packet.meta.forwarded = true;
packet
};

const NORMAL_PACKET: u8 = 2;
let normal_packet = Packet::from_data(None, &[NORMAL_PACKET]).unwrap();

let packet_batch = PacketBatch::new(vec![forwarded_packet, normal_packet]);
let mut unprocessed_packet_batches: UnprocessedPacketBatches =
vec![(packet_batch, vec![0, 1], false)]
.into_iter()
.collect();

let genesis_config_info = create_slow_genesis_config(10_000);
let GenesisConfigInfo {
genesis_config,
validator_pubkey,
..
} = &genesis_config_info;
let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(genesis_config));
let ledger_path = get_tmp_ledger_path!();
{
let blockstore = Arc::new(
Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger"),
);
let poh_config = PohConfig {
// limit tick count to avoid clearing working_bank at
// PohRecord then PohRecorderError(MaxHeightReached) at BankingStage
target_tick_count: Some(bank.max_tick_height() - 1),
..PohConfig::default()
};

let (exit, poh_recorder, poh_service, _entry_receiver) =
create_test_recorder(&bank, &blockstore, Some(poh_config));

let local_node = Node::new_localhost_with_pubkey(validator_pubkey);
let cluster_info = new_test_cluster_info(local_node.info);
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let recv_socket = &local_node.sockets.tpu_forwards[0];

let test_cases = vec![
("not-forward", ForwardOption::NotForward, true, vec![], 2),
(
"fwd-normal",
ForwardOption::ForwardTransaction,
true,
vec![NORMAL_PACKET],
2,
),
(
"fwd-no-op",
ForwardOption::ForwardTransaction,
true,
vec![],
2,
),
(
"fwd-no-hold",
ForwardOption::ForwardTransaction,
false,
vec![],
0,
),
];

for (name, forward_option, hold, expected_ids, expected_num_unprocessed) in test_cases {
BankingStage::handle_forwarding(
&forward_option,
&cluster_info,
&mut unprocessed_packet_batches,
&poh_recorder,
&send_socket,
hold,
&DataBudget::default(),
);

recv_socket
.set_nonblocking(expected_ids.is_empty())
.unwrap();

let mut packets = vec![Packet::default(); 2];
let (_, num_received) =
recv_mmsg(recv_socket, &mut packets[..]).unwrap_or_default();
assert_eq!(num_received, expected_ids.len(), "{}", name);
for (i, expected_id) in expected_ids.iter().enumerate() {
assert_eq!(packets[i].meta.size, 1);
assert_eq!(packets[i].data[0], *expected_id, "{}", name);
}

let num_unprocessed_packets: usize = unprocessed_packet_batches
.iter()
.map(|(b, ..)| b.packets.len())
.sum();
assert_eq!(
num_unprocessed_packets, expected_num_unprocessed,
"{}",
name
);
}

exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
}
Expand Down
6 changes: 1 addition & 5 deletions core/src/commitment_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,11 +503,7 @@ mod tests {

let validator_vote_keypairs = ValidatorVoteKeypairs::new_rand();
let validator_keypairs = vec![&validator_vote_keypairs];
let GenesisConfigInfo {
genesis_config,
mint_keypair: _,
voting_keypair: _,
} = create_genesis_config_with_vote_accounts(
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config_with_vote_accounts(
1_000_000_000,
&validator_keypairs,
vec![100; 1],
Expand Down
16 changes: 11 additions & 5 deletions core/src/fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use {
solana_metrics::{inc_new_counter_debug, inc_new_counter_info},
solana_perf::{packet::PacketBatchRecycler, recycler::Recycler},
solana_poh::poh_recorder::PohRecorder,
solana_sdk::clock::DEFAULT_TICKS_PER_SLOT,
solana_sdk::{clock::DEFAULT_TICKS_PER_SLOT, packet::Packet},
solana_streamer::streamer::{self, PacketBatchReceiver, PacketBatchSender},
std::{
net::UdpSocket,
Expand Down Expand Up @@ -83,10 +83,16 @@ impl FetchStage {
sendr: &PacketBatchSender,
poh_recorder: &Arc<Mutex<PohRecorder>>,
) -> Result<()> {
let packet_batch = recvr.recv()?;
let mark_forwarded = |packet: &mut Packet| {
packet.meta.forwarded = true;
};

let mut packet_batch = recvr.recv()?;
let mut num_packets = packet_batch.packets.len();
packet_batch.packets.iter_mut().for_each(mark_forwarded);
let mut packet_batches = vec![packet_batch];
while let Ok(packet_batch) = recvr.try_recv() {
while let Ok(mut packet_batch) = recvr.try_recv() {
packet_batch.packets.iter_mut().for_each(mark_forwarded);
num_packets += packet_batch.packets.len();
packet_batches.push(packet_batch);
// Read at most 1K transactions in a loop
Expand Down Expand Up @@ -115,7 +121,7 @@ impl FetchStage {
}

fn new_multi_socket(
sockets: Vec<Arc<UdpSocket>>,
tpu_sockets: Vec<Arc<UdpSocket>>,
tpu_forwards_sockets: Vec<Arc<UdpSocket>>,
tpu_vote_sockets: Vec<Arc<UdpSocket>>,
exit: &Arc<AtomicBool>,
Expand All @@ -126,7 +132,7 @@ impl FetchStage {
) -> Self {
let recycler: PacketBatchRecycler = Recycler::warmed(1000, 1024);

let tpu_threads = sockets.into_iter().map(|socket| {
let tpu_threads = tpu_sockets.into_iter().map(|socket| {
streamer::receiver(
socket,
exit,
Expand Down
2 changes: 1 addition & 1 deletion core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl ShredFetchStage {
recycler.clone(),
bank_forks.clone(),
"shred_fetch_tvu_forwards",
|p| p.meta.forward = true,
|p| p.meta.forwarded = true,
);

let (repair_receiver, repair_handler) = Self::packet_modifier(
Expand Down
2 changes: 1 addition & 1 deletion core/src/vote_simulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ pub fn initialize_state(
let GenesisConfigInfo {
mut genesis_config,
mint_keypair,
voting_keypair: _,
..
} = create_genesis_config_with_vote_accounts(
1_000_000_000,
&validator_keypairs,
Expand Down
8 changes: 8 additions & 0 deletions perf/src/data_budget.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ pub struct DataBudget {
}

impl DataBudget {
/// Create a data budget with max bytes, used for tests
pub fn restricted() -> Self {
Self {
bytes: AtomicUsize::default(),
last_timestamp_ms: AtomicU64::new(u64::MAX),
}
}

// If there are enough bytes in the budget, consumes from
// the budget and returns true. Otherwise returns false.
#[must_use]
Expand Down
1 change: 1 addition & 0 deletions program-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,7 @@ impl ProgramTest {
genesis_config,
mint_keypair,
voting_keypair,
validator_pubkey: bootstrap_validator_pubkey,
},
)
}
Expand Down
6 changes: 1 addition & 5 deletions rpc/src/cluster_tpu_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,7 @@ mod test {
&validator_vote_keypairs1,
&validator_vote_keypairs2,
];
let GenesisConfigInfo {
genesis_config,
mint_keypair: _,
voting_keypair: _,
} = create_genesis_config_with_vote_accounts(
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config_with_vote_accounts(
1_000_000_000,
&validator_keypairs,
vec![10_000; 3],
Expand Down
1 change: 1 addition & 0 deletions rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6311,6 +6311,7 @@ pub mod tests {
mut genesis_config,
mint_keypair,
voting_keypair,
..
} = create_genesis_config(TEST_MINT_LAMPORTS);

genesis_config.rent.lamports_per_byte_year = 50;
Expand Down
6 changes: 1 addition & 5 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14315,11 +14315,7 @@ pub(crate) mod tests {
let validator_vote_keypairs0 = ValidatorVoteKeypairs::new_rand();
let validator_vote_keypairs1 = ValidatorVoteKeypairs::new_rand();
let validator_keypairs = vec![&validator_vote_keypairs0, &validator_vote_keypairs1];
let GenesisConfigInfo {
genesis_config,
mint_keypair: _,
voting_keypair: _,
} = create_genesis_config_with_vote_accounts(
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config_with_vote_accounts(
1_000_000_000,
&validator_keypairs,
vec![10_000; 2],
Expand Down
2 changes: 1 addition & 1 deletion runtime/src/bank_forks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,8 +601,8 @@ mod tests {
let leader_keypair = Keypair::new();
let GenesisConfigInfo {
mut genesis_config,
mint_keypair: _,
voting_keypair,
..
} = create_genesis_config_with_leader(10_000, &leader_keypair.pubkey(), 1_000);
let slots_in_epoch = 32;
genesis_config.epoch_schedule = EpochSchedule::new(slots_in_epoch);
Expand Down
Loading

0 comments on commit 531f36c

Please sign in to comment.