Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gossip v1.1 opportunistic grafting #43

Merged
merged 3 commits into from
Aug 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 52 additions & 1 deletion protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use crate::protocol::{
};
use crate::rpc_proto;
use crate::topic::{Hasher, Topic, TopicHash};
use std::cmp::Ordering::Equal;

mod tests;

Expand Down Expand Up @@ -1547,7 +1548,57 @@ impl Gossipsub {
}
}

//TODO opportunistic grafting
// should we try to improve the mesh with opportunistic grafting?
if self.heartbeat_ticks % self.config.opportunistic_graft_ticks() == 0
&& peers.len() > 1 && self.peer_score.is_some() {
if let Some((_, thresholds, _)) = &self.peer_score {
// Opportunistic grafting works as follows: we check the median score of peers
// in the mesh; if this score is below the opportunisticGraftThreshold, we
// select a few peers at random with score over the median.
// The intention is to (slowly) improve an underperforming mesh by introducing
// good scoring peers that may have been gossiping at us. This allows us to
// get out of sticky situations where we are stuck with poor peers and also
// recover from churn of good peers.


// now compute the median peer score in the mesh
let mut peers_by_score: Vec<_> = peers.iter().collect();
peers_by_score.sort_by(|p1, p2|
score(p1).partial_cmp(&score(p2)).unwrap_or(Equal)
);

let middle = peers_by_score.len() / 2;
let median = if peers_by_score.len() % 2 == 0 {
(score(*peers_by_score.get(middle - 1)
.expect("middle < vector length and middle > 0 since peers.len() > 0")) +
score(*peers_by_score.get(middle).expect("middle < vector length")))
* 0.5
} else {
score(*peers_by_score.get(middle).expect("middle < vector length"))
};

// if the median score is below the threshold, select a better peer (if any) and
// GRAFT
if median < thresholds.opportunistic_graft_threshold {
let peer_list =
Self::get_random_peers(topic_peers, topic_hash,
self.config.opportunistic_graft_peers(),|peer| {
!peers.contains(peer)
&& !explicit_peers.contains(peer)
&& !backoffs.is_backoff_with_slack(topic_hash, peer)
&& score(peer) > median
});
for peer in &peer_list {
let current_topic = to_graft.entry(peer.clone()).or_insert_with(Vec::new);
current_topic.push(topic_hash.clone());
}
// update the mesh
debug!("Opportunistically graft in topic {} with peers {:?}",
topic_hash, peer_list);
peers.extend(peer_list);
}
}
}
}

// remove expired fanout topics
Expand Down
86 changes: 83 additions & 3 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ mod tests {
use std::thread::sleep;
use std::time::Duration;

use async_std::net::Ipv4Addr;
use rand::Rng;

use crate::{GossipsubConfigBuilder, IdentTopic as Topic, TopicScoreParams};

use super::super::*;
use async_std::net::Ipv4Addr;
use rand::Rng;

// helper functions for testing
// helper functions for testing

fn build_and_inject_nodes(
peer_no: usize,
Expand Down Expand Up @@ -3709,4 +3710,83 @@ mod tests {
1.0 * 0.9 * 0.9 * -2.0
);
}

#[test]
fn test_opportunistic_grafting() {
let config = GossipsubConfigBuilder::new()
.mesh_n_low(3)
.mesh_n(5)
.mesh_n_high(7)
.mesh_outbound_min(0) //deactivate outbound handling
.opportunistic_graft_ticks(2)
.opportunistic_graft_peers(2)
.build()
.unwrap();
let mut peer_score_params = PeerScoreParams::default();
peer_score_params.app_specific_weight = 1.0;
let mut thresholds = PeerScoreThresholds::default();
thresholds.opportunistic_graft_threshold = 2.0;

let (mut gs, peers, topics) =
build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring(
5,
vec!["test".into()],
false,
config,
0,
0,
Some((peer_score_params, thresholds)),
);

//fill mesh with 5 peers
for peer in &peers {
gs.handle_graft(peer, topics.clone());
}

//add additional 5 peers
let others: Vec<_> = (0..5)
.into_iter()
.map(|_| add_peer(&mut gs, &topics, false, false))
.collect();

//currently mesh equals peers
assert_eq!(gs.mesh[&topics[0]], peers.iter().cloned().collect());

//give others high scores (but the first two have not high enough scores)
for i in 0..5 {
gs.set_application_score(&peers[i], 0.0 + i as f64);
}

//set scores for peers in the mesh
for i in 0..5 {
gs.set_application_score(&others[i], 0.0 + i as f64);
}

//this gives a median of exactly 2.0 => should not apply opportunistic grafting
gs.heartbeat();
gs.heartbeat();

assert_eq!(gs.mesh[&topics[0]].len(), 5, "should not apply opportunistic grafting");

//reduce middle score to 1.0 giving a median of 1.0
gs.set_application_score(&peers[2], 1.0);


//opportunistic grafting after two heartbeats

gs.heartbeat();
assert_eq!(gs.mesh[&topics[0]].len(), 5,
"should not apply opportunistic grafting after first tick");

gs.heartbeat();

assert_eq!(gs.mesh[&topics[0]].len(), 7,
"opportunistic grafting should have added 2 peers");

assert!(gs.mesh[&topics[0]].is_superset(&peers.iter().cloned().collect()),
"old peers are still part of the mesh");

assert!(gs.mesh[&topics[0]].is_disjoint(&others.iter().cloned().take(2).collect()),
"peers below or equal to median should not be added in opportunistic grafting");
}
}
45 changes: 45 additions & 0 deletions protocols/gossipsub/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,16 @@ pub struct GossipsubConfig {
/// This value must be smaller or equal than `mesh_n / 2` and smaller than `mesh_n_low`.
/// The default is 2.
mesh_outbound_min: usize,

// Number of heartbeat ticks that specifcy the interval in which opportunistic grafting is
// applied. Every `opportunistic_graft_ticks` we will attempt to select some high-scoring mesh
// peers to replace lower-scoring ones, if the median score of our mesh peers falls below a
// threshold (see https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds).
// The default is 60.
opportunistic_graft_ticks: u64,

/// The maximum number of new peers to graft to during opportunistic grafting. The default is 2.
opportunistic_graft_peers: usize,
}

