From f98c5be70737916e7ac207bd802b50c840fd5710 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Wed, 29 Jan 2020 20:50:26 -0800 Subject: [PATCH 1/2] Add shred version filters to Crds Accessors --- core/src/cluster_info.rs | 40 ++++++++++++++++++++++++++++++++++++-- core/src/gossip_service.rs | 4 ++-- 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 556937e647e1bf..2a96a3d86c339e 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -413,6 +413,7 @@ impl ClusterInfo { .values() .filter_map(|x| x.value.contact_info()) .filter(|x| x.id != me.id) + .filter(|x| x.shred_version == me.shred_version) .filter(|x| ContactInfo::is_valid_address(&x.rpc)) .cloned() .collect() @@ -440,12 +441,29 @@ impl ClusterInfo { .values() .filter_map(|x| x.value.contact_info()) .filter(|x| x.id != me) + /* shred_version not considered for gossip peers (ie, spy nodes do not set + shred_version) */ .filter(|x| ContactInfo::is_valid_address(&x.gossip)) .cloned() .collect() } - /// all validators that have a valid tvu port. + /// all validators that have a valid tvu port regardless of `shred_version`. + pub fn all_tvu_peers(&self) -> Vec<ContactInfo> { + let me = self.my_data(); + self.gossip + .crds + .table + .values() + .filter_map(|x| x.value.contact_info()) + .filter(|x| ContactInfo::is_valid_address(&x.tvu)) + .filter(|x| !ClusterInfo::is_archiver(x)) + .filter(|x| x.id != me.id) + .cloned() + .collect() + } + + /// all validators that have a valid tvu port and are on the same `shred_version`. pub fn tvu_peers(&self) -> Vec<ContactInfo> { let me = self.my_data(); self.gossip @@ -456,11 +474,26 @@ impl ClusterInfo { .filter(|x| ContactInfo::is_valid_address(&x.tvu)) .filter(|x| !ClusterInfo::is_archiver(x)) .filter(|x| x.id != me.id) + .filter(|x| x.shred_version == me.shred_version) + .cloned() + .collect() + } + + /// all peers that have a valid storage addr regardless of `shred_version`. 
+ pub fn all_storage_peers(&self) -> Vec<ContactInfo> { + let me = self.my_data(); + self.gossip + .crds + .table + .values() + .filter_map(|x| x.value.contact_info()) + .filter(|x| ContactInfo::is_valid_address(&x.storage_addr)) + .filter(|x| x.id != me.id) + .cloned() + .collect() + } + - /// all peers that have a valid storage addr + /// all peers that have a valid storage addr and are on the same `shred_version`. pub fn storage_peers(&self) -> Vec<ContactInfo> { let me = self.my_data(); self.gossip @@ -470,6 +503,7 @@ impl ClusterInfo { .filter_map(|x| x.value.contact_info()) .filter(|x| ContactInfo::is_valid_address(&x.storage_addr)) .filter(|x| x.id != me.id) + .filter(|x| x.shred_version == me.shred_version) .cloned() .collect() } @@ -483,6 +517,7 @@ impl ClusterInfo { .values() .filter_map(|x| x.value.contact_info()) .filter(|x| x.id != me.id) + .filter(|x| x.shred_version == me.shred_version) .filter(|x| ContactInfo::is_valid_address(&x.tvu)) .filter(|x| ContactInfo::is_valid_address(&x.tvu_forwards)) .cloned() @@ -495,6 +530,7 @@ impl ClusterInfo { ClusterInfo::tvu_peers(self) .into_iter() .filter(|x| x.id != me.id) + .filter(|x| x.shred_version == me.shred_version) .filter(|x| ContactInfo::is_valid_address(&x.gossip)) .filter(|x| { self.get_epoch_state_for_node(&x.id, None) diff --git a/core/src/gossip_service.rs b/core/src/gossip_service.rs index 119c28c09370b4..374fba896989a8 100644 --- a/core/src/gossip_service.rs +++ b/core/src/gossip_service.rs @@ -197,10 +197,10 @@ fn spy( tvu_peers = spy_ref .read() .unwrap() - .tvu_peers() + .all_tvu_peers() .into_iter() .collect::<Vec<_>>(); - archivers = spy_ref.read().unwrap().storage_peers(); + archivers = spy_ref.read().unwrap().all_storage_peers(); if let Some(num) = num_nodes { if tvu_peers.len() + archivers.len() >= num { if let Some(gossip_addr) = find_node_by_gossip_addr { From dee208a83c774dc3f413e5dc66b0d26f5c288252 Mon Sep 17 00:00:00 2001 From: Sagar Dhawan Date: Wed, 29 Jan 2020 23:04:52 -0800 Subject: [PATCH 2/2] Adopt entrypoint 
shred_version if one isn't provided --- archiver-lib/src/archiver.rs | 2 ++ core/src/cluster_info.rs | 25 ++++++++++++++++++++++++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/archiver-lib/src/archiver.rs b/archiver-lib/src/archiver.rs index d58db51615ff56..ee7e24a8459082 100644 --- a/archiver-lib/src/archiver.rs +++ b/archiver-lib/src/archiver.rs @@ -522,6 +522,8 @@ impl Archiver { let mut contact_info = node_info.clone(); contact_info.tvu = "0.0.0.0:0".parse().unwrap(); contact_info.wallclock = timestamp(); + // copy over the adopted shred_version from the entrypoint + contact_info.shred_version = cluster_info.read().unwrap().my_data().shred_version; { let mut cluster_info_w = cluster_info.write().unwrap(); cluster_info_w.insert_self(contact_info); diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 2a96a3d86c339e..02d829afad8e8f 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -413,7 +413,8 @@ impl ClusterInfo { .values() .filter_map(|x| x.value.contact_info()) .filter(|x| x.id != me.id) - .filter(|x| x.shred_version == me.shred_version) + /* shred_version not considered for rpc peers (ie, caller must select version + if desired) */ .filter(|x| ContactInfo::is_valid_address(&x.rpc)) .cloned() .collect() @@ -1093,6 +1094,7 @@ impl ClusterInfo { .spawn(move || { let mut last_push = timestamp(); let mut last_contact_info_trace = timestamp(); + let mut adopt_shred_version = obj.read().unwrap().my_data().shred_version == 0; let recycler = PacketsRecycler::default(); loop { let start = timestamp(); @@ -1133,6 +1135,27 @@ impl ClusterInfo { ("tabel_size", table_size as i64, i64), ("purge_stake_timeout", timeout as i64, i64) ); + // Adopt the entrypoint's `shred_version` if ours is unset + if adopt_shred_version { + // If gossip was given an entrypoint, lookup its id + let entrypoint_id = obj.read().unwrap().entrypoint.as_ref().map(|e| e.id); + if let Some(entrypoint_id) = entrypoint_id { + info!("Shred version 
unknown, looking for the entrypoint:{:?} Shred version", entrypoint_id); + // If a pull from the entrypoint was successful, it should exist in the crds table + let entrypoint = obj.read().unwrap().lookup(&entrypoint_id).cloned(); + if let Some(entrypoint) = entrypoint { + let mut self_info = obj.read().unwrap().my_data(); + if entrypoint.shred_version == 0 { + warn!("entrypoint is running an invalid shred_version: 0"); + } else { + info!("Setting Shred version to {:?} from entrypoint {:?}", entrypoint.shred_version, entrypoint.id); + self_info.shred_version = entrypoint.shred_version; + obj.write().unwrap().insert_self(self_info); + adopt_shred_version = false; + } + } + } + } //TODO: possibly tune this parameter //we saw a deadlock passing an obj.read().unwrap().timeout into sleep if start - last_push > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 {