From b6fbf058f030e481773136477944e9ce19fa57da Mon Sep 17 00:00:00 2001 From: blacktemplar Date: Thu, 6 Aug 2020 15:16:12 +0200 Subject: [PATCH 1/2] implement opportunistic grafting --- protocols/gossipsub/src/behaviour.rs | 53 ++++++++++++- protocols/gossipsub/src/behaviour/tests.rs | 88 +++++++++++++++++++++- protocols/gossipsub/src/config.rs | 45 +++++++++++ 3 files changed, 181 insertions(+), 5 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 21bd6f7bcb8..5034dc74086 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -60,6 +60,7 @@ use crate::protocol::{ }; use crate::rpc_proto; use crate::topic::{Hasher, Topic, TopicHash}; +use std::cmp::Ordering::Equal; mod tests; @@ -1547,7 +1548,57 @@ impl Gossipsub { } } - //TODO opportunistic grafting + // should we try to improve the mesh with opportunistic grafting? + if self.heartbeat_ticks % self.config.opportunistic_graft_ticks() == 0 + && peers.len() > 1 && self.peer_score.is_some() { + if let Some((_, thresholds, _)) = &self.peer_score { + // Opportunistic grafting works as follows: we check the median score of peers + // in the mesh; if this score is below the opportunisticGraftThreshold, we + // select a few peers at random with score over the median. + // The intention is to (slowly) improve an underperforming mesh by introducing + // good scoring peers that may have been gossiping at us. This allows us to + // get out of sticky situations where we are stuck with poor peers and also + // recover from churn of good peers. + + + // now compute the median peer score in the mesh + let mut peers_by_score: Vec<_> = peers.iter().collect(); + peers_by_score.sort_by(|p1, p2| + score(p1).partial_cmp(&score(p2)).unwrap_or(Equal) + ); + + let middle = peers_by_score.len() / 2; + let median = if peers_by_score.len() % 2 == 0 { + (score(*peers_by_score.get(middle - 1) + .expect("middle < vector length and middle > 0 since peers.len() > 0")) + + score(*peers_by_score.get(middle).expect("middle < vector length"))) + * 0.5 + } else { + score(*peers_by_score.get(middle).expect("middle < vector length")) + }; + + // if the median score is below the threshold, select a better peer (if any) and + // GRAFT + if median < thresholds.opportunistic_graft_threshold { + let peer_list = + Self::get_random_peers(topic_peers, topic_hash, + self.config.opportunistic_graft_peers(),|peer| { + !peers.contains(peer) + && !explicit_peers.contains(peer) + && !backoffs.is_backoff_with_slack(topic_hash, peer) + && score(peer) > median + }); + for peer in &peer_list { + let current_topic = to_graft.entry(peer.clone()).or_insert_with(Vec::new); + current_topic.push(topic_hash.clone()); + } + // update the mesh + debug!("Opportunistically graft in topic {} with peers {:?}", + topic_hash, peer_list); + peers.extend(peer_list); + } + } + } } // remove expired fanout topics diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 0feb3e7ad3b..68ec8378c53 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -25,13 +25,14 @@ mod tests { use std::thread::sleep; use std::time::Duration; + use async_std::net::Ipv4Addr; + use rand::Rng; + use crate::{GossipsubConfigBuilder, IdentTopic as Topic, TopicScoreParams}; use super::super::*; - use async_std::net::Ipv4Addr; - use rand::Rng; - // helper functions for testing +// helper functions for testing fn build_and_inject_nodes( peer_no: usize, @@ -2639,7 +2640,7 @@ mod tests { ); } - //TODO test oppertunisticGraftThreshold + //TODO test opportunisticGraftThreshold #[test] fn test_keep_best_scoring_peers_on_oversubscription() { @@ -3711,4 +3712,83 @@ mod tests { 1.0 * 0.9 * 0.9 * -2.0 ); } + + #[test] + fn test_opportunistic_grafting() { + let config = GossipsubConfigBuilder::new() + .mesh_n_low(3) + .mesh_n(5) + .mesh_n_high(7) + .mesh_outbound_min(0) //deactivate outbound handling + .opportunistic_graft_ticks(2) + .opportunistic_graft_peers(2) + .build() + .unwrap(); + let mut peer_score_params = PeerScoreParams::default(); + peer_score_params.app_specific_weight = 1.0; + let mut thresholds = PeerScoreThresholds::default(); + thresholds.opportunistic_graft_threshold = 2.0; + + let (mut gs, peers, topics) = + build_and_inject_nodes_with_config_and_explicit_and_outbound_and_scoring( + 5, + vec!["test".into()], + false, + config, + 0, + 0, + Some((peer_score_params, thresholds)), + ); + + //fill mesh with 5 peers + for peer in &peers { + gs.handle_graft(peer, topics.clone()); + } + + //add additional 5 peers + let others: Vec<_> = (0..5) + .into_iter() + .map(|_| add_peer(&mut gs, &topics, false, false)) + .collect(); + + //currently mesh equals peers + assert_eq!(gs.mesh[&topics[0]], peers.iter().cloned().collect()); + + //give others high scores (but the first two have not high enough scores) + for i in 0..5 { + gs.set_application_score(&peers[i], 0.0 + i as f64); + } + + //set scores for peers in the mesh + for i in 0..5 { + gs.set_application_score(&others[i], 0.0 + i as f64); + } + + //this gives a median of exactly 2.0 => should not apply opportunistic grafting + gs.heartbeat(); + gs.heartbeat(); + + assert_eq!(gs.mesh[&topics[0]].len(), 5, "should not apply opportunistic grafting"); + + //reduce middle score to 1.0 giving a median of 1.0 + gs.set_application_score(&peers[2], 1.0); + + + //opportunistic grafting after two heartbeats + + gs.heartbeat(); + assert_eq!(gs.mesh[&topics[0]].len(), 5, + "should not apply opportunistic grafting after first tick"); + + gs.heartbeat(); + + assert_eq!(gs.mesh[&topics[0]].len(), 7, + "opportunistic grafting should have added 2 peers"); + + assert!(gs.mesh[&topics[0]].is_superset(&peers.iter().cloned().collect()), + "old peers are still part of the mesh"); + + assert!(gs.mesh[&topics[0]].is_disjoint(&others.iter().cloned().take(2).collect()), + "peers below or equal to median should not be added in opportunistic grafting"); + } } diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index ea05d030522..1323f3513c7 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -173,6 +173,16 @@ pub struct GossipsubConfig { /// This value must be smaller or equal than `mesh_n / 2` and smaller than `mesh_n_low`. /// The default is 2. mesh_outbound_min: usize, + + // Number of heartbeat ticks that specifcy the interval in which opportunistic grafting is + // applied. Every `opportunistic_graft_ticks` we will attempt to select some high-scoring mesh + // peers to replace lower-scoring ones, if the median score of our mesh peers falls below a + // threshold (see https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds). + // The default is 60. + opportunistic_graft_ticks: u64, + + /// The maximum number of new peers to graft to during opportunistic grafting. The default is 2. + opportunistic_graft_peers: usize, } //TODO should we use a macro for getters + the builder? @@ -351,6 +361,21 @@ impl GossipsubConfig { pub fn mesh_outbound_min(&self) -> usize { self.mesh_outbound_min } + + // Number of heartbeat ticks that specifcy the interval in which opportunistic grafting is + // applied. Every `opportunistic_graft_ticks` we will attempt to select some high-scoring mesh + // peers to replace lower-scoring ones, if the median score of our mesh peers falls below a + // threshold (see https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds). + // The default is 60. + pub fn opportunistic_graft_ticks(&self) -> u64 { + self.opportunistic_graft_ticks + } + + + /// The maximum number of new peers to graft to during opportunistic grafting. The default is 2. + pub fn opportunistic_graft_peers(&self) -> usize { + self.opportunistic_graft_peers + } } impl Default for GossipsubConfig { @@ -418,6 +443,8 @@ impl GossipsubConfigBuilder { flood_publish: true, graft_flood_threshold: Duration::from_secs(10), mesh_outbound_min: 2, + opportunistic_graft_ticks: 60, + opportunistic_graft_peers: 2 }, } } @@ -613,6 +640,22 @@ impl GossipsubConfigBuilder { self } + // Number of heartbeat ticks that specifcy the interval in which opportunistic grafting is + // applied. Every `opportunistic_graft_ticks` we will attempt to select some high-scoring mesh + // peers to replace lower-scoring ones, if the median score of our mesh peers falls below a + // threshold (see https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds). + // The default is 60. + pub fn opportunistic_graft_ticks(&mut self, opportunistic_graft_ticks: u64) -> &mut Self { + self.config.opportunistic_graft_ticks = opportunistic_graft_ticks; + self + } + + /// The maximum number of new peers to graft to during opportunistic grafting. The default is 2. + pub fn opportunistic_graft_peers(&mut self, opportunistic_graft_peers: usize) -> &mut Self { + self.config.opportunistic_graft_peers = opportunistic_graft_peers; + self + } + /// Constructs a `GossipsubConfig` from the given configuration and validates the settings. pub fn build(&self) -> Result { //check all constraints on config @@ -665,6 +708,8 @@ impl std::fmt::Debug for GossipsubConfig { let _ = builder.field("flood_publish", &self.flood_publish); let _ = builder.field("graft_flood_threshold", &self.graft_flood_threshold); let _ = builder.field("mesh_outbound_min", &self.mesh_outbound_min); + let _ = builder.field("opportunistic_graft_ticks", &self.opportunistic_graft_ticks); + let _ = builder.field("opportunistic_graft_peers", &self.opportunistic_graft_peers); builder.finish() } } From 189ffc402fbe5eef892282022623e54fea946d7c Mon Sep 17 00:00:00 2001 From: blacktemplar Date: Thu, 6 Aug 2020 15:32:00 +0200 Subject: [PATCH 2/2] remove done TODO --- protocols/gossipsub/src/behaviour/tests.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 68ec8378c53..b9d9e171d75 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -2640,8 +2640,6 @@ mod tests { ); } - //TODO test opportunisticGraftThreshold - #[test] fn test_keep_best_scoring_peers_on_oversubscription() { let config = GossipsubConfigBuilder::new()