//TODO should we use a macro for getters + the builder?
Expand Down Expand Up @@ -351,6 +361,21 @@ impl GossipsubConfig {
pub fn mesh_outbound_min(&self) -> usize {
self.mesh_outbound_min
}

// Number of heartbeat ticks that specifcy the interval in which opportunistic grafting is
// applied. Every `opportunistic_graft_ticks` we will attempt to select some high-scoring mesh
// peers to replace lower-scoring ones, if the median score of our mesh peers falls below a
// threshold (see https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds).
// The default is 60.
pub fn opportunistic_graft_ticks(&self) -> u64 {
self.opportunistic_graft_ticks
}


/// The maximum number of new peers to graft to during opportunistic grafting. The default is 2.
pub fn opportunistic_graft_peers(&self) -> usize {
self.opportunistic_graft_peers
}
}

impl Default for GossipsubConfig {
Expand Down Expand Up @@ -418,6 +443,8 @@ impl GossipsubConfigBuilder {
flood_publish: true,
graft_flood_threshold: Duration::from_secs(10),
mesh_outbound_min: 2,
opportunistic_graft_ticks: 60,
opportunistic_graft_peers: 2
},
}
}
Expand Down Expand Up @@ -613,6 +640,22 @@ impl GossipsubConfigBuilder {
self
}

// Number of heartbeat ticks that specifcy the interval in which opportunistic grafting is
// applied. Every `opportunistic_graft_ticks` we will attempt to select some high-scoring mesh
// peers to replace lower-scoring ones, if the median score of our mesh peers falls below a
// threshold (see https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds).
// The default is 60.
pub fn opportunistic_graft_ticks(&mut self, opportunistic_graft_ticks: u64) -> &mut Self {
self.config.opportunistic_graft_ticks = opportunistic_graft_ticks;
self
}

/// The maximum number of new peers to graft to during opportunistic grafting. The default is 2.
pub fn opportunistic_graft_peers(&mut self, opportunistic_graft_peers: usize) -> &mut Self {
self.config.opportunistic_graft_peers = opportunistic_graft_peers;
self
}

/// Constructs a `GossipsubConfig` from the given configuration and validates the settings.
pub fn build(&self) -> Result<GossipsubConfig, &str> {
//check all constraints on config
Expand Down Expand Up @@ -665,6 +708,8 @@ impl std::fmt::Debug for GossipsubConfig {
let _ = builder.field("flood_publish", &self.flood_publish);
let _ = builder.field("graft_flood_threshold", &self.graft_flood_threshold);
let _ = builder.field("mesh_outbound_min", &self.mesh_outbound_min);
let _ = builder.field("opportunistic_graft_ticks", &self.opportunistic_graft_ticks);
let _ = builder.field("opportunistic_graft_peers", &self.opportunistic_graft_peers);
builder.finish()
}
}
Expand Down