reduces allocations when processing gossip push-messages
behzadnouri committed Feb 10, 2025
1 parent 8962d95 commit 4a7266d
Showing 2 changed files with 149 additions and 149 deletions.
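
The diff below applies one pattern throughout: the request-building helpers stop collecting their results into intermediate Vecs and instead return lazy impl Iterator values that the caller serializes straight into a single PacketBatch. A minimal, self-contained sketch of that before/after shape (simplified stand-in types, not the real gossip/src/cluster_info.rs API):

// Sketch of the allocation-saving pattern used in this commit
// (illustrative only; names and types are hypothetical stand-ins).

fn requests_collected(peers: Vec<u16>) -> Vec<(u16, String)> {
    // Old shape: build an intermediate Vec just to hand the items to the caller.
    peers
        .into_iter()
        .map(|peer| (peer, format!("msg for {peer}")))
        .collect()
}

fn requests_lazy(peers: Vec<u16>) -> impl Iterator<Item = (u16, String)> {
    // New shape: return a lazy iterator; the caller writes each item
    // straight into its output buffer, so no intermediate Vec is allocated.
    peers
        .into_iter()
        .map(|peer| (peer, format!("msg for {peer}")))
}

fn main() {
    let mut batch = Vec::new(); // stands in for the PacketBatch being filled
    batch.extend(requests_lazy(vec![1, 2, 3]));
    assert_eq!(batch, requests_collected(vec![1, 2, 3]));
}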
225 changes: 110 additions & 115 deletions gossip/src/cluster_info.rs
@@ -45,7 +45,7 @@ use {
weighted_shuffle::WeightedShuffle,
},
crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
itertools::Itertools,
itertools::{Either, Itertools},
rand::{seq::SliceRandom, CryptoRng, Rng},
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
solana_ledger::shred::Shred,
@@ -89,6 +89,7 @@ use {
num::NonZeroUsize,
ops::{Deref, Div},
path::{Path, PathBuf},
rc::Rc,
result::Result,
sync::{
atomic::{AtomicBool, Ordering},
@@ -252,24 +253,11 @@ impl ClusterInfo {
&mut pings,
&self.socket_addr_space,
);
self.stats
.new_pull_requests_pings_count
.add_relaxed(pings.len() as u64);
let pings: Vec<_> = pings
.into_iter()
.map(|(addr, ping)| (addr, Protocol::PingMessage(ping)))
.collect();
if !pings.is_empty() {
self.stats
.packets_sent_gossip_requests_count
.add_relaxed(pings.len() as u64);
let packet_batch = PacketBatch::new_unpinned_with_recycler_data_and_dests(
recycler,
"refresh_push_active_set",
&pings,
);
let _ = sender.send(packet_batch);
}
send_gossip_ping_messages(&pings, recycler, sender, &self.stats);
}

// TODO kill insert_info, only used by tests
@@ -1217,16 +1205,12 @@ impl ClusterInfo {
pulls.push((entrypoint, filters));
}

#[allow(clippy::type_complexity)]
fn new_pull_requests(
&self,
thread_pool: &ThreadPool,
gossip_validators: Option<&HashSet<Pubkey>>,
stakes: &HashMap<Pubkey, u64>,
) -> (
Vec<(SocketAddr, Ping)>, // Ping packets.
Vec<(SocketAddr, Protocol)>, // Pull requests
) {
) -> impl Iterator<Item = (SocketAddr, Protocol)> {
let now = timestamp();
let self_info = CrdsValue::new(CrdsData::from(self.my_contact_info()), &self.keypair());
let max_bloom_filter_bytes = get_max_bloom_filter_bytes(&self_info);
@@ -1249,23 +1233,18 @@
.unwrap_or_default()
};
self.append_entrypoint_to_pulls(thread_pool, max_bloom_filter_bytes, &mut pulls);
let num_requests = pulls
.iter()
.map(|(_, filters)| filters.len())
.sum::<usize>() as u64;
self.stats.new_pull_requests_count.add_relaxed(num_requests);
let pulls = pulls
let pings = pings
.into_iter()
.map(|(addr, ping)| (addr, Protocol::PingMessage(ping)));
pulls
.into_iter()
.filter_map(|(peer, filters)| Some((peer.gossip()?, filters)))
.flat_map(|(addr, filters)| repeat(addr).zip(filters))
.map(|(gossip_addr, filter)| {
.map(move |(gossip_addr, filter)| {
let request = Protocol::PullRequest(filter, self_info.clone());
(gossip_addr, request)
});
self.stats
.new_pull_requests_pings_count
.add_relaxed(pings.len() as u64);
(pings, pulls.collect())
})
.chain(pings)
}
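
With the two collected Vecs gone, the closure building each PullRequest must own what it captures (hence the new `move`, which takes self_info by value so the returned iterator can outlive the function), and the ping messages join the same stream via Iterator::chain instead of a separate return value. A minimal sketch of that ownership change, with hypothetical names:

// Sketch: returning a lazy iterator forces `move`, so the closure owns the
// captured value instead of borrowing a local that dies when the function
// returns. Names here are illustrative, not the real gossip types.
fn pull_requests(filters: Vec<u32>) -> impl Iterator<Item = (u32, String)> {
    let self_info = String::from("node-contact-info");
    filters
        .into_iter()
        // Without `move`, this closure would borrow `self_info`, which would
        // not live long enough once the iterator is handed to the caller.
        .map(move |filter| (filter, self_info.clone()))
}

fn main() {
    let pings = vec![(7u32, String::from("ping"))];
    // Pings are appended to the same stream with `chain`, mirroring the
    // `pulls ... .chain(pings)` shape in the hunk above.
    let all: Vec<_> = pull_requests(vec![1, 2, 3]).chain(pings).collect();
    assert_eq!(all.len(), 4);
}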

pub fn flush_push_queue(&self) {
@@ -1279,7 +1258,10 @@
}
}
}
fn new_push_requests(&self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> {
fn new_push_requests(
&self,
stakes: &HashMap<Pubkey, u64>,
) -> impl Iterator<Item = (SocketAddr, Protocol)> {
let self_id = self.id();
let (entries, push_messages, num_pushes) = {
let _st = ScopedTimer::from(&self.stats.new_push_requests);
@@ -1308,18 +1290,16 @@
})
.collect()
};
let messages: Vec<_> = push_messages
let entries = Rc::new(entries);
push_messages
.into_iter()
.flat_map(|(peer, msgs)| {
let msgs = msgs.into_iter().map(|k| entries[k].clone());
split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, msgs)
.map(move |payload| (peer, Protocol::PushMessage(self_id, payload)))
.flat_map(move |(peer, msgs): (SocketAddr, Vec<usize>)| {
let entries = Rc::clone(&entries);
let msgs = msgs.into_iter().map(move |k| entries[k].clone());
let msgs = split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, msgs)
.map(move |msgs| Protocol::PushMessage(self_id, msgs));
repeat(peer).zip(msgs)
})
.collect();
self.stats
.new_push_requests_num
.add_relaxed(messages.len() as u64);
messages
}
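
Here the entries lookup table is shared by two nested `move` closures of the returned iterator, so it is wrapped in Rc: each Rc::clone is a reference-count bump rather than a deep copy of the Vec, and plain Rc (not Arc) suffices as long as the iterator is consumed on the thread that created it. A minimal sketch of the same shape, with illustrative types:

use std::rc::Rc;

// Sketch of why `entries` is wrapped in `Rc`: the returned iterator's nested
// `move` closures each need ownership of the lookup table, and `Rc::clone`
// only bumps a reference count. Types here are hypothetical stand-ins.
fn push_messages(
    entries: Vec<String>,
    peers: Vec<(u16, Vec<usize>)>,
) -> impl Iterator<Item = (u16, String)> {
    let entries = Rc::new(entries);
    peers.into_iter().flat_map(move |(peer, indices)| {
        let entries = Rc::clone(&entries); // cheap clone for the inner closure
        indices
            .into_iter()
            .map(move |k| (peer, entries[k].clone()))
    })
}

fn main() {
    let entries = vec!["a".to_string(), "b".to_string()];
    let out: Vec<_> = push_messages(entries, vec![(1, vec![0, 1])]).collect();
    assert_eq!(out, vec![(1, "a".to_string()), (1, "b".to_string())]);
}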

// Generate new push and pull requests
@@ -1329,29 +1309,19 @@
gossip_validators: Option<&HashSet<Pubkey>>,
stakes: &HashMap<Pubkey, u64>,
generate_pull_requests: bool,
) -> Vec<(SocketAddr, Protocol)> {
) -> impl Iterator<Item = (SocketAddr, Protocol)> {
self.trim_crds_table(CRDS_UNIQUE_PUBKEY_CAPACITY, stakes);
// This will flush local pending push messages before generating
// pull-request bloom filters, preventing pull responses to return the
// same values back to the node itself. Note that packets will arrive
// and are processed out of order.
let mut out: Vec<_> = self.new_push_requests(stakes);
self.stats
.packets_sent_push_messages_count
.add_relaxed(out.len() as u64);
let out = self.new_push_requests(stakes);
if generate_pull_requests {
let (pings, pull_requests) =
self.new_pull_requests(thread_pool, gossip_validators, stakes);
self.stats
.packets_sent_pull_requests_count
.add_relaxed(pull_requests.len() as u64);
let pings = pings
.into_iter()
.map(|(addr, ping)| (addr, Protocol::PingMessage(ping)));
out.extend(pull_requests);
out.extend(pings);
let pulls = self.new_pull_requests(thread_pool, gossip_validators, stakes);
Either::Right(out.chain(pulls))
} else {
Either::Left(out)
}
out
}
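
The branch that skips pull requests and the branch that chains them produce different concrete iterator types; itertools::Either bridges them, since Either<L, R> itself implements Iterator whenever both variants do with the same Item. A small self-contained sketch of that trick:

use itertools::Either;

// Sketch of the `Either` trick: the two branches build different concrete
// iterator types, but `Either` unifies them under one `impl Iterator`.
fn numbers(extend: bool) -> impl Iterator<Item = u32> {
    let base = 0..3u32;
    if extend {
        Either::Right(base.chain(10..12))
    } else {
        Either::Left(base)
    }
}

fn main() {
    assert_eq!(numbers(false).count(), 3);
    assert_eq!(numbers(true).count(), 5);
}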

/// At random pick a node and try to get updated changes from them
@@ -1365,21 +1335,22 @@
generate_pull_requests: bool,
) -> Result<(), GossipError> {
let _st = ScopedTimer::from(&self.stats.gossip_transmit_loop_time);
let reqs = self.generate_new_gossip_requests(
let mut packet_batch = PacketBatch::new_unpinned_with_recycler(recycler, 0, "run_gossip");
self.generate_new_gossip_requests(
thread_pool,
gossip_validators,
stakes,
generate_pull_requests,
);
if !reqs.is_empty() {
let packet_batch = PacketBatch::new_unpinned_with_recycler_data_and_dests(
recycler,
"run_gossip",
&reqs,
);
self.stats
.packets_sent_gossip_requests_count
.add_relaxed(packet_batch.len() as u64);
)
.filter(|(addr, _)| self.socket_addr_space.check(addr))
.for_each(|(addr, data)| match Packet::from_data(Some(&addr), &data) {
Err(err) => error!("failed to write gossip msg: {err:?}"),
Ok(packet) => {
packet_batch.push(packet);
self.stats.record_sent_packet(&data);
}
});
if !packet_batch.is_empty() {
sender.send(packet_batch)?;
}
self.stats
@@ -1942,35 +1913,23 @@
};
// Generate prune messages.
let prune_messages = self.generate_prune_messages(thread_pool, origins, stakes);
self.stats
.packets_sent_prune_messages_count
.add_relaxed(prune_messages.len() as u64);
let mut packet_batch = PacketBatch::new_unpinned_with_recycler_data_and_dests(
recycler,
"handle_batch_push_messages",
&prune_messages,
);
let num_prune_packets = packet_batch.len();
self.stats
.push_response_count
.add_relaxed(packet_batch.len() as u64);
let new_push_requests = self.new_push_requests(stakes);
self.stats
.push_message_pushes
.add_relaxed(new_push_requests.len() as u64);
for (address, request) in new_push_requests {
if ContactInfo::is_valid_address(&address, &self.socket_addr_space) {
match Packet::from_data(Some(&address), &request) {
Ok(packet) => packet_batch.push(packet),
Err(err) => error!("failed to write push-request packet: {:?}", err),
self.new_push_requests(stakes)
.filter(|(addr, _)| ContactInfo::is_valid_address(addr, &self.socket_addr_space))
.for_each(|(addr, data)| match Packet::from_data(Some(&addr), &data) {
Err(err) => error!("failed to write push msg: {err:?}"),
Ok(packet) => {
packet_batch.push(packet);
self.stats.record_sent_packet(&data);
}
} else {
trace!("Dropping Gossip push response, as destination is unknown");
}
}
self.stats
.packets_sent_prune_messages_count
.add_relaxed(num_prune_packets as u64);
self.stats
.packets_sent_push_messages_count
.add_relaxed((packet_batch.len() - num_prune_packets) as u64);
});
if !packet_batch.is_empty() {
let _ = response_sender.send(packet_batch);
}
@@ -2147,17 +2106,7 @@
Protocol::PongMessage(pong) => pong_messages.push((from_addr, pong)),
}
}
if !pings.is_empty() {
self.stats
.packets_sent_gossip_requests_count
.add_relaxed(pings.len() as u64);
let packet_batch = PacketBatch::new_unpinned_with_recycler_data_and_dests(
recycler,
"ping_contact_infos",
&pings,
);
let _ = response_sender.send(packet_batch);
}
send_gossip_ping_messages(&pings, recycler, response_sender, &self.stats);
self.handle_batch_ping_messages(ping_messages, recycler, response_sender);
self.handle_batch_prune_messages(prune_messages, stakes);
self.handle_batch_push_messages(
@@ -3086,6 +3035,28 @@ fn verify_gossip_addr<R: Rng + CryptoRng>(
out
}

fn send_gossip_ping_messages(
pings: &[(SocketAddr, Protocol /*::PingMessage*/)],
recycler: &PacketBatchRecycler,
sender: &PacketBatchSender,
stats: &GossipStats,
) {
debug_assert!(pings
.iter()
.all(|(_, ping)| matches!(ping, Protocol::PingMessage(_))));
if !pings.is_empty() {
stats
.packets_sent_ping_messages_count
.add_relaxed(pings.len() as u64);
let packet_batch = PacketBatch::new_unpinned_with_recycler_data_and_dests(
recycler,
"send_gossip_ping_messages",
pings,
);
let _ = sender.send(packet_batch);
}
}

#[cfg(test)]
mod tests {
use {
@@ -3113,6 +3084,30 @@ mod tests {
const DEFAULT_NUM_QUIC_ENDPOINTS: NonZeroUsize =
unsafe { NonZeroUsize::new_unchecked(DEFAULT_QUIC_ENDPOINTS) };

impl ClusterInfo {
// Wrapper for ClusterInfo.new_pull_requests replicating old return
// type for legacy tests.
#[allow(clippy::type_complexity)]
fn old_new_pull_requests(
&self,
thread_pool: &ThreadPool,
gossip_validators: Option<&HashSet<Pubkey>>,
stakes: &HashMap<Pubkey, u64>,
) -> (
Vec<(SocketAddr, Ping)>, // Ping packets
Vec<(SocketAddr, Protocol)>, // Pull requests
) {
self.new_pull_requests(thread_pool, gossip_validators, stakes)
.partition_map(|(addr, protocol)| {
if let Protocol::PingMessage(ping) = protocol {
Either::Left((addr, ping))
} else {
Either::Right((addr, protocol))
}
})
}
}
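
The test-only wrapper above rebuilds the old (pings, pulls) pair by splitting the unified iterator with Itertools::partition_map, which routes each item into one of two output collections according to the Either returned by the closure. A tiny, self-contained illustration:

use itertools::{Either, Itertools};

// Tiny illustration of Itertools::partition_map, the combinator the test
// wrapper uses to split the unified request stream back into two Vecs.
fn main() {
    let (evens, odds): (Vec<u32>, Vec<u32>) = (0..6u32).partition_map(|n| {
        if n % 2 == 0 {
            Either::Left(n)
        } else {
            Either::Right(n)
        }
    });
    assert_eq!(evens, vec![0, 2, 4]);
    assert_eq!(odds, vec![1, 3, 5]);
}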

#[test]
fn test_gossip_node() {
//check that a gossip nodes always show up as spies
Expand Down Expand Up @@ -3279,18 +3274,16 @@ mod tests {
&mut Vec::new(), // pings
&SocketAddrSpace::Unspecified,
);
let reqs = cluster_info.generate_new_gossip_requests(
let mut reqs = cluster_info.generate_new_gossip_requests(
&thread_pool,
None, // gossip_validators
&HashMap::new(), // stakes
true, // generate_pull_requests
);
//assert none of the addrs are invalid.
reqs.iter().all(|(addr, _)| {
let res = ContactInfo::is_valid_address(addr, &SocketAddrSpace::Unspecified);
assert!(res);
res
});
assert!(reqs.all(|(addr, _)| {
ContactInfo::is_valid_address(&addr, &SocketAddrSpace::Unspecified)
}));
}

#[test]
@@ -3775,7 +3768,8 @@
let entrypoint_pubkey = solana_pubkey::new_rand();
let entrypoint = ContactInfo::new_localhost(&entrypoint_pubkey, timestamp());
cluster_info.set_entrypoint(entrypoint.clone());
let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new());
let (pings, pulls) =
cluster_info.old_new_pull_requests(&thread_pool, None, &HashMap::new());
assert!(pings.is_empty());
assert_eq!(pulls.len(), MIN_NUM_BLOOM_FILTERS);
for (addr, msg) in pulls {
@@ -3798,7 +3792,8 @@
Duration::from_millis(cluster_info.gossip.pull.crds_timeout),
);
cluster_info.handle_pull_response(vec![entrypoint_crdsvalue], &timeouts);
let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &HashMap::new());
let (pings, pulls) =
cluster_info.old_new_pull_requests(&thread_pool, None, &HashMap::new());
assert_eq!(pings.len(), 1);
assert_eq!(pulls.len(), MIN_NUM_BLOOM_FILTERS);
assert_eq!(*cluster_info.entrypoints.read().unwrap(), vec![entrypoint]);
Expand Down Expand Up @@ -3873,7 +3868,7 @@ mod tests {

// Pull request 1: `other_node` is present but `entrypoint` was just added (so it has a
// fresh timestamp). There should only be one pull request to `other_node`
let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes);
let (pings, pulls) = cluster_info.old_new_pull_requests(&thread_pool, None, &stakes);
assert!(pings.is_empty());
assert_eq!(pulls.len(), MIN_NUM_BLOOM_FILTERS);
assert!(pulls
@@ -3883,7 +3878,7 @@
// Pull request 2: pretend it's been a while since we've pulled from `entrypoint`. There should
// now be two pull requests
cluster_info.entrypoints.write().unwrap()[0].set_wallclock(0);
let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes);
let (pings, pulls) = cluster_info.old_new_pull_requests(&thread_pool, None, &stakes);
assert!(pings.is_empty());
assert_eq!(pulls.len(), 2 * MIN_NUM_BLOOM_FILTERS);
for node in [&other_node, &entrypoint] {
@@ -3897,7 +3892,7 @@
}
// Pull request 3: `other_node` is present and `entrypoint` was just pulled from. There should
// only be one pull request to `other_node`
let (pings, pulls) = cluster_info.new_pull_requests(&thread_pool, None, &stakes);
let (pings, pulls) = cluster_info.old_new_pull_requests(&thread_pool, None, &stakes);
assert!(pings.is_empty());
assert_eq!(pulls.len(), MIN_NUM_BLOOM_FILTERS);
assert!(pulls