From dbe0a8454914cb93a47a4f862ad3d29a306a3348 Mon Sep 17 00:00:00 2001 From: blacktemplar Date: Thu, 30 Jul 2020 17:35:28 +0200 Subject: [PATCH] implement adaptive gossip dissemination + complete config debug output --- protocols/gossipsub/src/behaviour.rs | 40 ++++++++--- protocols/gossipsub/src/behaviour/tests.rs | 78 ++++++++++++++++++++++ protocols/gossipsub/src/config.rs | 27 +++++++- 3 files changed, 133 insertions(+), 12 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 1e2753d952a..20999199349 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -18,6 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use std::cmp::max; use std::collections::hash_map::Entry; use std::iter::FromIterator; use std::time::Duration; @@ -1278,13 +1279,18 @@ impl Gossipsub { return; } + // dynamic number of peers to gossip based on `gossip_factor` with minimum `gossip_lazy` + let n_map = |m| { + max( + self.config.gossip_lazy, + (self.config.gossip_factor * m as f64) as usize, + ) + }; // get gossip_lazy random peers - let to_msg_peers = Self::get_random_peers( - &self.topic_peers, - &topic_hash, - self.config.gossip_lazy, - |peer| !peers.contains(peer) && !self.explicit_peers.contains(peer), - ); + let to_msg_peers = + Self::get_random_peers_dynamic(&self.topic_peers, &topic_hash, n_map, |peer| { + !peers.contains(peer) && !self.explicit_peers.contains(peer) + }); debug!("Gossiping IHAVE to {} peers.", to_msg_peers.len()); @@ -1496,12 +1502,14 @@ impl Gossipsub { } } - /// Helper function to get a set of `n` random gossipsub peers for a `topic_hash` - /// filtered by the function `f`. - fn get_random_peers( + /// Helper function to get a subset of random gossipsub peers for a `topic_hash` + /// filtered by the function `f`. The number of peers to get equals the output of `n_map` + /// that gets as input the number of filtered peers. + fn get_random_peers_dynamic( topic_peers: &HashMap>, topic_hash: &TopicHash, - n: usize, + // maps the number of total peers to the number of selected peers + n_map: impl Fn(usize) -> usize, mut f: impl FnMut(&PeerId) -> bool, ) -> BTreeSet { let mut gossip_peers = match topic_peers.get(topic_hash) { @@ -1511,6 +1519,7 @@ impl Gossipsub { }; // if we have less than needed, return them + let n = n_map(gossip_peers.len()); if gossip_peers.len() <= n { debug!("RANDOM PEERS: Got {:?} peers", gossip_peers.len()); return gossip_peers.into_iter().collect(); @@ -1525,6 +1534,17 @@ impl Gossipsub { gossip_peers.into_iter().take(n).collect() } + /// Helper function to get a set of `n` random gossipsub peers for a `topic_hash` + /// filtered by the function `f`. + fn get_random_peers( + topic_peers: &HashMap>, + topic_hash: &TopicHash, + n: usize, + f: impl FnMut(&PeerId) -> bool, + ) -> BTreeSet { + Self::get_random_peers_dynamic(topic_peers, topic_hash, |_| n, f) + } + // adds a control action to control_pool fn control_pool_add( control_pool: &mut HashMap>, diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index a397393762a..1bfda1b5579 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -1634,4 +1634,82 @@ mod tests { "Message cache should contain published message" ); } + + #[test] + fn test_gossip_to_at_least_gossip_lazy_peers() { + let config = GossipsubConfig::default(); + + //add more peers than in mesh to test gossipping + //by default only mesh_n_low peers will get added to mesh + let (mut gs, _, topic_hashes) = build_and_inject_nodes( + config.mesh_n_low + config.gossip_lazy + 1, + vec!["topic".into()], + true, + ); + + //receive message + let message = GossipsubMessage { + source: Some(PeerId::random()), + data: vec![], + sequence_number: Some(0), + topics: vec![topic_hashes[0].clone()], + signature: None, + key: None, + validated: true, + }; + gs.handle_received_message(message.clone(), &PeerId::random()); + + //emit gossip + gs.emit_gossip(); + + //check that exactly config.gossip_lazy many gossip messages were sent. + let msg_id = (gs.config.message_id_fn)(&message); + assert_eq!( + count_control_msgs(&gs, |peer, action| match action { + GossipsubControlAction::IHave { + topic_hash, + message_ids, + } => topic_hash == &topic_hashes[0] && message_ids.iter().any(|id| id == &msg_id), + _ => false, + }), + config.gossip_lazy + ); + } + + #[test] + fn test_gossip_to_at_most_gossip_factor_peers() { + let config = GossipsubConfig::default(); + + //add a lot of peers + let m = config.mesh_n_low + config.gossip_lazy * (2.0 / config.gossip_factor) as usize; + let (mut gs, _, topic_hashes) = build_and_inject_nodes(m, vec!["topic".into()], true); + + //receive message + let message = GossipsubMessage { + source: Some(PeerId::random()), + data: vec![], + sequence_number: Some(0), + topics: vec![topic_hashes[0].clone()], + signature: None, + key: None, + validated: true, + }; + gs.handle_received_message(message.clone(), &PeerId::random()); + + //emit gossip + gs.emit_gossip(); + + //check that exactly config.gossip_lazy many gossip messages were sent. + let msg_id = (gs.config.message_id_fn)(&message); + assert_eq!( + count_control_msgs(&gs, |peer, action| match action { + GossipsubControlAction::IHave { + topic_hash, + message_ids, + } => topic_hash == &topic_hashes[0] && message_ids.iter().any(|id| id == &msg_id), + _ => false, + }), + ((m - config.mesh_n_low) as f64 * config.gossip_factor) as usize + ); + } } diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index 630a728b88e..6fdaf683522 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -72,9 +72,15 @@ pub struct GossipsubConfig { /// is 12). pub mesh_n_high: usize, - /// Number of peers to emit gossip to during a heartbeat (D_lazy in the spec, default is 6). + /// Minimum number of peers to emit gossip to during a heartbeat (D_lazy in the spec, + /// default is 6). pub gossip_lazy: usize, + /// Affects how many peers we will emit gossip to at each heartbeat. + /// We will send gossip to `gossip_factor * (total number of non-mesh peers)`, or + /// `gossip_lazy`, whichever is greater. The default is 0.25. + pub gossip_factor: f64, + /// Initial delay in each heartbeat (default is 5 seconds). pub heartbeat_initial_delay: Duration, @@ -162,6 +168,7 @@ impl Default for GossipsubConfig { mesh_n_low: 5, mesh_n_high: 12, gossip_lazy: 6, // default to mesh_n + gossip_factor: 0.25, heartbeat_initial_delay: Duration::from_secs(5), heartbeat_interval: Duration::from_secs(1), fanout_ttl: Duration::from_secs(60), @@ -270,12 +277,21 @@ impl GossipsubConfigBuilder { self } - /// Number of peers to emit gossip to during a heartbeat (D_lazy in the spec, default is 6). + /// Minimum number of peers to emit gossip to during a heartbeat (D_lazy in the spec, + /// default is 6). pub fn gossip_lazy(&mut self, gossip_lazy: usize) -> &mut Self { self.config.gossip_lazy = gossip_lazy; self } + /// Affects how many peers we will emit gossip to at each heartbeat. + /// We will send gossip to `gossip_factor * (total number of non-mesh peers)`, or + /// `gossip_lazy`, whichever is greater. The default is 0.25. + pub fn gossip_factor(&mut self, gossip_factor: f64) -> &mut Self { + self.config.gossip_factor = gossip_factor; + self + } + /// Initial delay in each heartbeat (default is 5 seconds). pub fn heartbeat_initial_delay(&mut self, heartbeat_initial_delay: Duration) -> &mut Self { self.config.heartbeat_initial_delay = heartbeat_initial_delay; @@ -408,12 +424,19 @@ impl std::fmt::Debug for GossipsubConfig { let _ = builder.field("mesh_n_low", &self.mesh_n_low); let _ = builder.field("mesh_n_high", &self.mesh_n_high); let _ = builder.field("gossip_lazy", &self.gossip_lazy); + let _ = builder.field("gossip_factor", &self.gossip_factor); let _ = builder.field("heartbeat_initial_delay", &self.heartbeat_initial_delay); let _ = builder.field("heartbeat_interval", &self.heartbeat_interval); let _ = builder.field("fanout_ttl", &self.fanout_ttl); let _ = builder.field("max_transmit_size", &self.max_transmit_size); let _ = builder.field("duplicate_cache_time", &self.duplicate_cache_time); let _ = builder.field("validate_messages", &self.validate_messages); + let _ = builder.field("validation_mode", &self.validation_mode); + let _ = builder.field("do_px", &self.do_px); + let _ = builder.field("prune_peers", &self.prune_peers); + let _ = builder.field("prune_backoff", &self.prune_backoff); + let _ = builder.field("backoff_slack", &self.backoff_slack); + let _ = builder.field("flood_publish", &self.flood_publish); builder.finish() } }