From d1093d39d4a20f2299a8dee78089da78fcb1bee6 Mon Sep 17 00:00:00 2001
From: "OEM Configuration (temporary user)"
Date: Mon, 18 Jun 2018 23:50:41 -0700
Subject: [PATCH 1/3] added remote table to update responses

---
 Cargo.toml                         |   1 +
 src/choose_gossip_peer_strategy.rs | 323 +++++++++++++++++++++++++++++
 src/crdt.rs                        | 113 ++++++++--
 src/lib.rs                         |   1 +
 tests/data_replicator.rs           | 102 +++++++++
 5 files changed, 520 insertions(+), 20 deletions(-)
 create mode 100644 src/choose_gossip_peer_strategy.rs

diff --git a/Cargo.toml b/Cargo.toml
index fcce844bafc717..a39a8a1581c3ed 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -53,6 +53,7 @@ unstable = []
 ipv6 = []
 cuda = []
 erasure = []
+gossip_choose_weighted_peer = []
 
 [dependencies]
 rayon = "1.0.0"
diff --git a/src/choose_gossip_peer_strategy.rs b/src/choose_gossip_peer_strategy.rs
new file mode 100644
index 00000000000000..20a3bd0b48c98b
--- /dev/null
+++ b/src/choose_gossip_peer_strategy.rs
@@ -0,0 +1,323 @@
+use crdt::ReplicatedData;
+use rand::thread_rng;
+use rand::distributions::{IndependentSample, Weighted, WeightedChoice};
+use result::{Error, Result};
+use signature::PublicKey;
+use std;
+use std::collections::HashMap;
+
+pub const DEFAULT_WEIGHT: u32 = 1;
+
+pub trait ChooseGossipPeerStrategy {
+    fn choose_peer(&self, options: Vec<&ReplicatedData>) ->
+        Result<ReplicatedData>;
+}
+
+pub struct ChooseRandomPeerStrategy<'a> {
+    random: &'a Fn() -> u64,
+}
+
+impl<'a> ChooseRandomPeerStrategy<'a> {
+    pub fn new(random: &'a Fn() -> u64,) -> Self {
+        ChooseRandomPeerStrategy { random }
+    }
+}
+
+impl<'a> ChooseGossipPeerStrategy for ChooseRandomPeerStrategy<'a> {
+    fn choose_peer(&self, options: Vec<&ReplicatedData>) -> Result<ReplicatedData> {
+        if options.len() < 1 {
+            return Err(Error::CrdtTooSmall);
+        }
+
+        let n = ((self.random)() as usize) % options.len();
+        Ok(options[n].clone())
+    }
+}
+
+pub struct ChooseWeightedPeerStrategy<'a> {
+    remote: &'a HashMap<PublicKey, u64>,
+    external_liveness: &'a HashMap<PublicKey, HashMap<PublicKey, u64>>,
+    get_stake: &'a Fn(PublicKey) -> f64,
+}
+
+impl<'a> ChooseWeightedPeerStrategy<'a> {
+    pub fn new(
+        remote: &'a HashMap<PublicKey, u64>,
+        external_liveness: &'a HashMap<PublicKey, HashMap<PublicKey, u64>>,
+        get_stake: &'a Fn(PublicKey) -> f64,
+    ) -> Self
+    {
+        ChooseWeightedPeerStrategy { remote, external_liveness, get_stake }
+    }
+
+    fn calculate_weighted_remote_index(&self, peer_id: PublicKey) -> u32 {
+        let mut last_seen_index = 0;
+        // If the peer is not in our remote table, then we leave last_seen_index as zero.
+        // Only happens when a peer appears in our crdt.table but not in our crdt.remote,
+        // which means a validator was directly injected into our crdt.table
+        if let Some(index) = self.remote.get(&peer_id) {
+            last_seen_index = *index;
+        }
+
+        let liveness_entry = self.external_liveness.get(&peer_id);
+        if liveness_entry.is_none(){
+            return DEFAULT_WEIGHT;
+        }
+
+        let votes = liveness_entry.unwrap();
+
+        if votes.is_empty() {
+            return DEFAULT_WEIGHT;
+        }
+
+        // Calculate the weighted average of the rumors
+        let mut relevant_votes = vec![];
+
+        let total_stake = votes.iter().fold(
+            0.0,
+            |total_stake, (&id, &vote)| {
+                let stake = (self.get_stake)(id);
+                // If the total stake is going to overflow f64, pick
+                // the larger of either the current total_stake, or the
+                // new stake, this way we are guaranteed to get at least f64::MAX/2
+                // sample of stake in our weighted calculation
+                if std::f64::MAX - total_stake < stake {
+                    if stake > total_stake {
+                        relevant_votes = vec![(stake, vote)];
+                        stake
+                    } else {
+                        total_stake
+                    }
+                } else {
+                    relevant_votes.push((stake, vote));
+                    total_stake + stake
+                }
+            }
+        );
+
+        let weighted_vote = relevant_votes.iter().fold(
+            0.0,
+            |sum, &(stake, vote)| {
+                if vote < last_seen_index {
+                    // This should never happen b/c we maintain the invariant that the indexes
+                    // in the external_liveness table are always greater than the corresponding
+                    // indexes in the remote table, if the index exists in the remote table at all.
+
+                    // Case 1: Attempt to insert bigger index into the "external_liveness" table
+                    // happens after an insertion into the "remote" table. In this case,
+                    // (see apply_updates()) function, we prevent the insertion if the entry
+                    // in the remote table >= the attempted insertion into the "external" liveness
+                    // table.
+
+                    // Case 2: Bigger index in the "external_liveness" table inserted before
+                    // a smaller insertion into the "remote" table. We clear the corresponding
+                    // "external_liveness" table entry on all insertions into the "remote" table
+                    // See apply_updates() function.
+
+                    warn!("weighted peer index was smaller than local entry in remote table");
+                    return sum;
+                }
+
+                let vote_difference = (vote - last_seen_index) as f64;
+                let new_weight = vote_difference * (stake / total_stake);
+
+                if std::f64::MAX - sum < new_weight {
+                    return f64::max(new_weight, sum);
+                }
+
+                sum + new_weight
+            },
+        );
+
+        // Return u32 b/c the weighted sampling API from rand::distributions
+        // only takes u32 for weights
+        if weighted_vote >= std::u32::MAX as f64 {
+            return std::u32::MAX;
+        }
+
+        // If the weighted rumors we've heard about aren't any greater than
+        // what we've directly learned from the last time we communicated with the
+        // peer (i.e. weighted_vote == 0), then return a weight of 1.
+        // Otherwise, return the calculated weight.
+        weighted_vote as u32 + DEFAULT_WEIGHT
+    }
+}
+
+impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> {
+    fn choose_peer(&self, options: Vec<&ReplicatedData>) -> Result<ReplicatedData> {
+        if options.len() < 1 {
+            return Err(Error::CrdtTooSmall);
+        }
+
+        let mut weighted_peers = vec![];
+        for peer in options {
+            let weight = self.calculate_weighted_remote_index(peer.id);
+            weighted_peers.push(Weighted {
+                weight: weight,
+                item: peer,
+            });
+        }
+
+        let mut rng = thread_rng();
+        Ok(WeightedChoice::new(&mut weighted_peers).ind_sample(&mut rng).clone())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use logger;
+    use signature::{KeyPair, KeyPairUtil, PublicKey};
+    use std;
+    use std::collections::HashMap;
+    use choose_gossip_peer_strategy::{ChooseWeightedPeerStrategy, DEFAULT_WEIGHT};
+
+    fn get_stake(id: PublicKey) -> f64 {
+        return 1.0;
+    }
+
+    #[test]
+    fn test_default() {
+        logger::setup();
+
+        // Initialize the filler keys
+        let key1 = KeyPair::new().pubkey();
+
+        let remote: HashMap<PublicKey, u64> = HashMap::new();
+        let external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>> = HashMap::new();
+
+        let weighted_strategy = ChooseWeightedPeerStrategy::new(
+            &remote,
+            &external_liveness,
+            &get_stake,
+        );
+
+        // If external_liveness table doesn't contain this entry, 
+        // return the default weight
+        let result = weighted_strategy.calculate_weighted_remote_index(key1);
+        assert_eq!(result, DEFAULT_WEIGHT);
+    }
+
+    #[test]
+    fn test_only_external_liveness() {
+        logger::setup();
+
+        // Initialize the filler keys
+        let key1 = KeyPair::new().pubkey();
+        let key2 = KeyPair::new().pubkey();
+
+        let remote: HashMap<PublicKey, u64> = HashMap::new();
+        let mut external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>> = HashMap::new();
+
+        // If only the liveness table contains the entry, should return the
+        // weighted liveness entries 
+        let test_value : u32 = 5;
+        let mut rumors: HashMap<PublicKey, u64> = HashMap::new();
+        rumors.insert(key2, test_value as u64);
+        external_liveness.insert(key1, rumors);
+
+        let weighted_strategy = ChooseWeightedPeerStrategy::new(
+            &remote,
+            &external_liveness,
+            &get_stake,
+        );
+
+        let result = weighted_strategy.calculate_weighted_remote_index(key1);
+        assert_eq!(result, test_value + DEFAULT_WEIGHT);
+    }
+
+    #[test]
+    fn test_overflow_votes() {
+        logger::setup();
+
+        // Initialize the filler keys
+        let key1 = KeyPair::new().pubkey();
+        let key2 = KeyPair::new().pubkey();
+
+        let remote: HashMap<PublicKey, u64> = HashMap::new();
+        let mut external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>> = HashMap::new();
+
+        // If the vote index is greater than u32::MAX, default to u32::MAX
+        let test_value = (std::u32::MAX as u64) + 10;
+        let mut rumors: HashMap<PublicKey, u64> = HashMap::new();
+        rumors.insert(key2, test_value);
+        external_liveness.insert(key1, rumors);
+
+        let weighted_strategy = ChooseWeightedPeerStrategy::new(
+            &remote,
+            &external_liveness,
+            &get_stake,
+        );
+
+        let result = weighted_strategy.calculate_weighted_remote_index(key1);
+        assert_eq!(result, std::u32::MAX);
+    }
+
+    #[test]
+    fn test_many_validators() {
+        logger::setup();
+
+        // Initialize the filler keys
+        let key1 = KeyPair::new().pubkey();
+
+        let mut remote: HashMap<PublicKey, u64> = HashMap::new();
+        let mut external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>> = HashMap::new();
+
+        // Test many validators' rumors in external_liveness 
+        let num_peers = 10;
+        let mut rumors: HashMap<PublicKey, u64> = HashMap::new();
+
+        remote.insert(key1, 0);
+
+        for i in 0..num_peers {
+            let pk = KeyPair::new().pubkey();
+            rumors.insert(pk, i);
+        }
+
+        external_liveness.insert(key1, rumors);
+
+        let weighted_strategy = ChooseWeightedPeerStrategy::new(
+            &remote,
+            &external_liveness,
+            &get_stake,
+        );
+
+        let result = weighted_strategy.calculate_weighted_remote_index(key1);
+        assert_eq!(result, (num_peers/2) as u32);
+    }
+
+    #[test]
+    fn test_many_validators2() {
+        logger::setup();
+
+        // Initialize the filler keys
+        let key1 = KeyPair::new().pubkey();
+
+        let mut remote: HashMap<PublicKey, u64> = HashMap::new();
+        let mut external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>> = HashMap::new();
+
+        // Test many validators' rumors in external_liveness 
+        let num_peers = 10;
+        let old_index = 20;
+        let mut rumors: HashMap<PublicKey, u64> = HashMap::new();
+
+        remote.insert(key1, old_index);
+
+        for i in 0..num_peers {
+            let pk = KeyPair::new().pubkey();
+            rumors.insert(pk, old_index);
+        }
+
+        external_liveness.insert(key1, rumors);
+
+        let weighted_strategy = ChooseWeightedPeerStrategy::new(
+            &remote,
+            &external_liveness,
+            &get_stake,
+        );
+
+        let result = weighted_strategy.calculate_weighted_remote_index(key1);
+
+        // If nobody has seen a newer update then rever to default
+        assert_eq!(result, DEFAULT_WEIGHT);
+    }
+}
\ No newline at end of file
diff --git a/src/crdt.rs b/src/crdt.rs
index 307d80a2e4eefb..bb2031503bc108 100644
--- a/src/crdt.rs
+++ b/src/crdt.rs
@@ -15,6 +15,11 @@
 use bincode::{deserialize, serialize};
 use byteorder::{LittleEndian, ReadBytesExt};
+use choose_gossip_peer_strategy::{
+    ChooseGossipPeerStrategy,
+    ChooseRandomPeerStrategy,
+    ChooseWeightedPeerStrategy,
+};
 use hash::Hash;
 use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE};
 use pnet_datalink as datalink;
@@ -190,6 +195,7 @@ pub struct Crdt {
     pub alive: HashMap<PublicKey, u64>,
     pub update_index: u64,
     pub me: PublicKey,
+    external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>>,
 }
 // TODO These messages should be signed, and go through the gpu pipeline for spam filtering
 #[derive(Serialize, Deserialize, Debug)]
@@ -200,7 +206,7 @@ enum Protocol {
     RequestUpdates(u64, ReplicatedData),
     //TODO might need a since?
     /// from id, form's last update index, ReplicatedData
-    ReceiveUpdates(PublicKey, u64, Vec<ReplicatedData>),
+    ReceiveUpdates(PublicKey, u64, Vec<ReplicatedData>, Vec<(PublicKey, u64)>),
     /// ask for a missing index
     RequestWindowIndex(ReplicatedData, u64),
 }
@@ -213,6 +219,7 @@ impl Crdt {
             local: HashMap::new(),
             remote: HashMap::new(),
             alive: HashMap::new(),
+            external_liveness: HashMap::new(),
             me: me.id,
             update_index: 1,
         };
@@ -234,6 +241,14 @@ impl Crdt {
         self.insert(&me);
     }
 
+    pub fn get_external_liveness_entry(
+        &self,
+        key: &PublicKey,
+    ) -> Option<&HashMap<PublicKey, u64>>
+    {
+        self.external_liveness.get(key)
+    }
+
     pub fn insert(&mut self, v: &ReplicatedData) {
         // TODO check that last_verified types are always increasing
         if self.table.get(&v.id).is_none() || (v.version > self.table[&v.id].version) {
@@ -270,9 +285,11 @@ impl Crdt {
         if self.table.len() <= MIN_TABLE_SIZE {
             return;
         }
+
         //wait for 4x as long as it would randomly take to reach our node
         //assuming everyone is waiting the same amount of time as this node
         let limit = self.table.len() as u64 * GOSSIP_SLEEP_MILLIS * 4;
+
         let dead_ids: Vec<PublicKey> = self.alive
             .iter()
             .filter_map(|(&k, v)| {
@@ -285,11 +302,13 @@ impl Crdt {
                 }
             })
             .collect();
+
         for id in dead_ids.iter() {
             self.alive.remove(id);
             self.table.remove(id);
             self.remote.remove(id);
             self.local.remove(id);
+            self.external_liveness.remove(id);
         }
     }
@@ -473,6 +492,12 @@ impl Crdt {
         rdr.read_u64::<LittleEndian>()
             .expect("rdr.read_u64 in fn random")
     }
+
+    // TODO: fill in with real implmentation wonce staking is implemented
+    fn get_stake(id: PublicKey) -> f64 {
+        return 1.0;
+    }
+
     fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec<ReplicatedData>) {
         //trace!("get updates since {}", v);
         let data = self.table
@@ -508,16 +533,32 @@ impl Crdt {
     /// * B - RequestUpdates protocol message
     fn gossip_request(&self) -> Result<(SocketAddr, Protocol)> {
         let options: Vec<_> = self.table.values().filter(|v| v.id != self.me).collect();
-        if options.len() < 1 {
-            trace!(
-                "crdt too small for gossip {:?} {}",
-                &self.me[..4],
-                self.table.len()
-            );
-            return Err(Error::CrdtTooSmall);
-        }
-        let n = (Self::random() as usize) % options.len();
-        let v = options[n].clone();
+
+        #[cfg(not(feature = "choose_gossip_peer_strategy"))]
+        let choose_peer_strategy = ChooseRandomPeerStrategy::new(&Self::random);
+
+        #[cfg(feature = "choose_gossip_peer_strategy")]
+        let choose_peer_strategy = ChooseWeightedPeerStrategy::new(
+            &self.remote,
+            &self.external_liveness,
+            &Self::get_stake,
+        );
+
+        let choose_peer_result = choose_peer_strategy.choose_peer(options);
+
+        let v = match choose_peer_result {
+            Ok(peer) => peer,
+            Err(Error::CrdtTooSmall) => {
+                trace!(
+                    "crdt too small for gossip {:?} {}",
+                    &self.me[..4],
+                    self.table.len()
+                );
+                return Err(Error::CrdtTooSmall);
+            },
+            Err(e) => return Err(e),
+        };
+
         let remote_update_index = *self.remote.get(&v.id).unwrap_or(&0);
         let req = Protocol::RequestUpdates(remote_update_index, self.table[&self.me].clone());
         trace!(
@@ -526,6 +567,7 @@ impl Crdt {
             &v.id[..4],
             v.gossip_addr
         );
+
         Ok((v.gossip_addr, req))
     }
@@ -543,6 +585,7 @@ impl Crdt {
         let (remote_gossip_addr, req) = obj.read()
             .expect("'obj' read lock in fn run_gossip")
             .gossip_request()?;
+
         // TODO this will get chatty, so we need to first ask for number of updates since
         // then only ask for specific data that we dont have
         let blob = to_blob(req, remote_gossip_addr, blob_recycler)?;
@@ -583,14 +626,43 @@ impl Crdt {
     /// * `from` - identity of the sender of the updates
     /// * `update_index` - the number of updates that `from` has completed and this set of `data` represents
     /// * `data` - the update data
-    fn apply_updates(&mut self, from: PublicKey, update_index: u64, data: &[ReplicatedData]) {
+    fn apply_updates(
+        &mut self, from: PublicKey,
+        update_index: u64,
+        data: &[ReplicatedData],
+        external_liveness: &[(PublicKey, u64)],
+    ){
         trace!("got updates {}", data.len());
         // TODO we need to punish/spam resist here
         // sig verify the whole update and slash anyone who sends a bad update
         for v in data {
             self.insert(&v);
         }
+
+        for (pk, external_remote_index) in external_liveness.iter() {
+            let remote_entry =
+                if let Some(v) = self.remote.get(pk) {
+                    *v
+                } else {
+                    0
+                };
+
+            if remote_entry >= *external_remote_index {
+                continue;
+            }
+
+            let liveness_entry = self.external_liveness.entry(*pk).or_insert(HashMap::new());
+            let peer_index = *liveness_entry.entry(from).or_insert(*external_remote_index);
+            if *external_remote_index > peer_index {
+                liveness_entry.insert(from, *external_remote_index);
+            }
+        }
+
         *self.remote.entry(from).or_insert(update_index) = update_index;
+
+        // Clear the remote liveness table for this node, b/c we've heard directly from them
+        // so we don't need to rely on rumors
+        self.external_liveness.remove(&from);
     }
 
     /// randomly pick a node and ask them for updates asynchronously
@@ -682,13 +754,14 @@ impl Crdt {
             Ok(Protocol::RequestUpdates(v, from_rd)) => {
                 trace!("RequestUpdates {}", v);
                 let addr = from_rd.gossip_addr;
-                // only lock for this call, dont lock during IO `sock.send_to` or `sock.recv_from`
-                let (from, ups, data) = obj.read()
-                    .expect("'obj' read lock in RequestUpdates")
-                    .get_updates_since(v);
+                let me = obj.read().unwrap();
+                // only lock for these two calls, dont lock during IO `sock.send_to` or `sock.recv_from`
+                let (from, ups, data) = me.get_updates_since(v);
+                let external_liveness = me.remote.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
+                drop(me);
                 trace!("get updates since response {} {}", v, data.len());
                 let len = data.len();
-                let rsp = Protocol::ReceiveUpdates(from, ups, data);
+                let rsp = Protocol::ReceiveUpdates(from, ups, data, external_liveness);
                 obj.write().unwrap().insert(&from_rd);
                 if len < 1 {
                     let me = obj.read().unwrap();
@@ -713,11 +786,11 @@ impl Crdt {
                     None
                 }
             }
-            Ok(Protocol::ReceiveUpdates(from, ups, data)) => {
+            Ok(Protocol::ReceiveUpdates(from, ups, data, external_liveness)) => {
                 trace!("ReceivedUpdates {:?} {} {}", &from[0..4], ups, data.len());
                 obj.write()
                     .expect("'obj' write lock in ReceiveUpdates")
-                    .apply_updates(from, ups, &data);
+                    .apply_updates(from, ups, &data, &external_liveness);
                 None
             }
             Ok(Protocol::RequestWindowIndex(from, ix)) => {
@@ -956,7 +1029,7 @@ mod tests {
             sorted(&vec![d1.clone(), d2.clone(), d3.clone()])
         );
         let mut crdt2 = Crdt::new(d2.clone());
-        crdt2.apply_updates(key, ix, &ups);
+        crdt2.apply_updates(key, ix, &ups, &vec![]);
         assert_eq!(crdt2.table.values().len(), 3);
         assert_eq!(
             sorted(&crdt2.table.values().map(|x| x.clone()).collect()),
diff --git a/src/lib.rs b/src/lib.rs
index f2bcaa4544def9..d551c68b738623 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -47,6 +47,7 @@ pub mod transaction;
 pub mod tvu;
 pub mod window_stage;
 pub mod write_stage;
+mod choose_gossip_peer_strategy;
 extern crate bincode;
 extern crate byteorder;
 extern crate chrono;
diff --git a/tests/data_replicator.rs b/tests/data_replicator.rs
index b750dbc72ee6e3..a25eeb8e8059ac 100644
--- a/tests/data_replicator.rs
+++ b/tests/data_replicator.rs
@@ -183,3 +183,105 @@ pub fn crdt_retransmit() {
         t.join().unwrap();
     }
 }
+
+#[test]
+fn check_external_liveness_table() {
+    logger::setup();
+    let c1_c4_exit = Arc::new(AtomicBool::new(false));
+    let c2_c3_exit = Arc::new(AtomicBool::new(false));
+
+    trace!("c1:");
+    let (c1, dr1, _) = test_node(c1_c4_exit.clone());
+    trace!("c2:");
+    let (c2, dr2, _) = test_node(c2_c3_exit.clone());
+    trace!("c3:");
+    let (c3, dr3, _) = test_node(c2_c3_exit.clone());
+    trace!("c4:");
+    let (c4, dr4, _) = test_node(c1_c4_exit.clone());
+
+    let c1_data = c1.read().unwrap().my_data().clone();
+    c1.write().unwrap().set_leader(c1_data.id);
+
+    let c2_id = c2.read().unwrap().me;
+    let c3_id = c3.read().unwrap().me;
+    let c4_id = c4.read().unwrap().me;
+
+    // Insert the remote data about c4
+    let c2_index_for_c4 = 10;
+    c2.write().unwrap().remote.insert(c4_id, c2_index_for_c4);
+    let c3_index_for_c4 = 20;
+    c3.write().unwrap().remote.insert(c4_id, c3_index_for_c4);
+
+    // Set up the initial network topology
+    c2.write().unwrap().insert(&c1_data);
+    c3.write().unwrap().insert(&c1_data);
+
+    c2.write().unwrap().set_leader(c1_data.id);
+    c3.write().unwrap().set_leader(c1_data.id);
+
+    // Wait to converge
+    trace!("waiting to converge:");
+    let mut done = false;
+    for _ in 0..30 {
+        done = c1.read().unwrap().table.len() == 3 && c2.read().unwrap().table.len() == 3
+            && c3.read().unwrap().table.len() == 3;
+        if done {
+            break;
+        }
+        sleep(Duration::new(1, 0));
+    }
+    assert!(done);
+
+    // Validate c1's external liveness table, then release lock rc1
+    {
+        let rc1 = c1.read().unwrap();
+        let el = rc1.get_external_liveness_entry(&c4.read().unwrap().me);
+
+        // Make sure liveness table entry for c4 exists on node c1
+        assert!(el.is_some());
+        let liveness_map = el.unwrap();
+
+        // Make sure liveness table entry contains correct result for c2
+        let c2_index_result_for_c4 = liveness_map.get(&c2_id);
+        assert!(c2_index_result_for_c4.is_some());
+        assert!(*(c2_index_result_for_c4.unwrap()) == c2_index_for_c4);
+
+        // Make sure liveness table entry contains correct result for c3
+        let c3_index_result_for_c4 = liveness_map.get(&c3_id);
+        assert!(c3_index_result_for_c4.is_some());
+        assert!(*(c3_index_result_for_c4.unwrap()) == c3_index_for_c4);
+    }
+
+    // Shutdown validators c2 and c3
+    c2_c3_exit.store(true, Ordering::Relaxed);
+    let mut threads = vec![];
+    threads.extend(dr2.thread_hdls.into_iter());
+    threads.extend(dr3.thread_hdls.into_iter());
+
+    for t in threads.into_iter() {
+        t.join().unwrap();
+    }
+
+    // Allow communication between c1 and c4, make sure that c1's external_liveness table
+    // entry for c4 gets cleared
+    c4.write().unwrap().insert(&c1_data);
+    c4.write().unwrap().set_leader(c1_data.id);
+    for _ in 0..30 {
+        done = c1.read().unwrap().get_external_liveness_entry(&c4_id).is_none();
+        if done {
+            break;
+        }
+        sleep(Duration::new(1, 0));
+    }
+    assert!(done);
+
+    // Shutdown validators c1 and c4
+    c1_c4_exit.store(true, Ordering::Relaxed);
+    let mut threads = vec![];
+    threads.extend(dr1.thread_hdls.into_iter());
+    threads.extend(dr4.thread_hdls.into_iter());
+
+    for t in threads.into_iter() {
+        t.join().unwrap();
+    }
+}
\ No newline at end of file
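
The rumor bookkeeping that patch 1 adds to apply_updates() can be summarized in isolation. Below is a minimal, std-only sketch of just that logic, not the patch's code: the Id alias, the Liveness struct, and the values in main are illustrative stand-ins for Crdt's PublicKey-keyed tables, and the ReplicatedData insertion step is omitted. A gossiped index is recorded as a rumor only if it beats what we have observed directly, only the highest rumor per witness is kept, and hearing from a peer directly clears all rumors about it.

use std::collections::HashMap;

type Id = u64; // stand-in for PublicKey

#[derive(Default)]
struct Liveness {
    remote: HashMap<Id, u64>,                 // indexes observed directly, per peer
    external: HashMap<Id, HashMap<Id, u64>>,  // rumors: subject -> (witness -> index)
}

impl Liveness {
    // Mirrors Crdt::apply_updates(): `from` sent us `update_index` along with
    // its own remote table (`gossiped`), i.e. the Vec<(PublicKey, u64)> that
    // patch 1 appends to Protocol::ReceiveUpdates.
    fn apply_updates(&mut self, from: Id, update_index: u64, gossiped: &[(Id, u64)]) {
        for &(subject, rumored) in gossiped {
            // Invariant: keep a rumor only if it beats what we saw directly
            let seen = *self.remote.get(&subject).unwrap_or(&0);
            if seen >= rumored {
                continue;
            }
            // Keep the highest rumor per (subject, witness) pair
            let witnesses = self.external.entry(subject).or_insert_with(HashMap::new);
            let entry = witnesses.entry(from).or_insert(rumored);
            if rumored > *entry {
                *entry = rumored;
            }
        }
        // Direct contact: record the index and drop all rumors about `from`
        *self.remote.entry(from).or_insert(update_index) = update_index;
        self.external.remove(&from);
    }
}

fn main() {
    let (c2, c4) = (2, 4);
    let mut live = Liveness::default();
    // c2 gossips that c4 is at index 10; recorded as a rumor
    live.apply_updates(c2, 3, &[(c4, 10)]);
    assert_eq!(live.external[&c4][&c2], 10);
    // Later, c4 talks to us directly; the rumor about c4 is cleared
    live.apply_updates(c4, 12, &[]);
    assert!(live.external.get(&c4).is_none());
    println!("remote: {:?}", live.remote);
}

This is exactly the flow the check_external_liveness_table test exercises end to end: c2 and c3 plant rumors about c4 on c1, and direct contact with c4 wipes them.
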
From 2611a9029e6d7e620606b59bf2846e80b5e9be82 Mon Sep 17 00:00:00 2001
From: "OEM Configuration (temporary user)"
Date: Mon, 25 Jun 2018 03:59:15 -0700
Subject: [PATCH 2/3] ran linter

---
 src/choose_gossip_peer_strategy.rs | 169 +++++++++++++----------------
 src/crdt.rs                        |  39 ++++---
 src/lib.rs                         |   2 +-
 3 files changed, 96 insertions(+), 114 deletions(-)

diff --git a/src/choose_gossip_peer_strategy.rs b/src/choose_gossip_peer_strategy.rs
index 20a3bd0b48c98b..889e8709d9199c 100644
--- a/src/choose_gossip_peer_strategy.rs
+++ b/src/choose_gossip_peer_strategy.rs
@@ -1,6 +1,6 @@
 use crdt::ReplicatedData;
-use rand::thread_rng;
 use rand::distributions::{IndependentSample, Weighted, WeightedChoice};
+use rand::thread_rng;
 use result::{Error, Result};
 use signature::PublicKey;
 use std;
@@ -9,8 +9,7 @@ use std::collections::HashMap;
 pub const DEFAULT_WEIGHT: u32 = 1;
 
 pub trait ChooseGossipPeerStrategy {
-    fn choose_peer(&self, options: Vec<&ReplicatedData>) ->
-        Result<ReplicatedData>;
+    fn choose_peer(&self, options: Vec<&ReplicatedData>) -> Result<ReplicatedData>;
 }
 
 pub struct ChooseRandomPeerStrategy<'a> {
@@ -18,7 +17,7 @@ pub struct ChooseRandomPeerStrategy<'a> {
 }
 
 impl<'a> ChooseRandomPeerStrategy<'a> {
-    pub fn new(random: &'a Fn() -> u64,) -> Self {
+    pub fn new(random: &'a Fn() -> u64) -> Self {
         ChooseRandomPeerStrategy { random }
     }
 }
@@ -45,9 +44,12 @@ impl<'a> ChooseWeightedPeerStrategy<'a> {
         remote: &'a HashMap<PublicKey, u64>,
         external_liveness: &'a HashMap<PublicKey, HashMap<PublicKey, u64>>,
         get_stake: &'a Fn(PublicKey) -> f64,
-    ) -> Self
-    {
-        ChooseWeightedPeerStrategy { remote, external_liveness, get_stake }
+    ) -> Self {
+        ChooseWeightedPeerStrategy {
+            remote,
+            external_liveness,
+            get_stake,
+        }
     }
 
     fn calculate_weighted_remote_index(&self, peer_id: PublicKey) -> u32 {
@@ -60,7 +62,7 @@ impl<'a> ChooseWeightedPeerStrategy<'a> {
         }
 
         let liveness_entry = self.external_liveness.get(&peer_id);
-        if liveness_entry.is_none(){
+        if liveness_entry.is_none() {
             return DEFAULT_WEIGHT;
         }
 
@@ -73,61 +75,55 @@ impl<'a> ChooseWeightedPeerStrategy<'a> {
         // Calculate the weighted average of the rumors
         let mut relevant_votes = vec![];
 
-        let total_stake = votes.iter().fold(
-            0.0,
-            |total_stake, (&id, &vote)| {
-                let stake = (self.get_stake)(id);
-                // If the total stake is going to overflow f64, pick
-                // the larger of either the current total_stake, or the
-                // new stake, this way we are guaranteed to get at least f64::MAX/2
-                // sample of stake in our weighted calculation
-                if std::f64::MAX - total_stake < stake {
-                    if stake > total_stake {
-                        relevant_votes = vec![(stake, vote)];
-                        stake
-                    } else {
-                        total_stake
-                    }
+        let total_stake = votes.iter().fold(0.0, |total_stake, (&id, &vote)| {
+            let stake = (self.get_stake)(id);
+            // If the total stake is going to overflow f64, pick
+            // the larger of either the current total_stake, or the
+            // new stake, this way we are guaranteed to get at least f64::MAX/2
+            // sample of stake in our weighted calculation
+            if std::f64::MAX - total_stake < stake {
+                if stake > total_stake {
+                    relevant_votes = vec![(stake, vote)];
+                    stake
                 } else {
-                    relevant_votes.push((stake, vote));
-                    total_stake + stake
+                    total_stake
                 }
+            } else {
+                relevant_votes.push((stake, vote));
+                total_stake + stake
+            }
+        });
+
+        let weighted_vote = relevant_votes.iter().fold(0.0, |sum, &(stake, vote)| {
+            if vote < last_seen_index {
+                // This should never happen b/c we maintain the invariant that the indexes
+                // in the external_liveness table are always greater than the corresponding
+                // indexes in the remote table, if the index exists in the remote table at all.
+
+                // Case 1: Attempt to insert bigger index into the "external_liveness" table
+                // happens after an insertion into the "remote" table. In this case,
+                // (see apply_updates()) function, we prevent the insertion if the entry
+                // in the remote table >= the attempted insertion into the "external" liveness
+                // table.
+
+                // Case 2: Bigger index in the "external_liveness" table inserted before
+                // a smaller insertion into the "remote" table. We clear the corresponding
+                // "external_liveness" table entry on all insertions into the "remote" table
+                // See apply_updates() function.
+
+                warn!("weighted peer index was smaller than local entry in remote table");
+                return sum;
             }
-        );
-
-        let weighted_vote = relevant_votes.iter().fold(
-            0.0,
-            |sum, &(stake, vote)| {
-                if vote < last_seen_index {
-                    // This should never happen b/c we maintain the invariant that the indexes
-                    // in the external_liveness table are always greater than the corresponding
-                    // indexes in the remote table, if the index exists in the remote table at all.
-
-                    // Case 1: Attempt to insert bigger index into the "external_liveness" table
-                    // happens after an insertion into the "remote" table. In this case,
-                    // (see apply_updates()) function, we prevent the insertion if the entry
-                    // in the remote table >= the attempted insertion into the "external" liveness
-                    // table.
-
-                    // Case 2: Bigger index in the "external_liveness" table inserted before
-                    // a smaller insertion into the "remote" table. We clear the corresponding
-                    // "external_liveness" table entry on all insertions into the "remote" table
-                    // See apply_updates() function.
-
-                    warn!("weighted peer index was smaller than local entry in remote table");
-                    return sum;
-                }
 
-                let vote_difference = (vote - last_seen_index) as f64;
-                let new_weight = vote_difference * (stake / total_stake);
+            let vote_difference = (vote - last_seen_index) as f64;
+            let new_weight = vote_difference * (stake / total_stake);
 
-                if std::f64::MAX - sum < new_weight {
-                    return f64::max(new_weight, sum);
-                }
+            if std::f64::MAX - sum < new_weight {
+                return f64::max(new_weight, sum);
+            }
 
-                sum + new_weight
-            },
-        );
+            sum + new_weight
+        });
 
         // Return u32 b/c the weighted sampling API from rand::distributions
         // only takes u32 for weights
@@ -136,8 +132,8 @@ impl<'a> ChooseWeightedPeerStrategy<'a> {
         }
 
         // If the weighted rumors we've heard about aren't any greater than
-        // what we've directly learned from the last time we communicated with the 
-        // peer (i.e. weighted_vote == 0), then return a weight of 1. 
+        // what we've directly learned from the last time we communicated with the
+        // peer (i.e. weighted_vote == 0), then return a weight of 1.
         // Otherwise, return the calculated weight.
         weighted_vote as u32 + DEFAULT_WEIGHT
     }
@@ -159,17 +155,19 @@ impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> {
         }
 
         let mut rng = thread_rng();
-        Ok(WeightedChoice::new(&mut weighted_peers).ind_sample(&mut rng).clone())
+        Ok(WeightedChoice::new(&mut weighted_peers)
+            .ind_sample(&mut rng)
+            .clone())
     }
 }
 
 #[cfg(test)]
 mod tests {
+    use choose_gossip_peer_strategy::{ChooseWeightedPeerStrategy, DEFAULT_WEIGHT};
     use logger;
     use signature::{KeyPair, KeyPairUtil, PublicKey};
     use std;
     use std::collections::HashMap;
-    use choose_gossip_peer_strategy::{ChooseWeightedPeerStrategy, DEFAULT_WEIGHT};
 
     fn get_stake(id: PublicKey) -> f64 {
         return 1.0;
@@ -185,13 +183,10 @@ mod tests {
         let remote: HashMap<PublicKey, u64> = HashMap::new();
         let external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>> = HashMap::new();
 
-        let weighted_strategy = ChooseWeightedPeerStrategy::new(
-            &remote,
-            &external_liveness,
-            &get_stake,
-        );
+        let weighted_strategy =
+            ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake);
 
-        // If external_liveness table doesn't contain this entry, 
+        // If external_liveness table doesn't contain this entry,
         // return the default weight
         let result = weighted_strategy.calculate_weighted_remote_index(key1);
         assert_eq!(result, DEFAULT_WEIGHT);
@@ -209,17 +204,14 @@ mod tests {
         let mut external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>> = HashMap::new();
 
         // If only the liveness table contains the entry, should return the
-        // weighted liveness entries 
-        let test_value : u32 = 5;
+        // weighted liveness entries
+        let test_value: u32 = 5;
         let mut rumors: HashMap<PublicKey, u64> = HashMap::new();
         rumors.insert(key2, test_value as u64);
         external_liveness.insert(key1, rumors);
 
-        let weighted_strategy = ChooseWeightedPeerStrategy::new(
-            &remote,
-            &external_liveness,
-            &get_stake,
-        );
+        let weighted_strategy =
+            ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake);
 
         let result = weighted_strategy.calculate_weighted_remote_index(key1);
         assert_eq!(result, test_value + DEFAULT_WEIGHT);
@@ -242,11 +234,8 @@ mod tests {
         rumors.insert(key2, test_value);
         external_liveness.insert(key1, rumors);
 
-        let weighted_strategy = ChooseWeightedPeerStrategy::new(
-            &remote,
-            &external_liveness,
-            &get_stake,
-        );
+        let weighted_strategy =
+            ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake);
 
         let result = weighted_strategy.calculate_weighted_remote_index(key1);
         assert_eq!(result, std::u32::MAX);
@@ -262,7 +251,7 @@ mod tests {
         let mut remote: HashMap<PublicKey, u64> = HashMap::new();
         let mut external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>> = HashMap::new();
 
-        // Test many validators' rumors in external_liveness 
+        // Test many validators' rumors in external_liveness
         let num_peers = 10;
         let mut rumors: HashMap<PublicKey, u64> = HashMap::new();
 
@@ -275,14 +264,11 @@ mod tests {
 
         external_liveness.insert(key1, rumors);
 
-        let weighted_strategy = ChooseWeightedPeerStrategy::new(
-            &remote,
-            &external_liveness,
-            &get_stake,
-        );
+        let weighted_strategy =
+            ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake);
 
         let result = weighted_strategy.calculate_weighted_remote_index(key1);
-        assert_eq!(result, (num_peers/2) as u32);
+        assert_eq!(result, (num_peers / 2) as u32);
     }
 
     #[test]
@@ -295,7 +281,7 @@ mod tests {
         let mut remote: HashMap<PublicKey, u64> = HashMap::new();
         let mut external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>> = HashMap::new();
 
-        // Test many validators' rumors in external_liveness 
+        // Test many validators' rumors in external_liveness
         let num_peers = 10;
         let old_index = 20;
         let mut rumors: HashMap<PublicKey, u64> = HashMap::new();
@@ -309,15 +295,12 @@ mod tests {
 
         external_liveness.insert(key1, rumors);
 
-        let weighted_strategy = ChooseWeightedPeerStrategy::new(
-            &remote,
-            &external_liveness,
-            &get_stake,
-        );
+        let weighted_strategy =
+            ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake);
 
         let result = weighted_strategy.calculate_weighted_remote_index(key1);
 
         // If nobody has seen a newer update then rever to default
         assert_eq!(result, DEFAULT_WEIGHT);
     }
-}
\ No newline at end of file
+}
diff --git a/src/crdt.rs b/src/crdt.rs
index bb2031503bc108..a9b125444c1a32 100644
--- a/src/crdt.rs
+++ b/src/crdt.rs
@@ -15,11 +15,8 @@
 use bincode::{deserialize, serialize};
 use byteorder::{LittleEndian, ReadBytesExt};
-use choose_gossip_peer_strategy::{
-    ChooseGossipPeerStrategy,
-    ChooseRandomPeerStrategy,
-    ChooseWeightedPeerStrategy,
-};
+use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseRandomPeerStrategy,
+                                  ChooseWeightedPeerStrategy};
 use hash::Hash;
 use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE};
 use pnet_datalink as datalink;
@@ -241,11 +238,7 @@ impl Crdt {
         self.insert(&me);
     }
 
-    pub fn get_external_liveness_entry(
-        &self,
-        key: &PublicKey,
-    ) -> Option<&HashMap<PublicKey, u64>>
-    {
+    pub fn get_external_liveness_entry(&self, key: &PublicKey) -> Option<&HashMap<PublicKey, u64>> {
         self.external_liveness.get(key)
     }
 
@@ -309,6 +302,9 @@ impl Crdt {
             self.remote.remove(id);
             self.local.remove(id);
             self.external_liveness.remove(id);
+            for map in self.external_liveness.values_mut() {
+                map.remove(id);
+            }
         }
     }
@@ -555,7 +551,7 @@ impl Crdt {
                     self.table.len()
                 );
                 return Err(Error::CrdtTooSmall);
-            },
+            }
             Err(e) => return Err(e),
         };
@@ -623,11 +623,12 @@ impl Crdt {
     /// * `update_index` - the number of updates that `from` has completed and this set of `data` represents
     /// * `data` - the update data
     fn apply_updates(
-        &mut self, from: PublicKey,
+        &mut self,
+        from: PublicKey,
         update_index: u64,
         data: &[ReplicatedData],
         external_liveness: &[(PublicKey, u64)],
-    ){
+    ) {
         trace!("got updates {}", data.len());
         // TODO we need to punish/spam resist here
         // sig verify the whole update and slash anyone who sends a bad update
@@ -640,12 +637,11 @@ impl Crdt {
         }
 
         for (pk, external_remote_index) in external_liveness.iter() {
-            let remote_entry =
-                if let Some(v) = self.remote.get(pk) {
-                    *v
-                } else {
-                    0
-                };
+            let remote_entry = if let Some(v) = self.remote.get(pk) {
+                *v
+            } else {
+                0
+            };
 
             if remote_entry >= *external_remote_index {
                 continue;
             }
@@ -757,7 +753,10 @@ impl Crdt {
                 let me = obj.read().unwrap();
                 // only lock for these two calls, dont lock during IO `sock.send_to` or `sock.recv_from`
                 let (from, ups, data) = me.get_updates_since(v);
-                let external_liveness = me.remote.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
+                let external_liveness = me.remote
+                    .iter()
+                    .map(|(k, v)| (k.clone(), v.clone()))
+                    .collect();
                 drop(me);
                 trace!("get updates since response {} {}", v, data.len());
                 let len = data.len();
diff --git a/src/lib.rs b/src/lib.rs
index d551c68b738623..7f17a0f66f4383 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -13,6 +13,7 @@ pub mod bank;
 pub mod banking_stage;
 pub mod blob_fetch_stage;
 pub mod budget;
+mod choose_gossip_peer_strategy;
 pub mod crdt;
 pub mod drone;
 pub mod entry;
@@ -47,7 +48,6 @@ pub mod transaction;
 pub mod tvu;
 pub mod window_stage;
 pub mod write_stage;
-mod choose_gossip_peer_strategy;
 extern crate bincode;
 extern crate byteorder;
 extern crate chrono;
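
Patch 2 only reflows the two folds in calculate_weighted_remote_index() without changing behavior, but the guard inside them is easy to miss in diff form. As a standalone illustration of that technique (a sketch with hypothetical names; the relevant_votes bookkeeping from the real fold is omitted): before accumulating, check whether the addition would exceed f64::MAX, and if it would, keep whichever operand is larger, so at least f64::MAX/2 worth of weight is still represented.

// Overflow-guarded accumulation, in the style of the total_stake fold.
fn guarded_sum(stakes: &[f64]) -> f64 {
    stakes.iter().fold(0.0, |total, &stake| {
        if std::f64::MAX - total < stake {
            // Adding would overflow to infinity: keep the larger operand,
            // retaining at least f64::MAX / 2 worth of weight.
            total.max(stake)
        } else {
            total + stake
        }
    })
}

fn main() {
    let huge = std::f64::MAX / 2.0;
    assert_eq!(guarded_sum(&[1.0, 2.0, 3.0]), 6.0);
    // Naive summation would reach infinity here; the guard stays finite.
    assert!(guarded_sum(&[huge, huge, huge]).is_finite());
    println!("ok");
}
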
From 4c1f25656bb55a4c3d5e1d0fd5c12721f414664d Mon Sep 17 00:00:00 2001
From: "OEM Configuration (temporary user)"
Date: Tue, 26 Jun 2018 01:08:19 -0700
Subject: [PATCH 3/3] Some pull request fixes (linting + documentation)

---
 Cargo.toml                         |  1 -
 src/choose_gossip_peer_strategy.rs | 57 ++++++++++++++++++++++--------
 src/crdt.rs                        | 15 ++++----
 tests/data_replicator.rs           | 20 ++++++-----
 4 files changed, 60 insertions(+), 33 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index a39a8a1581c3ed..fcce844bafc717 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -53,7 +53,6 @@ unstable = []
 ipv6 = []
 cuda = []
 erasure = []
-gossip_choose_weighted_peer = []
 
 [dependencies]
 rayon = "1.0.0"
diff --git a/src/choose_gossip_peer_strategy.rs b/src/choose_gossip_peer_strategy.rs
index 889e8709d9199c..5642e006513404 100644
--- a/src/choose_gossip_peer_strategy.rs
+++ b/src/choose_gossip_peer_strategy.rs
@@ -9,33 +9,65 @@ use std::collections::HashMap;
 pub const DEFAULT_WEIGHT: u32 = 1;
 
 pub trait ChooseGossipPeerStrategy {
-    fn choose_peer(&self, options: Vec<&ReplicatedData>) -> Result<ReplicatedData>;
+    fn choose_peer<'a>(&self, options: Vec<&'a ReplicatedData>) -> Result<&'a ReplicatedData>;
 }
 
 pub struct ChooseRandomPeerStrategy<'a> {
     random: &'a Fn() -> u64,
 }
 
-impl<'a> ChooseRandomPeerStrategy<'a> {
+// Given a source of randomness "random", this strategy will randomly pick a validator
+// from the input options. This strategy works in isolation, but doesn't leverage any
+// rumors from the rest of the gossip network to make more informed decisions about
+// which validators have more/less updates
+impl<'a, 'b> ChooseRandomPeerStrategy<'a> {
     pub fn new(random: &'a Fn() -> u64) -> Self {
         ChooseRandomPeerStrategy { random }
     }
 }
 
 impl<'a> ChooseGossipPeerStrategy for ChooseRandomPeerStrategy<'a> {
-    fn choose_peer(&self, options: Vec<&ReplicatedData>) -> Result<ReplicatedData> {
-        if options.len() < 1 {
+    fn choose_peer<'b>(&self, options: Vec<&'b ReplicatedData>) -> Result<&'b ReplicatedData> {
+        if options.is_empty() {
             return Err(Error::CrdtTooSmall);
         }
 
         let n = ((self.random)() as usize) % options.len();
-        Ok(options[n].clone())
+        Ok(options[n])
     }
 }
 
+// This strategy uses rumors accumulated from the rest of the network to weight
+// the importance of communicating with a particular validator based on cumulative network
+// perception of the number of updates the validator has to offer. A validator is randomly
+// picked based on a weighted sample from the pool of viable choices. The "weight", w, of a
+// particular validator "v" is calculated as follows:
+//
+// w = [Sum for all i in I_v: (rumor_v(i) - observed(v)) * stake(i)] /
+//     [Sum for all i in I_v: stake(i)]
+//
+// where I_v is the set of all validators that returned a rumor about the update_index of
+// validator "v", stake(i) is the size of the stake of validator "i", observed(v) is the
+// observed update_index from the last direct communication with validator "v", and
+// rumor_v(i) is the rumored update_index of validator "v" propagated by fellow validator "i".
+
+// This could be a problem if there are validators with large stakes lying about their
+// observed updates. There could also be a problem in network partitions, or even just
+// when certain validators are disproportionately active, where we hear more rumors about
+// certain clusters of nodes that then propagate more rumors about each other. Hopefully
+// this can be resolved with a good baseline DEFAULT_WEIGHT, or by implementing lockout
+// periods for very active validators in the future.
+
 pub struct ChooseWeightedPeerStrategy<'a> {
+    // The map of last directly observed update_index for each active validator.
+    // This is how we get observed(v) from the formula above.
     remote: &'a HashMap<PublicKey, u64>,
+    // The map of rumored update_index for each active validator. Using the formula above,
+    // to find rumor_v(i), we would first look up "v" in the outer map, then look up
+    // "i" in the inner map, i.e. look up external_liveness[v][i]
     external_liveness: &'a HashMap<PublicKey, HashMap<PublicKey, u64>>,
+    // A function returning the size of the stake for a particular validator, corresponds
+    // to stake(i) in the formula above.
     get_stake: &'a Fn(PublicKey) -> f64,
 }
@@ -96,7 +128,7 @@ impl<'a> ChooseWeightedPeerStrategy<'a> {
 
         let weighted_vote = relevant_votes.iter().fold(0.0, |sum, &(stake, vote)| {
             if vote < last_seen_index {
-                // This should never happen b/c we maintain the invariant that the indexes
+                // This should never happen because we maintain the invariant that the indexes
                 // in the external_liveness table are always greater than the corresponding
                 // indexes in the remote table, if the index exists in the remote table at all.
 
@@ -140,7 +172,7 @@ impl<'a> ChooseWeightedPeerStrategy<'a> {
 }
 
 impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> {
-    fn choose_peer(&self, options: Vec<&ReplicatedData>) -> Result<ReplicatedData> {
+    fn choose_peer<'b>(&self, options: Vec<&'b ReplicatedData>) -> Result<&'b ReplicatedData> {
         if options.len() < 1 {
             return Err(Error::CrdtTooSmall);
         }
@@ -148,16 +180,11 @@ impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> {
         let mut weighted_peers = vec![];
         for peer in options {
             let weight = self.calculate_weighted_remote_index(peer.id);
-            weighted_peers.push(Weighted {
-                weight: weight,
-                item: peer,
-            });
+            weighted_peers.push(Weighted { weight, item: peer });
         }
 
         let mut rng = thread_rng();
-        Ok(WeightedChoice::new(&mut weighted_peers)
-            .ind_sample(&mut rng)
-            .clone())
+        Ok(WeightedChoice::new(&mut weighted_peers).ind_sample(&mut rng))
     }
 }
 
@@ -300,7 +327,7 @@ mod tests {
 
         let result = weighted_strategy.calculate_weighted_remote_index(key1);
 
-        // If nobody has seen a newer update then rever to default
+        // If nobody has seen a newer update then revert to default
         assert_eq!(result, DEFAULT_WEIGHT);
     }
 }
diff --git a/src/crdt.rs b/src/crdt.rs
index a9b125444c1a32..cce92f017dd636 100644
--- a/src/crdt.rs
+++ b/src/crdt.rs
@@ -15,8 +15,9 @@
 use bincode::{deserialize, serialize};
 use byteorder::{LittleEndian, ReadBytesExt};
-use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseRandomPeerStrategy,
-                                  ChooseWeightedPeerStrategy};
+use choose_gossip_peer_strategy::{
+    ChooseGossipPeerStrategy, ChooseRandomPeerStrategy, ChooseWeightedPeerStrategy,
+};
 use hash::Hash;
 use packet::{to_blob, Blob, BlobRecycler, SharedBlob, BLOB_SIZE};
 use pnet_datalink as datalink;
@@ -489,9 +490,9 @@ impl Crdt {
             .expect("rdr.read_u64 in fn random")
     }
 
-    // TODO: fill in with real implmentation wonce staking is implemented
+    // TODO: fill in with real implementation once staking is implemented
     fn get_stake(id: PublicKey) -> f64 {
-        return 1.0;
+        1.0
     }
 
     fn get_updates_since(&self, v: u64) -> (PublicKey, u64, Vec<ReplicatedData>) {
@@ -530,10 +531,6 @@ impl Crdt {
     fn gossip_request(&self) -> Result<(SocketAddr, Protocol)> {
         let options: Vec<_> = self.table.values().filter(|v| v.id != self.me).collect();
 
-        #[cfg(not(feature = "choose_gossip_peer_strategy"))]
-        let choose_peer_strategy = ChooseRandomPeerStrategy::new(&Self::random);
-
-        #[cfg(feature = "choose_gossip_peer_strategy")]
         let choose_peer_strategy = ChooseWeightedPeerStrategy::new(
             &self.remote,
             &self.external_liveness,
             &Self::get_stake,
         );
@@ -636,7 +633,7 @@ impl Crdt {
             self.insert(&v);
         }
 
-        for (pk, external_remote_index) in external_liveness.iter() {
+        for (pk, external_remote_index) in external_liveness {
             let remote_entry = if let Some(v) = self.remote.get(pk) {
                 *v
             } else {
diff --git a/tests/data_replicator.rs b/tests/data_replicator.rs
index a25eeb8e8059ac..b2f911d351b670 100644
--- a/tests/data_replicator.rs
+++ b/tests/data_replicator.rs
@@ -185,7 +185,7 @@ pub fn crdt_retransmit() {
 }
 
 #[test]
-fn check_external_liveness_table() {
+fn test_external_liveness_table() {
     logger::setup();
     let c1_c4_exit = Arc::new(AtomicBool::new(false));
     let c2_c3_exit = Arc::new(AtomicBool::new(false));
@@ -223,7 +223,8 @@ fn test_external_liveness_table() {
     trace!("waiting to converge:");
     let mut done = false;
     for _ in 0..30 {
-        done = c1.read().unwrap().table.len() == 3 && c2.read().unwrap().table.len() == 3
+        done = c1.read().unwrap().table.len() == 3
+            && c2.read().unwrap().table.len() == 3
             && c3.read().unwrap().table.len() == 3;
         if done {
             break;
@@ -244,12 +245,12 @@ fn test_external_liveness_table() {
         // Make sure liveness table entry contains correct result for c2
         let c2_index_result_for_c4 = liveness_map.get(&c2_id);
         assert!(c2_index_result_for_c4.is_some());
-        assert!(*(c2_index_result_for_c4.unwrap()) == c2_index_for_c4);
+        assert_eq!(*(c2_index_result_for_c4.unwrap()), c2_index_for_c4);
 
         // Make sure liveness table entry contains correct result for c3
         let c3_index_result_for_c4 = liveness_map.get(&c3_id);
         assert!(c3_index_result_for_c4.is_some());
-        assert!(*(c3_index_result_for_c4.unwrap()) == c3_index_for_c4);
+        assert_eq!(*(c3_index_result_for_c4.unwrap()), c3_index_for_c4);
     }
 
     // Shutdown validators c2 and c3
@@ -258,7 +259,7 @@ fn test_external_liveness_table() {
     threads.extend(dr2.thread_hdls.into_iter());
     threads.extend(dr3.thread_hdls.into_iter());
 
-    for t in threads.into_iter() {
+    for t in threads {
         t.join().unwrap();
     }
 
@@ -267,7 +268,10 @@ fn test_external_liveness_table() {
     c4.write().unwrap().insert(&c1_data);
     c4.write().unwrap().set_leader(c1_data.id);
     for _ in 0..30 {
-        done = c1.read().unwrap().get_external_liveness_entry(&c4_id).is_none();
+        done = c1.read()
+            .unwrap()
+            .get_external_liveness_entry(&c4_id)
+            .is_none();
         if done {
             break;
         }
@@ -281,7 +285,7 @@ fn test_external_liveness_table() {
     threads.extend(dr1.thread_hdls.into_iter());
     threads.extend(dr4.thread_hdls.into_iter());
 
-    for t in threads.into_iter() {
+    for t in threads {
         t.join().unwrap();
     }
-}
\ No newline at end of file
+}
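
To make the weight formula in patch 3's doc comment concrete, here is a minimal, std-only sketch of the weighted-index computation plus a cumulative-sum weighted pick. Everything in it (the PeerId alias, the stake_of stub, pick_weighted, and the numbers in main) is an illustrative stand-in rather than the patch's code: the patch samples with rand's WeightedChoice and carries the overflow guards shown earlier, both of which are simplified away here.

use std::collections::HashMap;

type PeerId = u64; // stand-in for PublicKey

const DEFAULT_WEIGHT: u32 = 1;

fn stake_of(_id: PeerId) -> f64 {
    1.0 // placeholder stake, like the patch's get_stake() TODO
}

// w(v) = [sum over witnesses i: (rumor_v(i) - observed(v)) * stake(i)] / total_stake,
// floored at DEFAULT_WEIGHT and capped at u32::MAX (overflow guards omitted).
fn weighted_remote_index(
    observed: &HashMap<PeerId, u64>,
    rumors: &HashMap<PeerId, HashMap<PeerId, u64>>,
    peer: PeerId,
) -> u32 {
    let last_seen = *observed.get(&peer).unwrap_or(&0);
    let votes = match rumors.get(&peer) {
        Some(v) if !v.is_empty() => v,
        _ => return DEFAULT_WEIGHT,
    };
    let total_stake: f64 = votes.keys().map(|&i| stake_of(i)).sum();
    let weighted: f64 = votes
        .iter()
        .filter(|&(_, &vote)| vote >= last_seen) // invariant: rumors never lag `remote`
        .map(|(&i, &vote)| (vote - last_seen) as f64 * (stake_of(i) / total_stake))
        .sum();
    if weighted >= u32::MAX as f64 {
        return u32::MAX;
    }
    weighted as u32 + DEFAULT_WEIGHT
}

// Cumulative-sum weighted pick, driven by a caller-supplied random value
// (the same shape as ChooseRandomPeerStrategy's `random` argument).
fn pick_weighted(peers: &[(PeerId, u32)], random: u64) -> PeerId {
    let total: u64 = peers.iter().map(|&(_, w)| u64::from(w)).sum();
    let mut point = random % total.max(1);
    for &(id, w) in peers {
        if point < u64::from(w) {
            return id;
        }
        point -= u64::from(w);
    }
    peers[peers.len() - 1].0
}

fn main() {
    let mut observed = HashMap::new();
    observed.insert(7, 5); // we last heard from peer 7 at update index 5
    let mut rumors_about_7 = HashMap::new();
    rumors_about_7.insert(1, 15); // peer 1 claims peer 7 is at index 15
    rumors_about_7.insert(2, 25); // peer 2 claims index 25
    let mut rumors = HashMap::new();
    rumors.insert(7, rumors_about_7);

    // Equal stakes: w = (15-5)*0.5 + (25-5)*0.5 = 15, plus DEFAULT_WEIGHT = 16
    let w = weighted_remote_index(&observed, &rumors, 7);
    assert_eq!(w, 16);
    let chosen = pick_weighted(&[(7, w), (9, DEFAULT_WEIGHT)], 11);
    println!("weight = {}, chosen peer = {}", w, chosen);
}

The net effect matches the tests above: peers nobody gossips about keep the baseline DEFAULT_WEIGHT, while peers rumored to be far ahead of our last direct contact are proportionally more likely to be chosen for the next gossip request.
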