Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
persists repair-peers cache across repair service loops (#18400) (#18483)
Browse files Browse the repository at this point in the history

The repair-peers cache is reset each time repair service loop runs,
and so computed repeatedly for the same slots:
https://github.com/solana-labs/solana/blob/d2b07dca9/core/src/repair_service.rs#L275

This commit uses an LRU cache to persist repair-peers for each slot.
In addition to LRU eviction rules, each entry also has a 10-second TTL
in order to avoid re-using outdated data.

(cherry picked from commit a0551b4)

Co-authored-by: behzad nouri <behzadnouri@gmail.com>
  • Loading branch information
mergify[bot] and behzadnouri authored Jul 7, 2021
1 parent c534c92 commit 9bb482e
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 27 deletions.
3 changes: 3 additions & 0 deletions core/src/cluster_slots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ impl ClusterSlots {
}

pub fn compute_weights(&self, slot: Slot, repair_peers: &[ContactInfo]) -> Vec<u64> {
if repair_peers.is_empty() {
return Vec::default();
}
let stakes = {
let validator_stakes = self.validator_stakes.read().unwrap();
repair_peers
Expand Down
7 changes: 4 additions & 3 deletions core/src/repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ use crate::{
repair_weight::RepairWeight,
replay_stage::DUPLICATE_THRESHOLD,
result::Result,
serve_repair::{RepairType, ServeRepair},
serve_repair::{RepairType, ServeRepair, REPAIR_PEERS_CACHE_CAPACITY},
};
use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender};
use lru::LruCache;
use solana_gossip::cluster_info::ClusterInfo;
use solana_ledger::{
blockstore::{Blockstore, SlotMeta},
Expand Down Expand Up @@ -193,6 +194,7 @@ impl RepairService {
let mut last_stats = Instant::now();
let duplicate_slot_repair_statuses: HashMap<Slot, DuplicateSlotRepairStatus> =
HashMap::new();
let mut peers_cache = LruCache::new(REPAIR_PEERS_CACHE_CAPACITY);

loop {
if exit.load(Ordering::Relaxed) {
Expand Down Expand Up @@ -272,14 +274,13 @@ impl RepairService {
)
};

let mut cache = HashMap::new();
let mut send_repairs_elapsed = Measure::start("send_repairs_elapsed");
let mut outstanding_requests = outstanding_requests.write().unwrap();
repairs.into_iter().for_each(|repair_request| {
if let Ok((to, req)) = serve_repair.repair_request(
cluster_slots,
repair_request,
&mut cache,
&mut peers_cache,
&mut repair_stats,
&repair_info.repair_validators,
&mut outstanding_requests,
Expand Down
83 changes: 59 additions & 24 deletions core/src/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ use crate::{
result::{Error, Result},
};
use bincode::serialize;
use rand::distributions::{Distribution, WeightedIndex};
use lru::LruCache;
use rand::{
distributions::{Distribution, WeightedError, WeightedIndex},
Rng,
};
use solana_gossip::{
cluster_info::{ClusterInfo, ClusterInfoError},
contact_info::ContactInfo,
Expand All @@ -27,7 +31,7 @@ use solana_sdk::{
};
use solana_streamer::streamer::{PacketReceiver, PacketSender};
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
collections::HashSet,
net::SocketAddr,
sync::atomic::{AtomicBool, Ordering},
sync::{Arc, RwLock},
Expand All @@ -37,6 +41,10 @@ use std::{

/// the number of slots to respond with when responding to `Orphan` requests
pub const MAX_ORPHAN_REPAIR_RESPONSES: usize = 10;
// Number of slots to cache their respective repair peers and sampling weights.
pub(crate) const REPAIR_PEERS_CACHE_CAPACITY: usize = 128;
// Limit cache entries ttl in order to avoid re-using outdated data.
const REPAIR_PEERS_CACHE_TTL: Duration = Duration::from_secs(10);

#[derive(Serialize, Deserialize, Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub enum RepairType {
Expand Down Expand Up @@ -107,7 +115,38 @@ pub struct ServeRepair {
cluster_info: Arc<ClusterInfo>,
}

type RepairCache = HashMap<Slot, (Vec<ContactInfo>, WeightedIndex<u64>)>;
// Cache entry for repair peers for a slot.
pub(crate) struct RepairPeers {
// When this entry was created; compared against REPAIR_PEERS_CACHE_TTL
// (via Instant::elapsed) to discard stale entries before reuse.
asof: Instant,
// Each repair peer's pubkey paired with its serve_repair socket address,
// copied out of ContactInfo at construction time.
peers: Vec<(Pubkey, /*ContactInfo.serve_repair:*/ SocketAddr)>,
// Precomputed stake-weighted distribution used to sample an index into
// `peers`; built once per cache entry instead of once per request.
weighted_index: WeightedIndex<u64>,
}

impl RepairPeers {
    /// Builds a cache entry from repair peers and their sampling weights,
    /// recording `asof` as the creation time for later TTL checks.
    ///
    /// Errors with `NoPeers` when `peers` is empty, `InvalidWeight` when
    /// the weights do not line up one-to-one with the peers, and propagates
    /// any error from constructing the weighted distribution.
    fn new(asof: Instant, peers: &[ContactInfo], weights: &[u64]) -> Result<Self> {
        if peers.is_empty() {
            return Err(Error::from(ClusterInfoError::NoPeers));
        }
        if weights.len() != peers.len() {
            return Err(Error::from(WeightedError::InvalidWeight));
        }
        // Build the stake-weighted sampler once, up front, so each later
        // sample() call is cheap.
        let weighted_index = WeightedIndex::new(weights)?;
        // Keep only what sampling needs: the pubkey and serve_repair addr.
        let mut addrs = Vec::with_capacity(peers.len());
        for peer in peers {
            addrs.push((peer.id, peer.serve_repair));
        }
        Ok(Self {
            asof,
            peers: addrs,
            weighted_index,
        })
    }

    /// Samples one (pubkey, serve_repair address) pair according to the
    /// precomputed stake weights.
    fn sample<R: Rng>(&self, rng: &mut R) -> (Pubkey, SocketAddr) {
        self.peers[self.weighted_index.sample(rng)]
    }
}

impl ServeRepair {
/// Without a valid keypair gossip will not function. Only useful for tests.
Expand Down Expand Up @@ -396,37 +435,33 @@ impl ServeRepair {
Ok(out)
}

pub fn repair_request(
pub(crate) fn repair_request(
&self,
cluster_slots: &ClusterSlots,
repair_request: RepairType,
cache: &mut RepairCache,
peers_cache: &mut LruCache<Slot, RepairPeers>,
repair_stats: &mut RepairStats,
repair_validators: &Option<HashSet<Pubkey>>,
outstanding_requests: &mut OutstandingRepairs,
) -> Result<(SocketAddr, Vec<u8>)> {
// find a peer that appears to be accepting replication and has the desired slot, as indicated
// by a valid tvu port location
let slot = repair_request.slot();
let (repair_peers, weighted_index) = match cache.entry(slot) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => {
let repair_peers = match peers_cache.get(&slot) {
Some(entry) if entry.asof.elapsed() < REPAIR_PEERS_CACHE_TTL => entry,
_ => {
peers_cache.pop(&slot);
let repair_peers = self.repair_peers(repair_validators, slot);
if repair_peers.is_empty() {
return Err(Error::from(ClusterInfoError::NoPeers));
}
let weights = cluster_slots.compute_weights(slot, &repair_peers);
debug_assert_eq!(weights.len(), repair_peers.len());
let weighted_index = WeightedIndex::new(weights)?;
entry.insert((repair_peers, weighted_index))
let repair_peers = RepairPeers::new(Instant::now(), &repair_peers, &weights)?;
peers_cache.put(slot, repair_peers);
peers_cache.get(&slot).unwrap()
}
};
let n = weighted_index.sample(&mut rand::thread_rng());
let addr = repair_peers[n].serve_repair; // send the request to the peer's serve_repair port
let (peer, addr) = repair_peers.sample(&mut rand::thread_rng());
let nonce =
outstanding_requests.add_request(repair_request, solana_sdk::timing::timestamp());
let repair_peer_id = repair_peers[n].id;
let out = self.map_repair_request(&repair_request, &repair_peer_id, repair_stats, nonce)?;
let out = self.map_repair_request(&repair_request, &peer, repair_stats, nonce)?;
Ok((addr, out))
}

Expand Down Expand Up @@ -772,7 +807,7 @@ mod tests {
let rv = serve_repair.repair_request(
&cluster_slots,
RepairType::Shred(0, 0),
&mut HashMap::new(),
&mut LruCache::new(100),
&mut RepairStats::default(),
&None,
&mut outstanding_requests,
Expand Down Expand Up @@ -800,7 +835,7 @@ mod tests {
.repair_request(
&cluster_slots,
RepairType::Shred(0, 0),
&mut HashMap::new(),
&mut LruCache::new(100),
&mut RepairStats::default(),
&None,
&mut outstanding_requests,
Expand Down Expand Up @@ -834,7 +869,7 @@ mod tests {
.repair_request(
&cluster_slots,
RepairType::Shred(0, 0),
&mut HashMap::new(),
&mut LruCache::new(100),
&mut RepairStats::default(),
&None,
&mut outstanding_requests,
Expand Down Expand Up @@ -1015,7 +1050,7 @@ mod tests {
.repair_request(
&cluster_slots,
RepairType::Shred(0, 0),
&mut HashMap::new(),
&mut LruCache::new(100),
&mut RepairStats::default(),
&trusted_validators,
&mut OutstandingRepairs::default(),
Expand All @@ -1032,7 +1067,7 @@ mod tests {
.repair_request(
&cluster_slots,
RepairType::Shred(0, 0),
&mut HashMap::new(),
&mut LruCache::new(100),
&mut RepairStats::default(),
&trusted_validators,
&mut OutstandingRepairs::default(),
Expand All @@ -1053,7 +1088,7 @@ mod tests {
.repair_request(
&cluster_slots,
RepairType::Shred(0, 0),
&mut HashMap::new(),
&mut LruCache::new(100),
&mut RepairStats::default(),
&None,
&mut OutstandingRepairs::default(),
Expand Down

0 comments on commit 9bb482e

Please sign in to comment.