Skip to content

Commit

Permalink
reverts back in SocketAddr dedup in retransmit stage
Browse files Browse the repository at this point in the history
This was erronously deemed as unnecessary and removed in:
#864

The commit partially reverts #864 and adds back socket-addr dedup.
  • Loading branch information
behzadnouri committed Apr 29, 2024
1 parent 9f10f09 commit b23f3d1
Showing 1 changed file with 21 additions and 5 deletions.
26 changes: 21 additions & 5 deletions turbine/src/cluster_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ pub struct ClusterNodesCache<T> {
pub struct RetransmitPeers<'a> {
root_distance: usize, // distance from the root node
children: Vec<&'a Node>,
// Maps tvu addresses to the first node
// in the shuffle with the same address.
addrs: HashMap<SocketAddr, Pubkey>, // tvu addresses
}

impl Node {
Expand Down Expand Up @@ -173,13 +176,16 @@ impl ClusterNodes<RetransmitStage> {
let RetransmitPeers {
root_distance,
children,
addrs,
} = self.get_retransmit_peers(slot_leader, shred, fanout)?;
let protocol = get_broadcast_protocol(shred);
let peers = children
.into_iter()
.filter_map(|node| node.contact_info()?.tvu(protocol).ok())
.collect();
Ok((root_distance, peers))
let peers = children.into_iter().filter_map(|node| {
node.contact_info()?
.tvu(protocol)
.ok()
.filter(|addr| addrs.get(addr) == Some(&node.pubkey()))
});
Ok((root_distance, peers.collect()))
}

pub fn get_retransmit_peers(
Expand All @@ -199,10 +205,19 @@ impl ClusterNodes<RetransmitStage> {
if let Some(index) = self.index.get(slot_leader) {
weighted_shuffle.remove_index(*index);
}
let mut addrs = HashMap::<SocketAddr, Pubkey>::with_capacity(self.nodes.len());
let mut rng = get_seeded_rng(slot_leader, shred);
let protocol = get_broadcast_protocol(shred);
let nodes: Vec<_> = weighted_shuffle
.shuffle(&mut rng)
.map(|index| &self.nodes[index])
.inspect(|node| {
if let Some(node) = node.contact_info() {
if let Ok(addr) = node.tvu(protocol) {
addrs.entry(addr).or_insert(*node.pubkey());
}
}
})
.collect();
let self_index = nodes
.iter()
Expand All @@ -221,6 +236,7 @@ impl ClusterNodes<RetransmitStage> {
Ok(RetransmitPeers {
root_distance,
children: peers.collect(),
addrs,
})
}

Expand Down

0 comments on commit b23f3d1

Please sign in to comment.