This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

persists repair-peers cache across repair service loops (backport #18400) #18483

Merged · 1 commit · Jul 7, 2021
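This backport of #18400 stops the repair service from rebuilding its repair-peers lookup from scratch on every loop iteration: the per-slot cache now lives outside the loop as an LruCache bounded to 128 slots, whose entries expire after a 10-second TTL so stale peer data is not re-used. A minimal sketch of the resulting get-or-refresh pattern, assuming the lru crate as used in this diff (where LruCache::new takes a plain usize) and an illustrative Entry/recompute standing in for the real RepairPeers machinery:

use lru::LruCache;
use std::time::{Duration, Instant};

const CACHE_TTL: Duration = Duration::from_secs(10);

// Illustrative stand-in for RepairPeers and the work that builds it.
struct Entry {
    asof: Instant,
    value: u64,
}

fn recompute(key: u64) -> u64 {
    key * 2 // stand-in for the gossip lookup + weight computation
}

fn main() {
    let mut cache: LruCache<u64, Entry> = LruCache::new(128);
    let key = 42;
    let entry = match cache.get(&key) {
        // Reuse the cached entry only while it is still fresh.
        Some(entry) if entry.asof.elapsed() < CACHE_TTL => entry,
        _ => {
            // Stale or missing: evict, rebuild, and re-insert.
            cache.pop(&key);
            let entry = Entry {
                asof: Instant::now(),
                value: recompute(key),
            };
            cache.put(key, entry);
            cache.get(&key).unwrap()
        }
    };
    println!("value for {}: {}", key, entry.value);
}

Note that the looked-up reference is bound to a local and used in place, as the diff below also does; factoring the match into a helper that returns the reference would likely trip over a known borrow-checker limitation with conditionally returned borrows.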
3 changes: 3 additions & 0 deletions core/src/cluster_slots.rs
@@ -136,6 +136,9 @@ impl ClusterSlots {
     }

     pub fn compute_weights(&self, slot: Slot, repair_peers: &[ContactInfo]) -> Vec<u64> {
+        if repair_peers.is_empty() {
+            return Vec::default();
+        }
         let stakes = {
             let validator_stakes = self.validator_stakes.read().unwrap();
             repair_peers
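The early return added to compute_weights pairs with the empty-peers check in RepairPeers::new further down: the weights computed here feed rand's WeightedIndex, which rejects both an empty weight list and an all-zero one. A small illustration of that behavior (assuming the rand 0.7/0.8-era API implied by the WeightedError import below):

use rand::distributions::{Distribution, WeightedError, WeightedIndex};

fn main() {
    // An empty weight list is rejected outright ...
    assert!(matches!(
        WeightedIndex::new(Vec::<u64>::new()),
        Err(WeightedError::NoItem)
    ));
    // ... as are weights that sum to zero.
    assert!(matches!(
        WeightedIndex::new(vec![0u64, 0]),
        Err(WeightedError::AllWeightsZero)
    ));
    // With valid weights, sampling yields an index biased by weight.
    let dist = WeightedIndex::new(vec![1u64, 9]).unwrap();
    let index = dist.sample(&mut rand::thread_rng());
    assert!(index < 2);
}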
7 changes: 4 additions & 3 deletions core/src/repair_service.rs
@@ -7,9 +7,10 @@ use crate::{
     repair_weight::RepairWeight,
     replay_stage::DUPLICATE_THRESHOLD,
     result::Result,
-    serve_repair::{RepairType, ServeRepair},
+    serve_repair::{RepairType, ServeRepair, REPAIR_PEERS_CACHE_CAPACITY},
 };
 use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
+use lru::LruCache;
 use solana_gossip::cluster_info::ClusterInfo;
 use solana_ledger::{
     blockstore::{Blockstore, SlotMeta},
@@ -193,6 +194,7 @@ impl RepairService {
         let mut last_stats = Instant::now();
         let duplicate_slot_repair_statuses: HashMap<Slot, DuplicateSlotRepairStatus> =
             HashMap::new();
+        let mut peers_cache = LruCache::new(REPAIR_PEERS_CACHE_CAPACITY);

         loop {
             if exit.load(Ordering::Relaxed) {
Expand Down Expand Up @@ -272,14 +274,13 @@ impl RepairService {
)
};

let mut cache = HashMap::new();
let mut send_repairs_elapsed = Measure::start("send_repairs_elapsed");
let mut outstanding_requests = outstanding_requests.write().unwrap();
repairs.into_iter().for_each(|repair_request| {
if let Ok((to, req)) = serve_repair.repair_request(
cluster_slots,
repair_request,
&mut cache,
&mut peers_cache,
&mut repair_stats,
&repair_info.repair_validators,
&mut outstanding_requests,
Expand Down
83 changes: 59 additions & 24 deletions core/src/serve_repair.rs
@@ -6,7 +6,11 @@ use crate::{
     result::{Error, Result},
 };
 use bincode::serialize;
-use rand::distributions::{Distribution, WeightedIndex};
+use lru::LruCache;
+use rand::{
+    distributions::{Distribution, WeightedError, WeightedIndex},
+    Rng,
+};
 use solana_gossip::{
     cluster_info::{ClusterInfo, ClusterInfoError},
     contact_info::ContactInfo,
@@ -27,7 +31,7 @@ use solana_sdk::{
 };
 use solana_streamer::streamer::{PacketReceiver, PacketSender};
 use std::{
-    collections::{hash_map::Entry, HashMap, HashSet},
+    collections::HashSet,
     net::SocketAddr,
     sync::atomic::{AtomicBool, Ordering},
     sync::{Arc, RwLock},
@@ -37,6 +41,10 @@ use std::{

 /// the number of slots to respond with when responding to `Orphan` requests
 pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10;
+// Number of slots for which repair peers and sampling weights are cached.
+pub(crate) const REPAIR_PEERS_CACHE_CAPACITY: usize = 128;
+// Cache entries carry a TTL to avoid re-using outdated data.
+const REPAIR_PEERS_CACHE_TTL: Duration = Duration::from_secs(10);

 #[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq)]
 pub enum RepairType {
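Bounding the cache matters because repair can touch many distinct slots over time; with an LRU policy the cache keeps only the most recently repaired slots and silently evicts the rest. A quick illustration of that eviction behavior (again assuming the lru crate version used here, where LruCache::new takes a plain usize):

use lru::LruCache;

fn main() {
    let mut cache: LruCache<u64, &str> = LruCache::new(2);
    cache.put(1, "one");
    cache.put(2, "two");
    cache.get(&1); // touch key 1, making key 2 the least recently used
    cache.put(3, "three"); // at capacity: key 2 is evicted
    assert!(cache.get(&2).is_none());
    assert_eq!(cache.get(&1), Some(&"one"));
}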
@@ -107,7 +115,38 @@ pub struct ServeRepair {
     cluster_info: Arc<ClusterInfo>,
 }

-type RepairCache = HashMap<Slot, (Vec<ContactInfo>, WeightedIndex<u64>)>;
+// Cache entry of repair peers for a slot.
+pub(crate) struct RepairPeers {
+    asof: Instant,
+    peers: Vec<(Pubkey, /*ContactInfo.serve_repair:*/ SocketAddr)>,
+    weighted_index: WeightedIndex<u64>,
+}
+
+impl RepairPeers {
+    fn new(asof: Instant, peers: &[ContactInfo], weights: &[u64]) -> Result<Self> {
+        if peers.is_empty() {
+            return Err(Error::from(ClusterInfoError::NoPeers));
+        }
+        if peers.len() != weights.len() {
+            return Err(Error::from(WeightedError::InvalidWeight));
+        }
+        let weighted_index = WeightedIndex::new(weights)?;
+        let peers = peers
+            .iter()
+            .map(|peer| (peer.id, peer.serve_repair))
+            .collect();
+        Ok(Self {
+            asof,
+            peers,
+            weighted_index,
+        })
+    }
+
+    fn sample<R: Rng>(&self, rng: &mut R) -> (Pubkey, SocketAddr) {
+        let index = self.weighted_index.sample(rng);
+        self.peers[index]
+    }
+}

 impl ServeRepair {
     /// Without a valid keypair gossip will not function. Only useful for tests.
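RepairPeers snapshots only each peer's pubkey and serve_repair address, plus a prebuilt WeightedIndex over the stake weights. Construction costs one pass over the weights while each sample is just a search over the cumulative sums, so caching the built index amortizes that construction across every repair request hitting the same slot within the TTL. A sketch of that reuse (illustrative stake values; rand's WeightedIndex as imported in this file):

use rand::distributions::{Distribution, WeightedIndex};

fn main() {
    // One O(n) construction over the stake weights ...
    let stakes = vec![100u64, 40, 7, 3];
    let dist = WeightedIndex::new(&stakes).unwrap();
    // ... then many cheap, stake-biased samples against the same index.
    let mut rng = rand::thread_rng();
    let picks: Vec<usize> = (0..5).map(|_| dist.sample(&mut rng)).collect();
    println!("{:?}", picks); // indices into the peer list; heavier stakes appear more often
}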
@@ -396,37 +435,33 @@ impl ServeRepair {
         Ok(out)
     }

-    pub fn repair_request(
+    pub(crate) fn repair_request(
         &self,
         cluster_slots: &ClusterSlots,
         repair_request: RepairType,
-        cache: &mut RepairCache,
+        peers_cache: &mut LruCache<Slot, RepairPeers>,
         repair_stats: &mut RepairStats,
         repair_validators: &Option<HashSet<Pubkey>>,
         outstanding_requests: &mut OutstandingRepairs,
     ) -> Result<(SocketAddr, Vec<u8>)> {
         // find a peer that appears to be accepting replication and has the desired slot, as indicated
         // by a valid tvu port location
         let slot = repair_request.slot();
-        let (repair_peers, weighted_index) = match cache.entry(slot) {
-            Entry::Occupied(entry) => entry.into_mut(),
-            Entry::Vacant(entry) => {
+        let repair_peers = match peers_cache.get(&slot) {
+            Some(entry) if entry.asof.elapsed() < REPAIR_PEERS_CACHE_TTL => entry,
+            _ => {
+                peers_cache.pop(&slot);
                 let repair_peers = self.repair_peers(repair_validators, slot);
-                if repair_peers.is_empty() {
-                    return Err(Error::from(ClusterInfoError::NoPeers));
-                }
                 let weights = cluster_slots.compute_weights(slot, &repair_peers);
-                debug_assert_eq!(weights.len(), repair_peers.len());
-                let weighted_index = WeightedIndex::new(weights)?;
-                entry.insert((repair_peers, weighted_index))
+                let repair_peers = RepairPeers::new(Instant::now(), &repair_peers, &weights)?;
+                peers_cache.put(slot, repair_peers);
+                peers_cache.get(&slot).unwrap()
             }
         };
-        let n = weighted_index.sample(&mut rand::thread_rng());
-        let addr = repair_peers[n].serve_repair; // send the request to the peer's serve_repair port
+        let (peer, addr) = repair_peers.sample(&mut rand::thread_rng());
         let nonce =
             outstanding_requests.add_request(repair_request, solana_sdk::timing::timestamp());
-        let repair_peer_id = repair_peers[n].id;
-        let out = self.map_repair_request(&repair_request, &repair_peer_id, repair_stats, nonce)?;
+        let out = self.map_repair_request(&repair_request, &peer, repair_stats, nonce)?;
         Ok((addr, out))
     }

@@ -772,7 +807,7 @@ mod tests {
         let rv = serve_repair.repair_request(
             &cluster_slots,
             RepairType::Shred(0, 0),
-            &mut HashMap::new(),
+            &mut LruCache::new(100),
             &mut RepairStats::default(),
             &None,
             &mut outstanding_requests,
@@ -800,7 +835,7 @@
             .repair_request(
                 &cluster_slots,
                 RepairType::Shred(0, 0),
-                &mut HashMap::new(),
+                &mut LruCache::new(100),
                 &mut RepairStats::default(),
                 &None,
                 &mut outstanding_requests,
@@ -834,7 +869,7 @@
             .repair_request(
                 &cluster_slots,
                 RepairType::Shred(0, 0),
-                &mut HashMap::new(),
+                &mut LruCache::new(100),
                 &mut RepairStats::default(),
                 &None,
                 &mut outstanding_requests,
@@ -1015,7 +1050,7 @@
             .repair_request(
                 &cluster_slots,
                 RepairType::Shred(0, 0),
-                &mut HashMap::new(),
+                &mut LruCache::new(100),
                 &mut RepairStats::default(),
                 &trusted_validators,
                 &mut OutstandingRepairs::default(),
@@ -1032,7 +1067,7 @@
             .repair_request(
                 &cluster_slots,
                 RepairType::Shred(0, 0),
-                &mut HashMap::new(),
+                &mut LruCache::new(100),
                 &mut RepairStats::default(),
                 &trusted_validators,
                 &mut OutstandingRepairs::default(),
@@ -1053,7 +1088,7 @@
             .repair_request(
                 &cluster_slots,
                 RepairType::Shred(0, 0),
-                &mut HashMap::new(),
+                &mut LruCache::new(100),
                 &mut RepairStats::default(),
                 &None,
                 &mut OutstandingRepairs::default(),