Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Don't use pinned memory when unnecessary (#17832)
Browse files Browse the repository at this point in the history
Reports of excessive GPU memory usage and errors
from cudaHostRegister. There are some cases where pinning is
not required.

(cherry picked from commit eeee75c)
  • Loading branch information
sakridge authored and mergify-bot committed Jun 14, 2021
1 parent 3b813db commit 9e1b3c2
Show file tree
Hide file tree
Showing 10 changed files with 47 additions and 7 deletions.
1 change: 1 addition & 0 deletions bench-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ fn main() -> Result<()> {
recycler.clone(),
"bench-streamer-test",
1,
true,
));
}

Expand Down
2 changes: 2 additions & 0 deletions core/src/fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ impl FetchStage {
recycler.clone(),
"fetch_stage",
coalesce_ms,
true,
)
});

Expand All @@ -125,6 +126,7 @@ impl FetchStage {
recycler.clone(),
"fetch_forward_stage",
coalesce_ms,
true,
)
});

Expand Down
6 changes: 3 additions & 3 deletions core/src/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ impl ServeRepair {

if let Some(packet) = packet {
inc_new_counter_debug!("serve_repair-window-request-ledger", 1);
return Some(Packets::new_with_recycler_data(
return Some(Packets::new_unpinned_with_recycler_data(
recycler,
"run_window_request",
vec![packet],
Expand Down Expand Up @@ -555,7 +555,7 @@ impl ServeRepair {
from_addr,
nonce,
)?;
return Some(Packets::new_with_recycler_data(
return Some(Packets::new_unpinned_with_recycler_data(
recycler,
"run_highest_window_request",
vec![packet],
Expand All @@ -572,7 +572,7 @@ impl ServeRepair {
max_responses: usize,
nonce: Nonce,
) -> Option<Packets> {
let mut res = Packets::new_with_recycler(recycler.clone(), 64, "run_orphan");
let mut res = Packets::new_unpinned_with_recycler(recycler.clone(), 64, "run_orphan");
if let Some(blockstore) = blockstore {
// Try to find the next "n" parent slots of the input slot
while let Ok(Some(meta)) = blockstore.meta(slot) {
Expand Down
1 change: 1 addition & 0 deletions core/src/serve_repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ impl ServeRepairService {
Recycler::default(),
"serve_repair_receiver",
1,
false,
);
let (response_sender, response_receiver) = channel();
let t_responder =
Expand Down
1 change: 1 addition & 0 deletions core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ impl ShredFetchStage {
recycler.clone(),
"packet_modifier",
1,
true,
)
})
.collect();
Expand Down
5 changes: 3 additions & 2 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2051,7 +2051,8 @@ impl ClusterInfo {
.process_pull_requests(callers.cloned(), timestamp());
let output_size_limit =
self.update_data_budget(stakes.len()) / PULL_RESPONSE_MIN_SERIALIZED_SIZE;
let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests");
let mut packets =
Packets::new_unpinned_with_recycler(recycler.clone(), 64, "handle_pull_requests");
let (caller_and_filters, addrs): (Vec<_>, Vec<_>) = {
let mut rng = rand::thread_rng();
let check_pull_request =
Expand Down Expand Up @@ -2323,7 +2324,7 @@ impl ClusterInfo {
None
} else {
let packets =
Packets::new_with_recycler_data(recycler, "handle_ping_messages", packets);
Packets::new_unpinned_with_recycler_data(recycler, "handle_ping_messages", packets);
Some(packets)
}
}
Expand Down
1 change: 1 addition & 0 deletions gossip/src/gossip_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ impl GossipService {
Recycler::default(),
"gossip_receiver",
1,
false,
);
let (response_sender, response_receiver) = channel();
let (consume_sender, listen_receiver) = channel();
Expand Down
4 changes: 4 additions & 0 deletions perf/src/cuda_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ impl<'a, T: Clone + Send + Sync + Default + Sized> IntoParallelIterator for &'a
}

impl<T: Clone + Default + Sized> PinnedVec<T> {
pub fn reserve(&mut self, size: usize) {
self.x.reserve(size);
}

pub fn reserve_and_pin(&mut self, size: usize) {
if self.x.capacity() < size {
if self.pinned {
Expand Down
23 changes: 22 additions & 1 deletion perf/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,22 @@ impl Packets {
Packets { packets }
}

pub fn new_unpinned_with_recycler(
recycler: PacketsRecycler,
size: usize,
name: &'static str,
) -> Self {
let mut packets = recycler.allocate(name);
packets.reserve(size);
Packets { packets }
}

pub fn new_with_recycler(recycler: PacketsRecycler, size: usize, name: &'static str) -> Self {
let mut packets = recycler.allocate(name);
packets.reserve_and_pin(size);
Packets { packets }
}

pub fn new_with_recycler_data(
recycler: &PacketsRecycler,
name: &'static str,
Expand All @@ -43,6 +54,16 @@ impl Packets {
vec
}

pub fn new_unpinned_with_recycler_data(
recycler: &PacketsRecycler,
name: &'static str,
mut packets: Vec<Packet>,
) -> Self {
let mut vec = Self::new_unpinned_with_recycler(recycler.clone(), packets.len(), name);
vec.packets.append(&mut packets);
vec
}

pub fn set_addr(&mut self, addr: &SocketAddr) {
for m in self.packets.iter_mut() {
m.meta.set_addr(&addr);
Expand Down Expand Up @@ -76,7 +97,7 @@ pub fn to_packets_with_destination<T: Serialize>(
recycler: PacketsRecycler,
dests_and_data: &[(SocketAddr, T)],
) -> Packets {
let mut out = Packets::new_with_recycler(
let mut out = Packets::new_unpinned_with_recycler(
recycler,
dests_and_data.len(),
"to_packets_with_destination",
Expand Down
10 changes: 9 additions & 1 deletion streamer/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,18 @@ fn recv_loop(
recycler: &PacketsRecycler,
name: &'static str,
coalesce_ms: u64,
use_pinned_memory: bool,
) -> Result<()> {
let mut recv_count = 0;
let mut call_count = 0;
let mut now = Instant::now();
let mut num_max_received = 0; // Number of times maximum packets were received
loop {
let mut msgs = Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, name);
let mut msgs = if use_pinned_memory {
Packets::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, name)
} else {
Packets::with_capacity(PACKETS_PER_BATCH)
};
loop {
// Check for exit signal, even if socket is busy
// (for instance the leader transaction socket)
Expand Down Expand Up @@ -84,6 +89,7 @@ pub fn receiver(
recycler: PacketsRecycler,
name: &'static str,
coalesce_ms: u64,
use_pinned_memory: bool,
) -> JoinHandle<()> {
let res = sock.set_read_timeout(Some(Duration::new(1, 0)));
if res.is_err() {
Expand All @@ -100,6 +106,7 @@ pub fn receiver(
&recycler.clone(),
name,
coalesce_ms,
use_pinned_memory,
);
})
.unwrap()
Expand Down Expand Up @@ -211,6 +218,7 @@ mod test {
Recycler::default(),
"test",
1,
true,
);
let t_responder = {
let (s_responder, r_responder) = channel();
Expand Down

0 comments on commit 9e1b3c2

Please sign in to comment.