diff --git a/core/benches/retransmit_stage.rs b/core/benches/retransmit_stage.rs index 5d225560718def..63f762a2fbf0c6 100644 --- a/core/benches/retransmit_stage.rs +++ b/core/benches/retransmit_stage.rs @@ -39,7 +39,12 @@ fn bench_retransmitter(bencher: &mut Bencher) { const NUM_PEERS: usize = 4; let mut peer_sockets = Vec::new(); for _ in 0..NUM_PEERS { - let id = pubkey::new_rand(); + // This ensures that cluster_info.id() is the root of turbine + // retransmit tree and so the shreds are retransmited to all other + // nodes in the cluster. + let id = std::iter::repeat_with(pubkey::new_rand) + .find(|pk| cluster_info.id() < *pk) + .unwrap(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut contact_info = ContactInfo::new_localhost(&id, timestamp()); contact_info.tvu = socket.local_addr().unwrap(); diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 137d0873aebf34..499a8f28aebdb1 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -428,7 +428,9 @@ fn retransmit( // neighborhood), then we expect that the packet arrives at tvu socket // as opposed to tvu-forwards. If this is not the case, then the // turbine broadcast/retransmit tree is mismatched across nodes. - if packet.meta.forward == (my_index % DATA_PLANE_FANOUT == 0) { + let anchor_node = my_index % DATA_PLANE_FANOUT == 0; + if packet.meta.forward == anchor_node { + // TODO: Consider forwarding the packet to the root node here. retransmit_tree_mismatch += 1; } peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len()); @@ -464,10 +466,19 @@ fn retransmit( .or_default() += 1; let mut retransmit_time = Measure::start("retransmit_to"); - if !packet.meta.forward { - ClusterInfo::retransmit_to(&neighbors, packet, sock, true); + // If the node is on the critical path (i.e. the first node in each + // neighborhood), it should send the packet to tvu socket of its + // children and also tvu_forward socket of its neighbors. Otherwise it + // should only forward to tvu_forward socket of its children. + if anchor_node { + ClusterInfo::retransmit_to(&neighbors, packet, sock, /*forward socket=*/ true); } - ClusterInfo::retransmit_to(&children, packet, sock, packet.meta.forward); + ClusterInfo::retransmit_to( + &children, + packet, + sock, + !anchor_node, // send to forward socket! + ); retransmit_time.stop(); retransmit_total += retransmit_time.as_us(); } @@ -726,8 +737,13 @@ mod tests { .unwrap() .local_addr() .unwrap(); - - let other = ContactInfo::new_localhost(&solana_sdk::pubkey::new_rand(), 0); + // This fixes the order of nodes returned by shuffle_peers_and_index, + // and makes turbine retransmit tree deterministic for the purpose of + // the test. + let other = std::iter::repeat_with(solana_sdk::pubkey::new_rand) + .find(|pk| me.id < *pk) + .unwrap(); + let other = ContactInfo::new_localhost(&other, 0); let cluster_info = ClusterInfo::new_with_invalid_keypair(other); cluster_info.insert_info(me);