This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Improve CRDT convergence #438

Merged: 3 commits, Jun 26, 2018
Changes from 2 commits
1 change: 1 addition & 0 deletions Cargo.toml
@@ -53,6 +53,7 @@ unstable = []
ipv6 = []
cuda = []
erasure = []
gossip_choose_weighted_peer = []
Contributor

I don't think this needs to be guarded with a feature flag. It's nice to have both options available, but I'm hoping a "best" option will emerge and we'll just always use that one.

Contributor Author

ah ok, I was wondering if it was necessary for a safe rollout procedure. I can remove this.


[dependencies]
rayon = "1.0.0"
306 changes: 306 additions & 0 deletions src/choose_gossip_peer_strategy.rs
@@ -0,0 +1,306 @@
use crdt::ReplicatedData;
Contributor

Can you add module documentation?

use rand::distributions::{IndependentSample, Weighted, WeightedChoice};
use rand::thread_rng;
use result::{Error, Result};
use signature::PublicKey;
use std;
use std::collections::HashMap;

pub const DEFAULT_WEIGHT: u32 = 1;

pub trait ChooseGossipPeerStrategy {
    fn choose_peer(&self, options: Vec<&ReplicatedData>) -> Result<ReplicatedData>;
Contributor

Are you able to return a reference here to avoid the clone()?

Contributor

Also, @aeyakovenko, what do you think about the use of the term "peer" here? Should we consider replacing ReplicatedData with Peer?

Contributor Author

@carllin, Jun 26, 2018

@garious, yeah I thought the reference would have been fine, but in the original gossip_request() code, there was a clone(), so I aligned with that. https://github.com/solana-labs/solana/blob/master/src/crdt.rs#L520. Looking at it more in depth now, it doesn't seem necessary.
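For reference, the reference-returning signature discussed in this thread could look roughly like the sketch below. `ReplicatedData`, `Error`, and `Result` are simplified stand-ins for the PR's types, `ChooseFirstPeerStrategy` is a hypothetical strategy used only for illustration, and taking `options` as a slice is an assumption rather than what the PR does.

```rust
// Simplified stand-ins for the PR's types so the sketch compiles on its own.
#[derive(Debug, Clone, PartialEq)]
pub struct ReplicatedData {
    pub id: u64,
}

#[derive(Debug, PartialEq)]
pub enum Error {
    CrdtTooSmall,
}

pub type Result<T> = std::result::Result<T, Error>;

pub trait ChooseGossipPeerStrategy {
    // The returned reference borrows from the candidate peers ('a), not from
    // `self`, so no clone() is needed to hand the choice back to the caller.
    fn choose_peer<'a>(&self, options: &[&'a ReplicatedData]) -> Result<&'a ReplicatedData>;
}

// Hypothetical strategy: always picks the first candidate.
pub struct ChooseFirstPeerStrategy;

impl ChooseGossipPeerStrategy for ChooseFirstPeerStrategy {
    fn choose_peer<'a>(&self, options: &[&'a ReplicatedData]) -> Result<&'a ReplicatedData> {
        options.first().copied().ok_or(Error::CrdtTooSmall)
    }
}
```

With this shape the caller decides whether to clone, for example only when the chosen peer has to outlive the table it was read from.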

}

pub struct ChooseRandomPeerStrategy<'a> {
Contributor

Can you add a comment that summarizes this strategy and its limitations?

    random: &'a Fn() -> u64,
}

impl<'a> ChooseRandomPeerStrategy<'a> {
    pub fn new(random: &'a Fn() -> u64) -> Self {
        ChooseRandomPeerStrategy { random }
    }
}

impl<'a> ChooseGossipPeerStrategy for ChooseRandomPeerStrategy<'a> {
    fn choose_peer(&self, options: Vec<&ReplicatedData>) -> Result<ReplicatedData> {
        if options.len() < 1 {
            return Err(Error::CrdtTooSmall);
        }

        let n = ((self.random)() as usize) % options.len();
        Ok(options[n].clone())
    }
}
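As a rough illustration of what the random strategy above does, here is a stripped-down sketch: the random source is reduced modulo the number of candidates, so every peer is equally likely regardless of how stale our view of it is. `choose_uniform` is a hypothetical helper, not part of the PR, and it is generic over the item type only so that it compiles standalone.

```rust
// Hypothetical, simplified version of the uniform strategy. The random source
// is injected as a closure, which also makes the choice reproducible in tests.
pub fn choose_uniform<'a, T>(options: &'a [T], random: &dyn Fn() -> u64) -> Option<&'a T> {
    if options.is_empty() {
        return None;
    }
    // Reduce the random value to a valid index into `options`.
    let n = (random() as usize) % options.len();
    options.get(n)
}
```

Passing a constant closure such as `&|| 4` pins the choice, which is presumably why the PR takes `random` as a parameter instead of calling an RNG directly.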

pub struct ChooseWeightedPeerStrategy<'a> {
Contributor

Can you add a comment that summarizes this strategy and how it overcomes the limitations of ChooseRandomPeerStrategy?

    remote: &'a HashMap<PublicKey, u64>,
    external_liveness: &'a HashMap<PublicKey, HashMap<PublicKey, u64>>,
Contributor

Can you comment each field? Especially external_liveness. Is this name precise? What's the difference between remote and external? I think I'm missing something about "liveness" too. Is the node's liveness relevant here? Maybe that's a question for @aeyakovenko

Contributor Author

That's a fair point, @aeyakovenko suggested it so I think he might have more insight. Perhaps remote_rumors might be a better name
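One way to read the two tables, based only on the comments in `calculate_weighted_remote_index()`: `remote` holds the last update index fetched directly from each peer, while `external_liveness` holds indexes that other nodes report for that peer. The sketch below is an assumption about how the invariant between them is kept, not the actual `apply_updates()` code. `GossipTables`, `record_direct`, and `record_rumor` are hypothetical names, and `PublicKey` is a stand-in type.

```rust
use std::collections::HashMap;

// Stand-in for the PR's PublicKey type.
pub type PublicKey = u64;

#[derive(Default)]
pub struct GossipTables {
    /// Last update index we fetched directly from each peer.
    pub remote: HashMap<PublicKey, u64>,
    /// Per peer: the index each other node (the reporter) claims to have seen.
    pub external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>>,
}

impl GossipTables {
    /// Case 2 in the code comments: every insertion into `remote` clears the
    /// peer's rumors, so no stale rumor survives a direct update.
    pub fn record_direct(&mut self, peer: PublicKey, index: u64) {
        self.remote.insert(peer, index);
        self.external_liveness.remove(&peer);
    }

    /// Case 1 in the code comments: a rumor is dropped unless it is newer
    /// than what we already learned directly from the peer.
    pub fn record_rumor(&mut self, peer: PublicKey, reporter: PublicKey, index: u64) {
        if self.remote.get(&peer).map_or(false, |&seen| seen >= index) {
            return;
        }
        self.external_liveness
            .entry(peer)
            .or_insert_with(HashMap::new)
            .insert(reporter, index);
    }
}
```

Under this reading, every rumor left in `external_liveness` is strictly newer than the matching `remote` entry, which is what the weighting relies on.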

    get_stake: &'a Fn(PublicKey) -> f64,
}

impl<'a> ChooseWeightedPeerStrategy<'a> {
    pub fn new(
        remote: &'a HashMap<PublicKey, u64>,
        external_liveness: &'a HashMap<PublicKey, HashMap<PublicKey, u64>>,
        get_stake: &'a Fn(PublicKey) -> f64,
Contributor

This seems unnecessarily generic. Why not make it a method of this struct?

Contributor Author

I left it in mostly for flexible testing purposes, I was afraid that whoever added a real implementation later would break my test cases and be forced to fix them. This way, I can define the function in my tests without worrying about adjusting for future changes.
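A minimal sketch of the trade-off in this thread, assuming a stand-in `PublicKey` type: keeping the stake lookup as an injected function lets a test supply a constant stake, while a later real implementation can be wrapped in a closure without touching those tests. `StakeWeights` and `stake_from_table` are hypothetical names, not part of the PR.

```rust
use std::collections::HashMap;

// Stand-in for the PR's PublicKey type.
pub type PublicKey = u64;

// Hypothetical holder for an injected stake lookup, mirroring the `get_stake`
// field of ChooseWeightedPeerStrategy.
pub struct StakeWeights<'a> {
    get_stake: &'a dyn Fn(PublicKey) -> f64,
}

impl<'a> StakeWeights<'a> {
    pub fn new(get_stake: &'a dyn Fn(PublicKey) -> f64) -> Self {
        StakeWeights { get_stake }
    }

    /// Sums the stake of the given peers using whatever lookup was injected.
    pub fn total_stake(&self, ids: &[PublicKey]) -> f64 {
        ids.iter().map(|&id| (self.get_stake)(id)).sum()
    }
}

// A possible "real" lookup backed by a table; unknown peers get zero stake.
pub fn stake_from_table(table: &HashMap<PublicKey, f64>) -> impl Fn(PublicKey) -> f64 + '_ {
    move |id| table.get(&id).copied().unwrap_or(0.0)
}
```

The cost the reviewer points at is the extra indirection: a method on the struct would be simpler to read, at the price of tests depending on the real stake source.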

    ) -> Self {
        ChooseWeightedPeerStrategy {
            remote,
            external_liveness,
            get_stake,
        }
    }

    fn calculate_weighted_remote_index(&self, peer_id: PublicKey) -> u32 {
        let mut last_seen_index = 0;
        // If the peer is not in our remote table, then we leave last_seen_index as zero.
        // Only happens when a peer appears in our crdt.table but not in our crdt.remote,
        // which means a validator was directly injected into our crdt.table
        if let Some(index) = self.remote.get(&peer_id) {
            last_seen_index = *index;
        }

        let liveness_entry = self.external_liveness.get(&peer_id);
        if liveness_entry.is_none() {
            return DEFAULT_WEIGHT;
        }

        let votes = liveness_entry.unwrap();

        if votes.is_empty() {
            return DEFAULT_WEIGHT;
        }

        // Calculate the weighted average of the rumors
        let mut relevant_votes = vec![];

        let total_stake = votes.iter().fold(0.0, |total_stake, (&id, &vote)| {
            let stake = (self.get_stake)(id);
            // If the total stake is going to overflow f64, pick
            // the larger of either the current total_stake, or the
            // new stake, this way we are guaranteed to get at least f64::MAX / 2
            // sample of stake in our weighted calculation
            if std::f64::MAX - total_stake < stake {
                if stake > total_stake {
                    relevant_votes = vec![(stake, vote)];
                    stake
                } else {
                    total_stake
                }
            } else {
                relevant_votes.push((stake, vote));
                total_stake + stake
            }
        });

        let weighted_vote = relevant_votes.iter().fold(0.0, |sum, &(stake, vote)| {
            if vote < last_seen_index {
                // This should never happen b/c we maintain the invariant that the indexes
Contributor

Can you please spell out "because"? There's no other shorthand in the codebase. Let's keep it that way.

                // in the external_liveness table are always greater than the corresponding
                // indexes in the remote table, if the index exists in the remote table at all.

                // Case 1: Attempt to insert bigger index into the "external_liveness" table
                // happens after an insertion into the "remote" table. In this case
                // (see the apply_updates() function), we prevent the insertion if the entry
                // in the remote table >= the attempted insertion into the "external_liveness"
                // table.

                // Case 2: Bigger index in the "external_liveness" table inserted before
                // a smaller insertion into the "remote" table. We clear the corresponding
                // "external_liveness" table entry on all insertions into the "remote" table.
                // See apply_updates() function.

                warn!("weighted peer index was smaller than local entry in remote table");
                return sum;
            }

            let vote_difference = (vote - last_seen_index) as f64;
            let new_weight = vote_difference * (stake / total_stake);

            if std::f64::MAX - sum < new_weight {
                return f64::max(new_weight, sum);
            }

            sum + new_weight
        });

        // Return u32 b/c the weighted sampling API from rand::distributions
        // only takes u32 for weights
        if weighted_vote >= std::u32::MAX as f64 {
            return std::u32::MAX;
        }

        // If the weighted rumors we've heard about aren't any greater than
        // what we've directly learned from the last time we communicated with the
        // peer (i.e. weighted_vote == 0), then return a weight of 1.
        // Otherwise, return the calculated weight.
        weighted_vote as u32 + DEFAULT_WEIGHT
    }
}
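The weight computed above is a stake-weighted average of how far each rumor is ahead of our own view, plus `DEFAULT_WEIGHT`. A condensed sketch of that arithmetic follows. `weighted_remote_index` is a hypothetical free function that takes the rumors as `(stake, reported_index)` pairs, and it deliberately leaves out the f64 overflow handling from the PR.

```rust
pub const DEFAULT_WEIGHT: u32 = 1;

// Hypothetical, simplified form of calculate_weighted_remote_index():
// `rumors` holds one (stake, reported_index) pair per reporting node.
pub fn weighted_remote_index(last_seen_index: u64, rumors: &[(f64, u64)]) -> u32 {
    let total_stake: f64 = rumors.iter().map(|&(stake, _)| stake).sum();
    if rumors.is_empty() || total_stake <= 0.0 {
        return DEFAULT_WEIGHT;
    }

    // Each rumor contributes (index - last_seen_index) scaled by its share of
    // the total stake. Rumors older than our own view contribute nothing.
    let weighted_vote: f64 = rumors
        .iter()
        .filter(|&&(_, index)| index >= last_seen_index)
        .map(|&(stake, index)| (index - last_seen_index) as f64 * (stake / total_stake))
        .sum();

    // Clamp to u32, the weight type used by the sampling step.
    if weighted_vote >= f64::from(u32::MAX) {
        return u32::MAX;
    }
    weighted_vote as u32 + DEFAULT_WEIGHT
}
```

For example, with `last_seen_index = 0` and rumors `[(3.0, 10), (1.0, 2)]`, the weighted vote is 10 * 0.75 + 2 * 0.25 = 8, so the peer gets weight 9.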

impl<'a> ChooseGossipPeerStrategy for ChooseWeightedPeerStrategy<'a> {
    fn choose_peer(&self, options: Vec<&ReplicatedData>) -> Result<ReplicatedData> {
        if options.len() < 1 {
            return Err(Error::CrdtTooSmall);
        }

        let mut weighted_peers = vec![];
        for peer in options {
            let weight = self.calculate_weighted_remote_index(peer.id);
            weighted_peers.push(Weighted {
                weight: weight,
                item: peer,
            });
        }

        let mut rng = thread_rng();
        Ok(WeightedChoice::new(&mut weighted_peers)
            .ind_sample(&mut rng)
            .clone())
    }
}
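To make the sampling step concrete without depending on `rand`, the sketch below picks an item by walking cumulative weights. It is not how `WeightedChoice` is implemented. `pick_weighted` and the `ticket` parameter (a stand-in for a random draw) are hypothetical. The weights are summed in `u64` on purpose: the `rand` documentation for `WeightedChoice::new` appears to list a total weight that does not fit in `u32` as a panic condition, and individual weights here can be as large as `u32::MAX`, so that is worth checking against the PR.

```rust
// Hypothetical weighted pick: `items` pairs each candidate with its weight,
// and `ticket` stands in for a uniformly random u64.
pub fn pick_weighted<'a, T>(items: &'a [(u32, T)], ticket: u64) -> Option<&'a T> {
    // Sum in u64 so that several u32::MAX weights cannot overflow the total.
    let total: u64 = items.iter().map(|(w, _)| u64::from(*w)).sum();
    if total == 0 {
        return None;
    }

    // Map the ticket into [0, total) and find the interval it falls into.
    let mut remaining = ticket % total;
    for (weight, item) in items {
        let weight = u64::from(*weight);
        if remaining < weight {
            return Some(item);
        }
        remaining -= weight;
    }
    None
}
```

With weights `[1, 3, 1]`, tickets 1 through 3 all land on the second item, so it is chosen three times as often as either neighbour.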

#[cfg(test)]
mod tests {
    use choose_gossip_peer_strategy::{ChooseWeightedPeerStrategy, DEFAULT_WEIGHT};
    use logger;
    use signature::{KeyPair, KeyPairUtil, PublicKey};
    use std;
    use std::collections::HashMap;

    fn get_stake(id: PublicKey) -> f64 {
        return 1.0;
    }

    #[test]
    fn test_default() {
        logger::setup();

        // Initialize the filler keys
        let key1 = KeyPair::new().pubkey();

        let remote: HashMap<PublicKey, u64> = HashMap::new();
        let external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>> = HashMap::new();

        let weighted_strategy =
            ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake);

        // If external_liveness table doesn't contain this entry,
        // return the default weight
        let result = weighted_strategy.calculate_weighted_remote_index(key1);
        assert_eq!(result, DEFAULT_WEIGHT);
    }

    #[test]
    fn test_only_external_liveness() {
        logger::setup();

        // Initialize the filler keys
        let key1 = KeyPair::new().pubkey();
        let key2 = KeyPair::new().pubkey();

        let remote: HashMap<PublicKey, u64> = HashMap::new();
        let mut external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>> = HashMap::new();

        // If only the liveness table contains the entry, should return the
        // weighted liveness entries
        let test_value: u32 = 5;
        let mut rumors: HashMap<PublicKey, u64> = HashMap::new();
        rumors.insert(key2, test_value as u64);
        external_liveness.insert(key1, rumors);

        let weighted_strategy =
            ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake);

        let result = weighted_strategy.calculate_weighted_remote_index(key1);
        assert_eq!(result, test_value + DEFAULT_WEIGHT);
    }

    #[test]
    fn test_overflow_votes() {
        logger::setup();

        // Initialize the filler keys
        let key1 = KeyPair::new().pubkey();
        let key2 = KeyPair::new().pubkey();

        let remote: HashMap<PublicKey, u64> = HashMap::new();
        let mut external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>> = HashMap::new();

        // If the vote index is greater than u32::MAX, default to u32::MAX
        let test_value = (std::u32::MAX as u64) + 10;
        let mut rumors: HashMap<PublicKey, u64> = HashMap::new();
        rumors.insert(key2, test_value);
        external_liveness.insert(key1, rumors);

        let weighted_strategy =
            ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake);

        let result = weighted_strategy.calculate_weighted_remote_index(key1);
        assert_eq!(result, std::u32::MAX);
    }

    #[test]
    fn test_many_validators() {
        logger::setup();

        // Initialize the filler keys
        let key1 = KeyPair::new().pubkey();

        let mut remote: HashMap<PublicKey, u64> = HashMap::new();
        let mut external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>> = HashMap::new();

        // Test many validators' rumors in external_liveness
        let num_peers = 10;
        let mut rumors: HashMap<PublicKey, u64> = HashMap::new();

        remote.insert(key1, 0);

        for i in 0..num_peers {
            let pk = KeyPair::new().pubkey();
            rumors.insert(pk, i);
        }

        external_liveness.insert(key1, rumors);

        let weighted_strategy =
            ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake);

        let result = weighted_strategy.calculate_weighted_remote_index(key1);
        assert_eq!(result, (num_peers / 2) as u32);
    }

    #[test]
    fn test_many_validators2() {
        logger::setup();

        // Initialize the filler keys
        let key1 = KeyPair::new().pubkey();

        let mut remote: HashMap<PublicKey, u64> = HashMap::new();
        let mut external_liveness: HashMap<PublicKey, HashMap<PublicKey, u64>> = HashMap::new();

        // Test many validators' rumors in external_liveness
        let num_peers = 10;
        let old_index = 20;
        let mut rumors: HashMap<PublicKey, u64> = HashMap::new();

        remote.insert(key1, old_index);

        for i in 0..num_peers {
            let pk = KeyPair::new().pubkey();
            rumors.insert(pk, old_index);
        }

        external_liveness.insert(key1, rumors);

        let weighted_strategy =
            ChooseWeightedPeerStrategy::new(&remote, &external_liveness, &get_stake);

        let result = weighted_strategy.calculate_weighted_remote_index(key1);

        // If nobody has seen a newer update then rever to default
Contributor

revert

        assert_eq!(result, DEFAULT_WEIGHT);
    }
}