Skip to content

Commit

Permalink
removes redundant ClusterInfo::drain_push_queue (solana-labs#33753)
Browse files Browse the repository at this point in the history
  • Loading branch information
behzadnouri authored Oct 18, 2023
1 parent 2465abc commit afd044e
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 33 deletions.
40 changes: 17 additions & 23 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -889,10 +889,7 @@ impl ClusterInfo {
CrdsData::LowestSlot(0, LowestSlot::new(self_pubkey, min, now)),
&self.keypair(),
);
self.local_message_pending_push_queue
.lock()
.unwrap()
.push(entry);
self.push_message(entry);
}
}

Expand Down Expand Up @@ -973,7 +970,7 @@ impl ClusterInfo {
TimedGuard::new(self.gossip.crds.read().unwrap(), label, counter)
}

pub fn push_message(&self, message: CrdsValue) {
fn push_message(&self, message: CrdsValue) {
self.local_message_pending_push_queue
.lock()
.unwrap()
Expand Down Expand Up @@ -1515,25 +1512,23 @@ impl ClusterInfo {
(pings, pulls.collect())
}

fn drain_push_queue(&self) -> Vec<CrdsValue> {
let mut push_queue = self.local_message_pending_push_queue.lock().unwrap();
std::mem::take(&mut *push_queue)
}
// Used in tests
pub fn flush_push_queue(&self) {
let pending_push_messages = self.drain_push_queue();
let mut gossip_crds = self.gossip.crds.write().unwrap();
let now = timestamp();
for entry in pending_push_messages {
let _ = gossip_crds.insert(entry, now, GossipRoute::LocalMessage);
let entries: Vec<CrdsValue> =
std::mem::take(&mut *self.local_message_pending_push_queue.lock().unwrap());
if !entries.is_empty() {
let mut gossip_crds = self.gossip.crds.write().unwrap();
let now = timestamp();
for entry in entries {
let _ = gossip_crds.insert(entry, now, GossipRoute::LocalMessage);
}
}
}
fn new_push_requests(&self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> {
let self_id = self.id();
let (mut push_messages, num_entries, num_nodes) = {
let _st = ScopedTimer::from(&self.stats.new_push_requests);
self.gossip
.new_push_messages(&self_id, self.drain_push_queue(), timestamp(), stakes)
self.flush_push_queue();
self.gossip.new_push_messages(&self_id, timestamp(), stakes)
};
self.stats
.push_fanout_num_entries
Expand Down Expand Up @@ -3596,12 +3591,11 @@ mod tests {
&SocketAddrSpace::Unspecified,
);
//check that all types of gossip messages are signed correctly
let (push_messages, _, _) = cluster_info.gossip.new_push_messages(
&cluster_info.id(),
cluster_info.drain_push_queue(),
timestamp(),
&stakes,
);
cluster_info.flush_push_queue();
let (push_messages, _, _) =
cluster_info
.gossip
.new_push_messages(&cluster_info.id(), timestamp(), &stakes);
// there should be some pushes ready
assert!(!push_messages.is_empty());
push_messages
Expand Down
7 changes: 0 additions & 7 deletions gossip/src/crds_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,13 @@ impl CrdsGossip {
pub fn new_push_messages(
&self,
pubkey: &Pubkey, // This node.
pending_push_messages: Vec<CrdsValue>,
now: u64,
stakes: &HashMap<Pubkey, u64>,
) -> (
HashMap<Pubkey, Vec<CrdsValue>>,
usize, // number of values
usize, // number of push messages
) {
{
let mut crds = self.crds.write().unwrap();
for entry in pending_push_messages {
let _ = crds.insert(entry, now, GossipRoute::LocalMessage);
}
}
self.push.new_push_messages(pubkey, &self.crds, now, stakes)
}

Expand Down
4 changes: 1 addition & 3 deletions gossip/tests/crds_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,9 +351,7 @@ fn network_run_push(
node.gossip.purge(&node_pubkey, thread_pool, now, &timeouts);
(
node_pubkey,
node.gossip
.new_push_messages(&node_pubkey, vec![], now, &stakes)
.0,
node.gossip.new_push_messages(&node_pubkey, now, &stakes).0,
)
})
.collect();
Expand Down

0 comments on commit afd044e

Please sign in to comment.