Skip to content

Commit

Permalink
Add shred version filters to Crds Accessors (#8027)
Browse files Browse the repository at this point in the history
* Add shred version filters to Crds Accessors

* Adopt entrypoint shred_version if one isn't provided
  • Loading branch information
sagar-solana authored Jan 30, 2020
1 parent c2baf7b commit 64c42e2
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 4 deletions.
2 changes: 2 additions & 0 deletions archiver-lib/src/archiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,8 @@ impl Archiver {
let mut contact_info = node_info.clone();
contact_info.tvu = "0.0.0.0:0".parse().unwrap();
contact_info.wallclock = timestamp();
// copy over the adopted shred_version from the entrypoint
contact_info.shred_version = cluster_info.read().unwrap().my_data().shred_version;
{
let mut cluster_info_w = cluster_info.write().unwrap();
cluster_info_w.insert_self(contact_info);
Expand Down
63 changes: 61 additions & 2 deletions core/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,8 @@ impl ClusterInfo {
.values()
.filter_map(|x| x.value.contact_info())
.filter(|x| x.id != me.id)
/* shred_version not considered for rpc peers (ie, caller must select version
if desired) */
.filter(|x| ContactInfo::is_valid_address(&x.rpc))
.cloned()
.collect()
Expand Down Expand Up @@ -440,12 +442,29 @@ impl ClusterInfo {
.values()
.filter_map(|x| x.value.contact_info())
.filter(|x| x.id != me)
/* shred_version not considered for gossip peers (ie, spy nodes do not set
shred_version) */
.filter(|x| ContactInfo::is_valid_address(&x.gossip))
.cloned()
.collect()
}

/// all validators that have a valid tvu port.
/// all validators that have a valid tvu port regardless of `shred_version`.
pub fn all_tvu_peers(&self) -> Vec<ContactInfo> {
let me = self.my_data();
self.gossip
.crds
.table
.values()
.filter_map(|x| x.value.contact_info())
.filter(|x| ContactInfo::is_valid_address(&x.tvu))
.filter(|x| !ClusterInfo::is_archiver(x))
.filter(|x| x.id != me.id)
.cloned()
.collect()
}

/// all validators that have a valid tvu port and are on the same `shred_version`.
pub fn tvu_peers(&self) -> Vec<ContactInfo> {
let me = self.my_data();
self.gossip
Expand All @@ -456,11 +475,26 @@ impl ClusterInfo {
.filter(|x| ContactInfo::is_valid_address(&x.tvu))
.filter(|x| !ClusterInfo::is_archiver(x))
.filter(|x| x.id != me.id)
.filter(|x| x.shred_version == me.shred_version)
.cloned()
.collect()
}

/// all peers that have a valid storage addr
/// all peers that have a valid storage addr regardless of `shred_version`.
pub fn all_storage_peers(&self) -> Vec<ContactInfo> {
let me = self.my_data();
self.gossip
.crds
.table
.values()
.filter_map(|x| x.value.contact_info())
.filter(|x| ContactInfo::is_valid_address(&x.storage_addr))
.filter(|x| x.id != me.id)
.cloned()
.collect()
}

/// all peers that have a valid storage addr and are on the same `shred_version`.
pub fn storage_peers(&self) -> Vec<ContactInfo> {
let me = self.my_data();
self.gossip
Expand All @@ -470,6 +504,7 @@ impl ClusterInfo {
.filter_map(|x| x.value.contact_info())
.filter(|x| ContactInfo::is_valid_address(&x.storage_addr))
.filter(|x| x.id != me.id)
.filter(|x| x.shred_version == me.shred_version)
.cloned()
.collect()
}
Expand All @@ -483,6 +518,7 @@ impl ClusterInfo {
.values()
.filter_map(|x| x.value.contact_info())
.filter(|x| x.id != me.id)
.filter(|x| x.shred_version == me.shred_version)
.filter(|x| ContactInfo::is_valid_address(&x.tvu))
.filter(|x| ContactInfo::is_valid_address(&x.tvu_forwards))
.cloned()
Expand All @@ -495,6 +531,7 @@ impl ClusterInfo {
ClusterInfo::tvu_peers(self)
.into_iter()
.filter(|x| x.id != me.id)
.filter(|x| x.shred_version == me.shred_version)
.filter(|x| ContactInfo::is_valid_address(&x.gossip))
.filter(|x| {
self.get_epoch_state_for_node(&x.id, None)
Expand Down Expand Up @@ -1057,6 +1094,7 @@ impl ClusterInfo {
.spawn(move || {
let mut last_push = timestamp();
let mut last_contact_info_trace = timestamp();
let mut adopt_shred_version = obj.read().unwrap().my_data().shred_version == 0;
let recycler = PacketsRecycler::default();
loop {
let start = timestamp();
Expand Down Expand Up @@ -1097,6 +1135,27 @@ impl ClusterInfo {
("tabel_size", table_size as i64, i64),
("purge_stake_timeout", timeout as i64, i64)
);
// Adopt the entrypoint's `shred_version` if ours is unset
if adopt_shred_version {
// If gossip was given an entrypoint, lookup its id
let entrypoint_id = obj.read().unwrap().entrypoint.as_ref().map(|e| e.id);
if let Some(entrypoint_id) = entrypoint_id {
info!("Shred version unknown, looking for the entrypoint:{:?} Shred version", entrypoint_id);
// If a pull from the entrypoint was successful, it should exist in the crds table
let entrypoint = obj.read().unwrap().lookup(&entrypoint_id).cloned();
if let Some(entrypoint) = entrypoint {
let mut self_info = obj.read().unwrap().my_data();
if entrypoint.shred_version == 0 {
warn!("entrypoint is running an invalid shred_version: 0");
} else {
info!("Setting Shred version to {:?} from entrypoint {:?}", entrypoint.shred_version, entrypoint.id);
self_info.shred_version = entrypoint.shred_version;
obj.write().unwrap().insert_self(self_info);
adopt_shred_version = false;
}
}
}
}
//TODO: possibly tune this parameter
//we saw a deadlock passing an obj.read().unwrap().timeout into sleep
if start - last_push > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 {
Expand Down
4 changes: 2 additions & 2 deletions core/src/gossip_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,10 @@ fn spy(
tvu_peers = spy_ref
.read()
.unwrap()
.tvu_peers()
.all_tvu_peers()
.into_iter()
.collect::<Vec<_>>();
archivers = spy_ref.read().unwrap().storage_peers();
archivers = spy_ref.read().unwrap().all_storage_peers();
if let Some(num) = num_nodes {
if tvu_peers.len() + archivers.len() >= num {
if let Some(gossip_addr) = find_node_by_gossip_addr {
Expand Down

0 comments on commit 64c42e2

Please sign in to comment.