Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Don't use pinned memory for gossip and repair responses #17832

Merged
merged 1 commit into from
Jun 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bench-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ fn main() -> Result<()> {
recycler.clone(),
"bench-streamer-test",
1,
true,
));
}

Expand Down
2 changes: 2 additions & 0 deletions core/src/fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ impl FetchStage {
recycler.clone(),
"fetch_stage",
coalesce_ms,
true,
)
});

Expand All @@ -125,6 +126,7 @@ impl FetchStage {
recycler.clone(),
"fetch_forward_stage",
coalesce_ms,
true,
)
});

Expand Down
6 changes: 3 additions & 3 deletions core/src/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ impl ServeRepair {

if let Some(packet) = packet {
inc_new_counter_debug!("serve_repair-window-request-ledger", 1);
return Some(Packets::new_with_recycler_data(
return Some(Packets::new_unpinned_with_recycler_data(
recycler,
"run_window_request",
vec![packet],
Expand Down Expand Up @@ -555,7 +555,7 @@ impl ServeRepair {
from_addr,
nonce,
)?;
return Some(Packets::new_with_recycler_data(
return Some(Packets::new_unpinned_with_recycler_data(
recycler,
"run_highest_window_request",
vec![packet],
Expand All @@ -572,7 +572,7 @@ impl ServeRepair {
max_responses: usize,
nonce: Nonce,
) -> Option<Packets> {
let mut res = Packets::new_with_recycler(recycler.clone(), 64, "run_orphan");
let mut res = Packets::new_unpinned_with_recycler(recycler.clone(), 64, "run_orphan");
if let Some(blockstore) = blockstore {
// Try to find the next "n" parent slots of the input slot
while let Ok(Some(meta)) = blockstore.meta(slot) {
Expand Down
1 change: 1 addition & 0 deletions core/src/serve_repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ impl ServeRepairService {
Recycler::default(),
"serve_repair_receiver",
1,
false,
);
let (response_sender, response_receiver) = channel();
let t_responder =
Expand Down
1 change: 1 addition & 0 deletions core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ impl ShredFetchStage {
recycler.clone(),
"packet_modifier",
1,
true,
)
})
.collect();
Expand Down
5 changes: 3 additions & 2 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2041,7 +2041,8 @@ impl ClusterInfo {
.process_pull_requests(callers.cloned(), timestamp());
let output_size_limit =
self.update_data_budget(stakes.len()) / PULL_RESPONSE_MIN_SERIALIZED_SIZE;
let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests");
let mut packets =
Packets::new_unpinned_with_recycler(recycler.clone(), 64, "handle_pull_requests");
let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = {
let mut rng = rand::thread_rng();
let check_pull_request =
Expand Down Expand Up @@ -2313,7 +2314,7 @@ impl ClusterInfo {
None
} else {
let packets =
Packets::new_with_recycler_data(recycler, "handle_ping_messages", packets);
Packets::new_unpinned_with_recycler_data(recycler, "handle_ping_messages", packets);
Some(packets)
}
}
Expand Down
1 change: 1 addition & 0 deletions gossip/src/gossip_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ impl GossipService {
Recycler::default(),
"gossip_receiver",
1,
false,
sakridge marked this conversation as resolved.
Show resolved Hide resolved
);
let (response_sender, response_receiver) = channel();
let t_responder = streamer::responder("gossip", gossip_socket, response_receiver);
Expand Down
4 changes: 4 additions & 0 deletions perf/src/cuda_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ impl<'a, T: Clone + Send + Sync + Default + Sized> IntoParallelIterator for &'a
}

impl<T: Clone + Default + Sized> PinnedVec<T> {
pub fn reserve(&mut self, size: usize) {
self.x.reserve(size);
}

pub fn reserve_and_pin(&mut self, size: usize) {
if self.x.capacity() < size {
if self.pinned {
Expand Down
23 changes: 22 additions & 1 deletion perf/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,22 @@ impl Packets {
Packets { packets }
}

pub fn new_unpinned_with_recycler(
recycler: PacketsRecycler,
size: usize,
name: &'static str,
) -> Self {
let mut packets = recycler.allocate(name);
packets.reserve(size);
Packets { packets }
}

pub fn new_with_recycler(recycler: PacketsRecycler, size: usize, name: &'static str) -> Self {
let mut packets = recycler.allocate(name);
packets.reserve_and_pin(size);
Packets { packets }
}

pub fn new_with_recycler_data(
recycler: &PacketsRecycler,
name: &'static str,
Expand All @@ -43,6 +54,16 @@ impl Packets {
vec
}

pub fn new_unpinned_with_recycler_data(
recycler: &PacketsRecycler,
name: &'static str,
mut packets: Vec<Packet>,
) -> Self {
let mut vec = Self::new_unpinned_with_recycler(recycler.clone(), packets.len(), name);
vec.packets.append(&mut packets);
vec
}

pub fn set_addr(&mut self, addr: &SocketAddr) {
for m in self.packets.iter_mut() {
m.meta.set_addr(&addr);
Expand Down Expand Up @@ -76,7 +97,7 @@ pub fn to_packets_with_destination<T: Serialize>(
recycler: PacketsRecycler,
dests_and_data: &[(SocketAddr, T)],
) -> Packets {
let mut out = Packets::new_with_recycler(
let mut out = Packets::new_unpinned_with_recycler(
recycler,
dests_and_data.len(),
"to_packets_with_destination",
Expand Down
10 changes: 9 additions & 1 deletion streamer/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,18 @@ fn recv_loop(
recycler: &PacketsRecycler,
name: &'static str,
coalesce_ms: u64,
use_pinned_memory: bool,
) -> Result<()> {
let mut recv_count = 0;
let mut call_count = 0;
let mut now = Instant::now();
let mut num_max_received = 0; // Number of times maximum packets were received
loop {
let mut msgs = Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, name);
let mut msgs = if use_pinned_memory {
Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, name)
} else {
Packets::with_capacity(PACKETS_PER_BATCH)
};
loop {
// Check for exit signal, even if socket is busy
// (for instance the leader transaction socket)
Expand Down Expand Up @@ -84,6 +89,7 @@ pub fn receiver(
recycler: PacketsRecycler,
name: &'static str,
coalesce_ms: u64,
use_pinned_memory: bool,
) -> JoinHandle<()> {
let res = sock.set_read_timeout(Some(Duration::new(1, 0)));
if res.is_err() {
Expand All @@ -100,6 +106,7 @@ pub fn receiver(
&recycler.clone(),
name,
coalesce_ms,
use_pinned_memory,
);
})
.unwrap()
Expand Down Expand Up @@ -211,6 +218,7 @@ mod test {
Recycler::default(),
"test",
1,
true,
);
let t_responder = {
let (s_responder, r_responder) = channel();
Expand Down